You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2015/09/03 09:28:40 UTC
[04/25] gora git commit: * Implement the map function the same way as in
LogAnalytics.java
* Implement the map function the same way as in LogAnalytics.java
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/5644a21c
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/5644a21c
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/5644a21c
Branch: refs/heads/master
Commit: 5644a21c5c5678611c7cef4e2c922643088951b8
Parents: 8fbdef7
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 02:10:26 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 02:10:26 2015 +0300
----------------------------------------------------------------------
.../gora/tutorial/log/LogAnalyticsSpark.java | 38 +++++++++++++++++---
1 file changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/5644a21c/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 0194c3b..43acba0 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -24,12 +24,31 @@ import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import scala.Tuple2;
+
+import java.util.concurrent.TimeUnit;
public class LogAnalyticsSpark {
private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
+ /** The number of milliseconds in a day */
+ private static final long DAY_MILIS = TimeUnit.DAYS.toMillis(1);
+
+ /**
+ * Maps a {@link Pageview} to a (URL, day) pair: the key is the page URL and
+ * the value is the page view timestamp rolled up to its day via getDay.
+ * todo _fk consider using Kryo serialization
+ */
+ private static final Function<Pageview, Tuple2<String, Long>> s = new Function<Pageview, Tuple2<String, Long>>() {
+ @Override
+ public Tuple2<String, Long> call(Pageview pageview) throws Exception {
+ String key = pageview.getUrl().toString();
+ Long value = getDay(pageview.getTimestamp());
+ return new Tuple2<>(key, value);
+ }
+ };
+
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println(USAGE);
@@ -45,14 +64,22 @@ public class LogAnalyticsSpark {
System.exit(ret);
}
+ /**
+ * Rolls the given timestamp up to its day boundary (a multiple of DAY_MILIS),
+ * so that page views can be aggregated daily.
+ * <p>
+ * Integer division truncates toward zero, so a negative (pre-epoch) timestamp
+ * is moved to the later day boundary rather than the earlier one.
+ * NOTE(review): assumes timeStamp is in epoch milliseconds — confirm.
+ */
+ private static long getDay(long timeStamp) {
+ final long wholeDays = timeStamp / DAY_MILIS;
+ return wholeDays * DAY_MILIS;
+ }
+
public int run(String inStoreClass, String outStoreClass) throws Exception {
GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class,
Pageview.class);
SparkConf sparkConf = new SparkConf().setAppName(
"Gora Integration Application").setMaster("local");
-
- //todo _fk change architectural desigm
+
+ // todo _fk consider alternative architectural design
Class[] c = new Class[1];
c[0] = Pageview.class;
sparkConf.registerKryoClasses(c);
@@ -64,9 +91,8 @@ public class LogAnalyticsSpark {
DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
inStoreClass, Long.class, Pageview.class, hadoopConf);
- JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, dataStore);
- // JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview>
- // cachedGoraRdd = goraRDD.cache();
+ JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc,
+ dataStore);
long count = goraRDD.count();
System.out.println("Total Count: " + count);
@@ -74,6 +100,8 @@ public class LogAnalyticsSpark {
String firstOneURL = goraRDD.first()._2().getUrl().toString();
System.out.println(firstOneURL);
+ JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(s);
+
return 1;
}
}