You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2015/09/03 09:28:42 UTC
[06/25] gora git commit: * GoraSpark.java initializeInput method
renamed to initialize * reduce part is added to example.
* GoraSpark.java initializeInput method renamed to initialize
* reduce part is added to example.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/81af4d3a
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/81af4d3a
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/81af4d3a
Branch: refs/heads/master
Commit: 81af4d3afcba4633d0c5d06ead9b4256ea60862f
Parents: 445edb1
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 18:33:38 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 18:33:38 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/gora/spark/GoraSpark.java | 4 +-
.../gora/tutorial/log/LogAnalyticsSpark.java | 49 ++++++++++++++++----
2 files changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/81af4d3a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
index 4279cfb..02b5b39 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
@@ -41,7 +41,7 @@ public class GoraSpark<K, V extends Persistent> {
this.clazzV = clazzV;
}
- public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
+ public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
Configuration conf, DataStore<K, V> dataStore) {
GoraMapReduceUtils.setIOSerializations(conf, true);
@@ -56,7 +56,7 @@ public class GoraSpark<K, V extends Persistent> {
clazzV);
}
- public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
+ public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
DataStore<K, V> dataStore) {
Configuration hadoopConf;
http://git-wip-us.apache.org/repos/asf/gora/blob/81af4d3a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 0ad3e57..d69649e 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -20,6 +20,7 @@ package org.apache.gora.tutorial.log;
import org.apache.gora.spark.GoraSpark;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
@@ -27,11 +28,11 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
-import java.util.concurrent.TimeUnit;
-
public class LogAnalyticsSpark {
private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
@@ -39,18 +40,44 @@ public class LogAnalyticsSpark {
/** The number of milliseconds in a day */
private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
- //todo _fk consider using Kyro serialization
- private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> s = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> () {
+ // todo _fk consider using Kryo serialization
+ private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>>() {
@Override
- public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview) throws Exception {
+ public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview)
+ throws Exception {
String url = pageview.getUrl().toString();
Long day = getDay(pageview.getTimestamp());
- Tuple2<String, Long> keyTuple =new Tuple2<>(url, day);
+ Tuple2<String, Long> keyTuple = new Tuple2<>(url, day);
return new Tuple2<>(keyTuple, 1L);
}
};
+ private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() {
+ @Override
+ public Long call(Long aLong, Long aLong2) throws Exception {
+ return aLong + aLong2;
+ }
+ };
+
+ private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc = new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum>() {
+ @Override
+ public Tuple2<String, MetricDatum> call(
+ Tuple2<Tuple2<String, Long>, Long> tuple2LongTuple2) throws Exception {
+ String dimension = tuple2LongTuple2._1()._1();
+ long timestamp = tuple2LongTuple2._1()._2();
+
+ MetricDatum metricDatum = new MetricDatum();
+ metricDatum.setMetricDimension(dimension);
+ metricDatum.setTimestamp(timestamp);
+
+ String key = metricDatum.getMetricDimension().toString();
+ key += "_" + Long.toString(timestamp);
+ metricDatum.setMetric(tuple2LongTuple2._2());
+ return new Tuple2<>(key, metricDatum);
+ }
+ };
+
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println(USAGE);
@@ -93,7 +120,7 @@ public class LogAnalyticsSpark {
DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
inStoreClass, Long.class, Pageview.class, hadoopConf);
- JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc,
+ JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initialize(sc,
dataStore);
long count = goraRDD.count();
@@ -102,7 +129,13 @@ public class LogAnalyticsSpark {
String firstOneURL = goraRDD.first()._2().getUrl().toString();
System.out.println(firstOneURL);
- JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD.values().map(s);
+ JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD
+ .values().map(mapFunc);
+
+ JavaPairRDD<String, MetricDatum> reducedGoraRdd = JavaPairRDD
+ .fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc).mapToPair(metricFunc);
+
+ System.out.println("MetricDatum count:" + reducedGoraRdd.count());
return 1;
}