You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@gora.apache.org by le...@apache.org on 2015/09/03 09:28:38 UTC
[02/25] gora git commit: * GoraSpark.java initialize method renamed
to initializeInput. * Architectural change is made.
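* GoraSpark.java: the initialize method is renamed to initializeInput.
* Architectural change: GoraSpark gains an initializeInput(JavaSparkContext, DataStore) overload that takes its Hadoop Configuration from the DataStore when the store is Configurable and has a non-null configuration, and otherwise creates a new Configuration. LogAnalyticsSpark no longer extends Configured or implements Tool: it creates its own Hadoop Configuration and receives the input and output data store classes directly as arguments to run().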
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ef68cead
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ef68cead
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ef68cead
Branch: refs/heads/master
Commit: ef68cead273324797cf292dbe6da18ee3fd819cb
Parents: 9c2d225
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sun Jun 28 17:17:52 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sun Jun 28 17:17:52 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/gora/spark/GoraSpark.java | 50 +++++++++++++++-----
.../gora/tutorial/log/LogAnalyticsSpark.java | 44 ++++++++---------
2 files changed, 60 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/ef68cead/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
index 690e32c..4279cfb 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
@@ -24,6 +24,7 @@ import org.apache.gora.mapreduce.GoraMapReduceUtils;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.store.DataStore;
import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -37,19 +38,44 @@ public class GoraSpark<K, V extends Persistent> {
public GoraSpark(Class<K> clazzK, Class<V> clazzV) {
this.clazzK = clazzK;
- this.clazzV = clazzV;
+ this.clazzV = clazzV;
}
- public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+ public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
Configuration conf, DataStore<K, V> dataStore) {
- GoraMapReduceUtils.setIOSerializations(conf, true);
- try {
- IOUtils.storeToConf(dataStore.newQuery(), conf,
- GoraInputFormat.QUERY_KEY);
- } catch (IOException ioex) {
- throw new RuntimeException(ioex.getMessage());
- }
- return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class,
- clazzK, clazzV);
- }
+ GoraMapReduceUtils.setIOSerializations(conf, true);
+
+ try {
+ IOUtils
+ .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY);
+ } catch (IOException ioex) {
+ throw new RuntimeException(ioex.getMessage());
+ }
+
+ return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK,
+ clazzV);
+ }
+
+ public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
+ DataStore<K, V> dataStore) {
+ Configuration hadoopConf;
+
+ if ((dataStore instanceof Configurable) && ((Configurable) dataStore).getConf() != null) {
+ hadoopConf = ((Configurable) dataStore).getConf();
+ } else {
+ hadoopConf = new Configuration();
+ }
+
+ GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
+
+ try {
+ IOUtils.storeToConf(dataStore.newQuery(), hadoopConf,
+ GoraInputFormat.QUERY_KEY);
+ } catch (IOException ioex) {
+ throw new RuntimeException(ioex.getMessage());
+ }
+
+ return sparkContext.newAPIHadoopRDD(hadoopConf, GoraInputFormat.class,
+ clazzK, clazzV);
+ }
}
http://git-wip-us.apache.org/repos/asf/gora/blob/ef68cead/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 214b130..9b28fd7 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -21,44 +21,44 @@ import org.apache.gora.spark.GoraSpark;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.Pageview;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
-public class LogAnalyticsSpark extends Configured implements Tool {
+public class LogAnalyticsSpark {
private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
- private static LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
public static void main(String[] args) throws Exception {
- if (args.length < 2) {
- System.err.println(USAGE);
- System.exit(1);
+ if (args.length < 2) {
+ System.err.println(USAGE);
+ System.exit(1);
}
- // run as any other MR job
- int ret = ToolRunner.run(logAnalyticsSpark, args);
- System.exit(ret);
+
+ String inStoreClass = args[0];
+ String outStoreClass = args[1];
+
+ LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
+ int ret = logAnalyticsSpark.run(inStoreClass, outStoreClass);
+
+ System.exit(ret);
}
- @Override
- public int run(String[] args) throws Exception {
- GoraSpark<Long, Pageview> goraSpark = new GoraSpark<Long, Pageview>(
- Long.class, Pageview.class);
+ public int run(String inStoreClass, String outStoreClass) throws Exception {
+ GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class,
+ Pageview.class);
- SparkConf conf = new SparkConf().setAppName(
+ SparkConf sparkConf = new SparkConf().setAppName(
"Gora Integration Application").setMaster("local");
- JavaSparkContext sc = new JavaSparkContext(conf);
+ JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+ Configuration hadoopConf = new Configuration();
- String dataStoreClass = args[0];
DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
- dataStoreClass, Long.class, Pageview.class,
- logAnalyticsSpark.getConf());
+ inStoreClass, Long.class, Pageview.class, hadoopConf);
- JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview> goraRDD = goraSpark
- .initialize(sc, logAnalyticsSpark.getConf(), dataStore);
+ JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, dataStore);
// JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview>
// cachedGoraRdd = goraRDD.cache();