Posted to commits@gora.apache.org by le...@apache.org on 2015/09/03 09:28:42 UTC

[06/25] gora git commit: * GoraSpark.java initializeInput method renamed to initialize * reduce part is added to example.

* GoraSpark.java initializeInput method renamed to initialize
* reduce part is added to example.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/81af4d3a
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/81af4d3a
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/81af4d3a

Branch: refs/heads/master
Commit: 81af4d3afcba4633d0c5d06ead9b4256ea60862f
Parents: 445edb1
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 18:33:38 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 18:33:38 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/gora/spark/GoraSpark.java   |  4 +-
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 49 ++++++++++++++++----
 2 files changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/81af4d3a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
index 4279cfb..02b5b39 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
@@ -41,7 +41,7 @@ public class GoraSpark<K, V extends Persistent> {
     this.clazzV = clazzV;
   }
 
-  public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
+  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
       Configuration conf, DataStore<K, V> dataStore) {
     GoraMapReduceUtils.setIOSerializations(conf, true);
 
@@ -56,7 +56,7 @@ public class GoraSpark<K, V extends Persistent> {
         clazzV);
   }
 
-  public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
+  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
       DataStore<K, V> dataStore) {
     Configuration hadoopConf;
 

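For context, a minimal caller-side sketch of the renamed API (illustration only, not part of this commit). It assumes the tutorial's Long/Pageview store, a local Spark master, and that gora.properties points at a default data store; the class name GoraSparkInitSketch and the variable names are hypothetical.

    import org.apache.gora.spark.GoraSpark;
    import org.apache.gora.store.DataStore;
    import org.apache.gora.store.DataStoreFactory;
    import org.apache.gora.tutorial.log.generated.Pageview;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class GoraSparkInitSketch {
      public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf()
            .setAppName("Gora Spark init sketch").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // key/value classes mirror the tutorial's Pageview store
        GoraSpark<Long, Pageview> goraSpark =
            new GoraSpark<>(Long.class, Pageview.class);

        Configuration hadoopConf = new Configuration();
        // uses the default data store configured in gora.properties
        DataStore<Long, Pageview> inStore =
            DataStoreFactory.getDataStore(Long.class, Pageview.class, hadoopConf);

        // formerly initializeInput(...); an overload without the Configuration
        // argument also exists, as the hunk above shows
        JavaPairRDD<Long, Pageview> goraRDD =
            goraSpark.initialize(sc, hadoopConf, inStore);

        System.out.println("Pageview count: " + goraRDD.count());
        sc.stop();
      }
    }
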
http://git-wip-us.apache.org/repos/asf/gora/blob/81af4d3a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 0ad3e57..d69649e 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -20,6 +20,7 @@ package org.apache.gora.tutorial.log;
 import org.apache.gora.spark.GoraSpark;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.tutorial.log.generated.MetricDatum;
 import org.apache.gora.tutorial.log.generated.Pageview;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
@@ -27,11 +28,11 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
-import java.util.concurrent.TimeUnit;
-
 public class LogAnalyticsSpark {
 
   private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
@@ -39,18 +40,44 @@ public class LogAnalyticsSpark {
   /** The number of milliseconds in a day */
   private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
 
-  //todo _fk consider using Kyro serialization
-  private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> s = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> () {
+  // todo _fk consider using Kyro serialization
+  private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>>() {
     @Override
-    public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview) throws Exception {
+    public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview)
+        throws Exception {
       String url = pageview.getUrl().toString();
       Long day = getDay(pageview.getTimestamp());
-      Tuple2<String, Long> keyTuple =new Tuple2<>(url, day);
+      Tuple2<String, Long> keyTuple = new Tuple2<>(url, day);
 
       return new Tuple2<>(keyTuple, 1L);
     }
   };
 
+  private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() {
+    @Override
+    public Long call(Long aLong, Long aLong2) throws Exception {
+      return aLong + aLong2;
+    }
+  };
+
+  private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc = new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum>() {
+    @Override
+    public Tuple2<String, MetricDatum> call(
+        Tuple2<Tuple2<String, Long>, Long> tuple2LongTuple2) throws Exception {
+      String dimension = tuple2LongTuple2._1()._1();
+      long timestamp = tuple2LongTuple2._1()._2();
+
+      MetricDatum metricDatum = new MetricDatum();
+      metricDatum.setMetricDimension(dimension);
+      metricDatum.setTimestamp(timestamp);
+
+      String key = metricDatum.getMetricDimension().toString();
+      key += "_" + Long.toString(timestamp);
+      metricDatum.setMetric(tuple2LongTuple2._2());
+      return new Tuple2<>(key, metricDatum);
+    }
+  };
+
   public static void main(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println(USAGE);
@@ -93,7 +120,7 @@ public class LogAnalyticsSpark {
     DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
         inStoreClass, Long.class, Pageview.class, hadoopConf);
 
-    JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc,
+    JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initialize(sc,
         dataStore);
 
     long count = goraRDD.count();
@@ -102,7 +129,13 @@ public class LogAnalyticsSpark {
     String firstOneURL = goraRDD.first()._2().getUrl().toString();
     System.out.println(firstOneURL);
 
-    JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD.values().map(s);
+    JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD
+        .values().map(mapFunc);
+
+    JavaPairRDD<String, MetricDatum> reducedGoraRdd = JavaPairRDD
+        .fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc).mapToPair(metricFunc);
+
+    System.out.println("MetricDatum count:" + reducedGoraRdd.count());
 
     return 1;
   }
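
The reduce part above stops at counting the aggregated MetricDatum records. One simple way to persist them, sketched here as an illustration only (not part of this commit): collect the reduced RDD on the driver and write each datum into a Gora output store. The fragment is meant to slot into the same method, reusing the hadoopConf and reducedGoraRdd variables shown above; it obtains the default store from gora.properties, whereas the tutorial resolves the output store class from its second command-line argument.

    // Collecting on the driver is fine for the small tutorial data set;
    // a real job would write from the executors instead.
    DataStore<String, MetricDatum> outStore = DataStoreFactory.getDataStore(
        String.class, MetricDatum.class, hadoopConf);

    for (Tuple2<String, MetricDatum> pair : reducedGoraRdd.collect()) {
      outStore.put(pair._1(), pair._2());
    }
    outStore.flush();
    outStore.close();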