You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2015/09/03 09:28:46 UTC

[10/25] gora git commit: Code is organized in LogAnalyticsSpark.java.

Code is organized in LogAnalyticsSpark.java.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/21955700
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/21955700
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/21955700

Branch: refs/heads/master
Commit: 219557002f0e33ef6d0e2bb49471d24fb867b0ac
Parents: cf6e765
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Aug 17 21:21:43 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Aug 17 21:21:43 2015 +0300

----------------------------------------------------------------------
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 51 +++++++++++++-------
 1 file changed, 34 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/21955700/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 372c124..327bf7f 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -35,6 +35,8 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -54,6 +56,8 @@ import scala.Tuple2;
  */
 public class LogAnalyticsSpark {
 
+  private static final Logger log = LoggerFactory.getLogger(LogAnalyticsSpark.class);
+
   private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
 
   /** The number of milliseconds in a day */
@@ -112,13 +116,15 @@ public class LogAnalyticsSpark {
       System.exit(1);
     }
 
-    String inStoreClass = args[0];
-    String outStoreClass = args[1];
-
     LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
-    int ret = logAnalyticsSpark.run(inStoreClass, outStoreClass);
 
-    System.exit(ret);
+    try {
+      int ret = logAnalyticsSpark.run(args);
+      System.exit(ret);
+    } catch (Exception ex){
+      log.error("Error occurred!");
+    }
+
   }
 
   /**
@@ -129,27 +135,39 @@ public class LogAnalyticsSpark {
     return (timeStamp / DAY_MILIS) * DAY_MILIS;
   }
 
-  public int run(String inStoreClass, String outStoreClass) throws Exception {
+  public int run(String[] args) throws Exception {
+
+    DataStore<Long, Pageview> inStore;
+    DataStore<String, MetricDatum> outStore;
+    Configuration hadoopConf = new Configuration();
+
+    if (args.length > 0) {
+      String dataStoreClass = args[0];
+      inStore = DataStoreFactory.getDataStore(
+      dataStoreClass, Long.class, Pageview.class, hadoopConf);
+      if (args.length > 1) {
+        dataStoreClass = args[1];
+      }
+        outStore = DataStoreFactory.getDataStore(
+        dataStoreClass, String.class, MetricDatum.class, hadoopConf);
+      } else {
+        inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, hadoopConf);
+        outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, hadoopConf);
+    }
+
+    //Spark engine initialization
     GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class,
         Pageview.class);
 
     SparkConf sparkConf = new SparkConf().setAppName(
         "Gora Integration Application").setMaster("local");
 
-    // todo _fk consider alternative architectural design
-    // todo design inStore and outStore initialization parts as like LogAnalytics.java
-    // todo consider creating job and manipulating it at input part as like LogAnalytics.java
     Class[] c = new Class[1];
     c[0] = Pageview.class;
     sparkConf.registerKryoClasses(c);
     //
     JavaSparkContext sc = new JavaSparkContext(sparkConf);
 
-    Configuration hadoopConf = new Configuration();
-
-    DataStore<Long, Pageview> inStore = DataStoreFactory.getDataStore(
-        inStoreClass, Long.class, Pageview.class, hadoopConf);
-
     JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore);
 
     long count = goraRDD.count();
@@ -176,9 +194,6 @@ public class LogAnalyticsSpark {
     //
 
     //write output to datastore
-    DataStore<String, MetricDatum> outStore = DataStoreFactory.getDataStore(
-            outStoreClass, String.class, MetricDatum.class, hadoopConf);
-
     GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
 
     Job job = Job.getInstance(hadoopConf);
@@ -198,6 +213,8 @@ public class LogAnalyticsSpark {
     inStore.close();
     outStore.close();
 
+    log.info("Log completed with success");
+
     return 1;
   }
 }