You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@gora.apache.org by le...@apache.org on 2015/09/03 09:28:40 UTC

[04/25] gora git commit: * map function is implemented as in LogAnalytics.java

* map function is implemented as in LogAnalytics.java


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/5644a21c
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/5644a21c
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/5644a21c

Branch: refs/heads/master
Commit: 5644a21c5c5678611c7cef4e2c922643088951b8
Parents: 8fbdef7
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 02:10:26 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 02:10:26 2015 +0300

----------------------------------------------------------------------
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 38 +++++++++++++++++---
 1 file changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/5644a21c/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 0194c3b..43acba0 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -24,12 +24,31 @@ import org.apache.gora.tutorial.log.generated.Pageview;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import scala.Tuple2;
+
+import java.util.concurrent.TimeUnit;
 
 public class LogAnalyticsSpark {
 
   private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
 
+  /** The number of milliseconds in a day */
+  private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
+
+  //todo _fk consider using Kryo serialization
+  private static Function<Pageview, Tuple2<String, Long>> s = new Function<Pageview, Tuple2<String, Long>>() {
+    @Override
+    public Tuple2<String, Long> call(Pageview pageview) throws Exception {
+      String key = pageview.getUrl().toString();
+      Long value = getDay(pageview.getTimestamp());
+      return new Tuple2<>(key, value);
+    }
+  };
+
   public static void main(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println(USAGE);
@@ -45,14 +64,22 @@ public class LogAnalyticsSpark {
     System.exit(ret);
   }
 
+  /**
+   * Rolls up the given timestamp to day granularity, so that data can be
+   * aggregated daily.
+   */
+  private static long getDay(long timeStamp) {
+    return (timeStamp / DAY_MILIS) * DAY_MILIS;
+  }
+
   public int run(String inStoreClass, String outStoreClass) throws Exception {
     GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class,
         Pageview.class);
 
     SparkConf sparkConf = new SparkConf().setAppName(
         "Gora Integration Application").setMaster("local");
-      
-    //todo _fk change architectural desigm
+
+    // todo _fk consider alternative architectural design
     Class[] c = new Class[1];
     c[0] = Pageview.class;
     sparkConf.registerKryoClasses(c);
@@ -64,9 +91,8 @@ public class LogAnalyticsSpark {
     DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
         inStoreClass, Long.class, Pageview.class, hadoopConf);
 
-    JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, dataStore);
-    // JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview>
-    // cachedGoraRdd = goraRDD.cache();
+    JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc,
+        dataStore);
 
     long count = goraRDD.count();
     System.out.println("Total Count: " + count);
@@ -74,6 +100,8 @@ public class LogAnalyticsSpark {
     String firstOneURL = goraRDD.first()._2().getUrl().toString();
     System.out.println(firstOneURL);
 
+    JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(s);
+
     return 1;
   }
 }