You are viewing a plain-text version of this content. (The link to the canonical version did not survive the plain-text conversion.)
Posted to commits@gora.apache.org by le...@apache.org on 2015/09/03 09:28:38 UTC

[02/25] gora git commit: * GoraSpark.java initialize method renamed to initializeInput. * Architectural change is made.

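* GoraSpark.java: the initialize method is renamed to initializeInput.
* GoraSpark.java: a new initializeInput(JavaSparkContext, DataStore) overload uses the DataStore's Hadoop Configuration when the store is Configurable and has one; otherwise it creates a new Configuration.
* LogAnalyticsSpark.java: the class no longer extends Configured or implements Tool. main() now calls run(inStoreClass, outStoreClass) directly instead of going through ToolRunner.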


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ef68cead
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ef68cead
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ef68cead

Branch: refs/heads/master
Commit: ef68cead273324797cf292dbe6da18ee3fd819cb
Parents: 9c2d225
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sun Jun 28 17:17:52 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sun Jun 28 17:17:52 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/gora/spark/GoraSpark.java   | 50 +++++++++++++++-----
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 44 ++++++++---------
 2 files changed, 60 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/ef68cead/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
index 690e32c..4279cfb 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
@@ -24,6 +24,7 @@ import org.apache.gora.mapreduce.GoraMapReduceUtils;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -37,19 +38,44 @@ public class GoraSpark<K, V extends Persistent> {
 
   public GoraSpark(Class<K> clazzK, Class<V> clazzV) {
     this.clazzK = clazzK;
-	this.clazzV = clazzV;
+    this.clazzV = clazzV;
   }
 
-  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+  public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
       Configuration conf, DataStore<K, V> dataStore) {
-	  GoraMapReduceUtils.setIOSerializations(conf, true);
-      try {
-	    IOUtils.storeToConf(dataStore.newQuery(), conf,
-		    GoraInputFormat.QUERY_KEY);
-      } catch (IOException ioex) {
-        throw new RuntimeException(ioex.getMessage());
-	  }
-       return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class,
-	     clazzK, clazzV);
-	}
+    GoraMapReduceUtils.setIOSerializations(conf, true);
+
+    try {
+      IOUtils
+          .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY);
+    } catch (IOException ioex) {
+      throw new RuntimeException(ioex.getMessage());
+    }
+
+    return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK,
+        clazzV);
+  }
+
+  public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
+      DataStore<K, V> dataStore) {
+    Configuration hadoopConf;
+
+      if ((dataStore instanceof Configurable) && ((Configurable) dataStore).getConf() != null) {
+          hadoopConf = ((Configurable) dataStore).getConf();
+      } else {
+          hadoopConf = new Configuration();
+      }
+
+      GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
+
+    try {
+      IOUtils.storeToConf(dataStore.newQuery(), hadoopConf,
+              GoraInputFormat.QUERY_KEY);
+    } catch (IOException ioex) {
+      throw new RuntimeException(ioex.getMessage());
+    }
+
+    return sparkContext.newAPIHadoopRDD(hadoopConf, GoraInputFormat.class,
+        clazzK, clazzV);
+  }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/ef68cead/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 214b130..9b28fd7 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -21,44 +21,44 @@ import org.apache.gora.spark.GoraSpark;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.tutorial.log.generated.Pageview;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
-public class LogAnalyticsSpark extends Configured implements Tool {
+public class LogAnalyticsSpark {
 
   private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
-  private static LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
 
   public static void main(String[] args) throws Exception {
-      if (args.length < 2) {
-        System.err.println(USAGE);
-        System.exit(1);
+    if (args.length < 2) {
+      System.err.println(USAGE);
+      System.exit(1);
     }
-      // run as any other MR job
-      int ret = ToolRunner.run(logAnalyticsSpark, args);
-      System.exit(ret);
+
+    String inStoreClass = args[0];
+    String outStoreClass = args[1];
+
+    LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
+    int ret = logAnalyticsSpark.run(inStoreClass, outStoreClass);
+
+    System.exit(ret);
   }
 
-  @Override
-  public int run(String[] args) throws Exception {
-    GoraSpark<Long, Pageview> goraSpark = new GoraSpark<Long, Pageview>(
-      Long.class, Pageview.class);
+  public int run(String inStoreClass, String outStoreClass) throws Exception {
+    GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class,
+        Pageview.class);
 
-    SparkConf conf = new SparkConf().setAppName(
+    SparkConf sparkConf = new SparkConf().setAppName(
         "Gora Integration Application").setMaster("local");
-    JavaSparkContext sc = new JavaSparkContext(conf);
+    JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+    Configuration hadoopConf = new Configuration();
 
-    String dataStoreClass = args[0];
     DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
-        dataStoreClass, Long.class, Pageview.class,
-    logAnalyticsSpark.getConf());
+        inStoreClass, Long.class, Pageview.class, hadoopConf);
 
-    JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview> goraRDD = goraSpark
-        .initialize(sc, logAnalyticsSpark.getConf(), dataStore);
+    JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, dataStore);
     // JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview>
     // cachedGoraRdd = goraRDD.cache();