You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2015/09/03 09:28:46 UTC
[10/25] gora git commit: Code is organized at LogAnalyticsSpark.java.
Code is organized at LogAnalyticsSpark.java.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/21955700
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/21955700
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/21955700
Branch: refs/heads/master
Commit: 219557002f0e33ef6d0e2bb49471d24fb867b0ac
Parents: cf6e765
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Aug 17 21:21:43 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Aug 17 21:21:43 2015 +0300
----------------------------------------------------------------------
.../gora/tutorial/log/LogAnalyticsSpark.java | 51 +++++++++++++-------
1 file changed, 34 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/21955700/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 372c124..327bf7f 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -35,6 +35,8 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -54,6 +56,8 @@ import scala.Tuple2;
*/
public class LogAnalyticsSpark {
+ private static final Logger log = LoggerFactory.getLogger(LogAnalyticsSpark.class);
+
private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
/** The number of milliseconds in a day */
@@ -112,13 +116,15 @@ public class LogAnalyticsSpark {
System.exit(1);
}
- String inStoreClass = args[0];
- String outStoreClass = args[1];
-
LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
- int ret = logAnalyticsSpark.run(inStoreClass, outStoreClass);
- System.exit(ret);
+ try {
+ int ret = logAnalyticsSpark.run(args);
+ System.exit(ret);
+ } catch (Exception ex){
+ log.error("Error occurred!");
+ }
+
}
/**
@@ -129,27 +135,39 @@ public class LogAnalyticsSpark {
return (timeStamp / DAY_MILIS) * DAY_MILIS;
}
- public int run(String inStoreClass, String outStoreClass) throws Exception {
+ public int run(String[] args) throws Exception {
+
+ DataStore<Long, Pageview> inStore;
+ DataStore<String, MetricDatum> outStore;
+ Configuration hadoopConf = new Configuration();
+
+ if (args.length > 0) {
+ String dataStoreClass = args[0];
+ inStore = DataStoreFactory.getDataStore(
+ dataStoreClass, Long.class, Pageview.class, hadoopConf);
+ if (args.length > 1) {
+ dataStoreClass = args[1];
+ }
+ outStore = DataStoreFactory.getDataStore(
+ dataStoreClass, String.class, MetricDatum.class, hadoopConf);
+ } else {
+ inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, hadoopConf);
+ outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, hadoopConf);
+ }
+
+ //Spark engine initialization
GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class,
Pageview.class);
SparkConf sparkConf = new SparkConf().setAppName(
"Gora Integration Application").setMaster("local");
- // todo _fk consider alternative architectural design
- // todo design inStore and outStore initialization parts as like LogAnalytics.java
- // todo consider creating job and manipulating it at input part as like LogAnalytics.java
Class[] c = new Class[1];
c[0] = Pageview.class;
sparkConf.registerKryoClasses(c);
//
JavaSparkContext sc = new JavaSparkContext(sparkConf);
- Configuration hadoopConf = new Configuration();
-
- DataStore<Long, Pageview> inStore = DataStoreFactory.getDataStore(
- inStoreClass, Long.class, Pageview.class, hadoopConf);
-
JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore);
long count = goraRDD.count();
@@ -176,9 +194,6 @@ public class LogAnalyticsSpark {
//
//write output to datastore
- DataStore<String, MetricDatum> outStore = DataStoreFactory.getDataStore(
- outStoreClass, String.class, MetricDatum.class, hadoopConf);
-
GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
Job job = Job.getInstance(hadoopConf);
@@ -198,6 +213,8 @@ public class LogAnalyticsSpark {
inStore.close();
outStore.close();
+ log.info("Log completed with success");
+
return 1;
}
}