You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@gora.apache.org by le...@apache.org on 2015/09/03 09:28:37 UTC
[01/25] gora git commit: * JavaPairRDD support for GoraInputFormat.
Repository: gora
Updated Branches:
refs/heads/master 130257370 -> ea44388f9
* JavaPairRDD support for GoraInputFormat.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/9c2d225d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/9c2d225d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/9c2d225d
Branch: refs/heads/master
Commit: 9c2d225d04cfa746244373fa661a1aa6f03250bb
Parents: bb09d89
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sun Jun 28 01:57:51 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sun Jun 28 01:57:51 2015 +0300
----------------------------------------------------------------------
gora-core/pom.xml | 7 ++
.../java/org/apache/gora/spark/GoraSpark.java | 55 ++++++++++++++++
.../gora/tutorial/log/LogAnalyticsSpark.java | 69 ++++++++++++++++++++
pom.xml | 4 ++
4 files changed, 135 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/gora-core/pom.xml
----------------------------------------------------------------------
diff --git a/gora-core/pom.xml b/gora-core/pom.xml
index eab5330..5f147fb 100644
--- a/gora-core/pom.xml
+++ b/gora-core/pom.xml
@@ -141,6 +141,13 @@
<artifactId>guava</artifactId>
</dependency>
+ <!-- Spark dependency -->
+ <dependency> <!-- Spark dependency -->
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.10</artifactId>
+ <version>1.3.1</version>
+ </dependency>
+
<!-- Logging Dependencies -->
<dependency>
<groupId>log4j</groupId>
http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
new file mode 100644
index 0000000..690e32c
--- /dev/null
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.spark;
+
+import java.io.IOException;
+
+import org.apache.gora.mapreduce.GoraInputFormat;
+import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Base class for Spark integration
+ */
+public class GoraSpark<K, V extends Persistent> {
+ Class<K> clazzK;
+ Class<V> clazzV;
+
+ public GoraSpark(Class<K> clazzK, Class<V> clazzV) {
+ this.clazzK = clazzK;
+ this.clazzV = clazzV;
+ }
+
+ public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+ Configuration conf, DataStore<K, V> dataStore) {
+ GoraMapReduceUtils.setIOSerializations(conf, true);
+ try {
+ IOUtils.storeToConf(dataStore.newQuery(), conf,
+ GoraInputFormat.QUERY_KEY);
+ } catch (IOException ioex) {
+ throw new RuntimeException(ioex.getMessage());
+ }
+ return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class,
+ clazzK, clazzV);
+ }
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
new file mode 100644
index 0000000..214b130
--- /dev/null
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.tutorial.log;
+
+import org.apache.gora.spark.GoraSpark;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.tutorial.log.generated.Pageview;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class LogAnalyticsSpark extends Configured implements Tool {
+
+ private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
+ private static LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println(USAGE);
+ System.exit(1);
+ }
+ // run as any other MR job
+ int ret = ToolRunner.run(logAnalyticsSpark, args);
+ System.exit(ret);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ GoraSpark<Long, Pageview> goraSpark = new GoraSpark<Long, Pageview>(
+ Long.class, Pageview.class);
+
+ SparkConf conf = new SparkConf().setAppName(
+ "Gora Integration Application").setMaster("local");
+ JavaSparkContext sc = new JavaSparkContext(conf);
+
+ String dataStoreClass = args[0];
+ DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
+ dataStoreClass, Long.class, Pageview.class,
+ logAnalyticsSpark.getConf());
+
+ JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview> goraRDD = goraSpark
+ .initialize(sc, logAnalyticsSpark.getConf(), dataStore);
+ // JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview>
+ // cachedGoraRdd = goraRDD.cache();
+
+ long count = goraRDD.count();
+ System.out.println("Total Count: " + count);
+ return 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01a7fee..a6261ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -893,6 +893,10 @@
<groupId>ant</groupId>
<artifactId>ant</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
</exclusions>
</dependency>
[03/25] gora git commit: * Serialization support via Kryo.
Posted by le...@apache.org.
* Serialization support via Kryo.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/8fbdef7d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/8fbdef7d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/8fbdef7d
Branch: refs/heads/master
Commit: 8fbdef7de963defb1387f0fbbe9e56506b627ba3
Parents: ef68cea
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sun Jun 28 18:49:13 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sun Jun 28 18:49:13 2015 +0300
----------------------------------------------------------------------
.../org/apache/gora/tutorial/log/LogAnalyticsSpark.java | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/8fbdef7d/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 9b28fd7..0194c3b 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -51,6 +51,12 @@ public class LogAnalyticsSpark {
SparkConf sparkConf = new SparkConf().setAppName(
"Gora Integration Application").setMaster("local");
+
+ //todo _fk change architectural desigm
+ Class[] c = new Class[1];
+ c[0] = Pageview.class;
+ sparkConf.registerKryoClasses(c);
+ //
JavaSparkContext sc = new JavaSparkContext(sparkConf);
Configuration hadoopConf = new Configuration();
@@ -64,6 +70,10 @@ public class LogAnalyticsSpark {
long count = goraRDD.count();
System.out.println("Total Count: " + count);
+
+ String firstOneURL = goraRDD.first()._2().getUrl().toString();
+ System.out.println(firstOneURL);
+
return 1;
}
}
[13/25] gora git commit: Unnecessary reuseObjects parameter is removed.
Posted by le...@apache.org.
Unnecessary reuseObjects parameter is removed.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/92b71a6d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/92b71a6d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/92b71a6d
Branch: refs/heads/master
Commit: 92b71a6d2c95492621a56e48470e951e982cc34f
Parents: 8584911
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Aug 17 23:34:40 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Aug 17 23:34:40 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/gora/spark/GoraSparkEngine.java | 13 +++++--------
.../apache/gora/tutorial/log/LogAnalyticsSpark.java | 2 +-
2 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/92b71a6d/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
index 7a819fd..98dafea 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
@@ -99,17 +99,16 @@ public class GoraSparkEngine<K, V extends Persistent> {
/**
* Creates a job and sets the output parameters for the conf that Spark will use
* @param dataStore the datastore as the output
- * @param reuseObjects whether to reuse objects in serialization
*/
- public <K, V extends Persistent> Configuration generateOutputConf(DataStore<K, V> dataStore,
- boolean reuseObjects) throws IOException {
+ public <K, V extends Persistent> Configuration generateOutputConf(DataStore<K, V> dataStore)
+ throws IOException {
Configuration hadoopConf = new Configuration();
GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
Job job = Job.getInstance(hadoopConf);
return generateOutputConf(job, dataStore.getClass(), dataStore.getKeyClass(),
- dataStore.getPersistentClass(), reuseObjects);
+ dataStore.getPersistentClass());
}
/**
@@ -121,7 +120,7 @@ public class GoraSparkEngine<K, V extends Persistent> {
public <K, V extends Persistent> Configuration generateOutputConf(Job job,
DataStore<K, V> dataStore, boolean reuseObjects) {
return generateOutputConf(job, dataStore.getClass(), dataStore.getKeyClass(),
- dataStore.getPersistentClass(), reuseObjects);
+ dataStore.getPersistentClass());
}
/**
@@ -131,13 +130,11 @@ public class GoraSparkEngine<K, V extends Persistent> {
* @param dataStoreClass the datastore class
* @param keyClass output key class
* @param persistentClass output value class
- * @param reuseObjects whether to reuse objects in serialization
*/
@SuppressWarnings("rawtypes")
public <K, V extends Persistent> Configuration generateOutputConf(Job job,
Class<? extends DataStore> dataStoreClass,
- Class<K> keyClass, Class<V> persistentClass,
- boolean reuseObjects) {
+ Class<K> keyClass, Class<V> persistentClass) {
job.setOutputFormatClass(GoraOutputFormat.class);
job.setOutputKeyClass(keyClass);
http://git-wip-us.apache.org/repos/asf/gora/blob/92b71a6d/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index c66f7e4..8c37f8c 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -190,7 +190,7 @@ public class LogAnalyticsSpark {
//
//write output to datastore
- Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore, true);
+ Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore);
reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
//
[16/25] gora git commit: Spark engine word count tests are
implemented. Due to a connection problem, the Spark engine
word count tests are currently ignored.
Posted by le...@apache.org.
Spark engine word count tests are implemented.
Due to a connection problem, the Spark engine word count tests are currently ignored.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/e817a7f0
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/e817a7f0
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/e817a7f0
Branch: refs/heads/master
Commit: e817a7f0eeb2efb98937efb92701705ec80891b9
Parents: 9cff335
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sat Aug 22 17:31:57 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sat Aug 22 17:31:57 2015 +0300
----------------------------------------------------------------------
.../gora/examples/spark/SparkWordCount.java | 150 +++++++++++++++++++
.../gora/mapreduce/MapReduceTestUtils.java | 31 ++++
.../mapreduce/TestHBaseStoreWordCount.java | 8 +
.../mapreduce/TestMongoStoreWordCount.java | 9 ++
4 files changed, 198 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/e817a7f0/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
----------------------------------------------------------------------
diff --git a/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
new file mode 100644
index 0000000..837da13
--- /dev/null
+++ b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.examples.spark;
+
+import org.apache.gora.examples.generated.TokenDatum;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.spark.GoraSparkEngine;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Classic word count example in Gora with Spark.
+ */
+public class SparkWordCount {
+ private static final Logger log = LoggerFactory.getLogger(SparkWordCount.class);
+
+ private static final String USAGE = "SparkWordCount <input_data_store> <output_data_store>";
+
+ /**
+ * map function used in calculation
+ */
+ private static Function<WebPage, Tuple2<String, Long>> mapFunc =
+ new Function<WebPage, Tuple2<String, Long>>() {
+ @Override
+ public Tuple2<String, Long> call(WebPage webPage)
+ throws Exception {
+ String content = new String(webPage.getContent().array());
+ return new Tuple2<>(content, 1L);
+ }
+ };
+
+ /**
+ * reduce function used in calculation
+ */
+ private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() {
+ @Override
+ public Long call(Long aLong, Long aLong2) throws Exception {
+ return aLong + aLong2;
+ }
+ };
+
+ public int wordCount(DataStore<String,WebPage> inStore,
+ DataStore<String, TokenDatum> outStore) throws IOException {
+
+ //Spark engine initialization
+ GoraSparkEngine<String, WebPage> goraSparkEngine = new GoraSparkEngine<>(String.class,
+ WebPage.class);
+
+ SparkConf sparkConf = new SparkConf().setAppName(
+ "Gora Spark Word Count Application").setMaster("local");
+
+ Class[] c = new Class[1];
+ c[0] = inStore.getPersistentClass();
+ sparkConf.registerKryoClasses(c);
+ //
+ JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+ JavaPairRDD<String, WebPage> goraRDD = goraSparkEngine.initialize(sc, inStore);
+
+ long count = goraRDD.count();
+ System.out.println("Total Log Count: " + count);
+
+ JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(mapFunc);
+
+ JavaPairRDD<String, Long> reducedGoraRdd = JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc);
+
+ //Print output for debug purpose
+ System.out.println("SparkWordCount debug purpose TokenDatum print starts:");
+ Map<String, Long> tokenDatumMap = reducedGoraRdd.collectAsMap();
+ for (String key : tokenDatumMap.keySet()) {
+ System.out.println(key);
+ System.out.println(tokenDatumMap.get(key));
+ }
+ System.out.println("SparkWordCount debug purpose TokenDatum print ends:");
+ //
+
+ //write output to datastore
+ Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore);
+ reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
+ //
+
+ return 1;
+ }
+
+ public int run(String[] args) throws Exception {
+
+ DataStore<String,WebPage> inStore;
+ DataStore<String, TokenDatum> outStore;
+ Configuration hadoopConf = new Configuration();
+ if(args.length > 0) {
+ String dataStoreClass = args[0];
+ inStore = DataStoreFactory.getDataStore(dataStoreClass, String.class, WebPage.class, hadoopConf);
+ if(args.length > 1) {
+ dataStoreClass = args[1];
+ }
+ outStore = DataStoreFactory.getDataStore(dataStoreClass,
+ String.class, TokenDatum.class, hadoopConf);
+ } else {
+ inStore = DataStoreFactory.getDataStore(String.class, WebPage.class, hadoopConf);
+ outStore = DataStoreFactory.getDataStore(String.class, TokenDatum.class, hadoopConf);
+ }
+
+ return wordCount(inStore, outStore);
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ if (args.length < 2) {
+ System.err.println(USAGE);
+ System.exit(1);
+ }
+
+ SparkWordCount sparkWordCount = new SparkWordCount();
+
+ try {
+ int ret = sparkWordCount.run(args);
+ System.exit(ret);
+ } catch (Exception ex){
+ log.error("Error occurred!");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/e817a7f0/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
----------------------------------------------------------------------
diff --git a/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java b/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
index 12ccea4..c04f615 100644
--- a/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
+++ b/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
@@ -28,6 +28,7 @@ import org.apache.gora.examples.generated.TokenDatum;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.examples.mapreduce.QueryCounter;
import org.apache.gora.examples.mapreduce.WordCount;
+import org.apache.gora.examples.spark.SparkWordCount;
import org.apache.gora.query.Query;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.impl.DataStoreBase;
@@ -103,6 +104,36 @@ public class MapReduceTestUtils {
assertTokenCount(outStore, entry.getKey(), entry.getValue());
}
}
+
+ public static void testSparkWordCount(Configuration conf, DataStore<String,WebPage> inStore, DataStore<String,
+ TokenDatum> outStore) throws Exception {
+ //Datastore now has to be a Hadoop based datastore
+ ((DataStoreBase<String,WebPage>)inStore).setConf(conf);
+ ((DataStoreBase<String,TokenDatum>)outStore).setConf(conf);
+
+ //create input
+ WebPageDataCreator.createWebPageData(inStore);
+
+ //run Spark
+ SparkWordCount wordCount = new SparkWordCount();
+ wordCount.wordCount(inStore, outStore);
+
+ //assert results
+ HashMap<String, Integer> actualCounts = new HashMap<String, Integer>();
+ for(String content : WebPageDataCreator.CONTENTS) {
+ if (content != null) {
+ for(String token:content.split(" ")) {
+ Integer count = actualCounts.get(token);
+ if(count == null)
+ count = 0;
+ actualCounts.put(token, ++count);
+ }
+ }
+ }
+ for(Map.Entry<String, Integer> entry:actualCounts.entrySet()) {
+ assertTokenCount(outStore, entry.getKey(), entry.getValue());
+ }
+ }
private static void assertTokenCount(DataStore<String, TokenDatum> outStore,
String token, int count) throws Exception {
http://git-wip-us.apache.org/repos/asf/gora/blob/e817a7f0/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java b/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java
index b42b0c0..cb4ef03 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java
@@ -27,6 +27,7 @@ import org.apache.gora.store.DataStoreFactory;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
/**
@@ -58,4 +59,11 @@ public class TestHBaseStoreWordCount {
MapReduceTestUtils.testWordCount(cluster.getConf(), webPageStore, tokenStore);
}
+ //todo fix config
+ @Ignore
+ @Test
+ public void testSparkWordCount() throws Exception {
+ MapReduceTestUtils.testSparkWordCount(cluster.getConf(), webPageStore, tokenStore);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/gora/blob/e817a7f0/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java
----------------------------------------------------------------------
diff --git a/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java b/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java
index d6a51a4..dd76263 100644
--- a/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java
+++ b/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java
@@ -24,6 +24,7 @@ import org.apache.gora.mongodb.store.MongoStore;
import org.apache.gora.store.DataStoreFactory;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
/**
@@ -55,4 +56,12 @@ public class TestMongoStoreWordCount extends GoraMongoMapredTest {
webPageStore, tokenStore);
}
+ //todo fix config
+ @Ignore
+ @Test
+ public void testSparkWordCount() throws Exception {
+ MapReduceTestUtils.testSparkWordCount(testDriver.getConfiguration(),
+ webPageStore, tokenStore);
+ }
+
}
[08/25] gora git commit: * Writing to HBase via Spark is implemented.
Posted by le...@apache.org.
* Writing to HBase via Spark is implemented.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/c111e629
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/c111e629
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/c111e629
Branch: refs/heads/master
Commit: c111e6290fa16d2cc560eb29dbc07eb2f8b7734b
Parents: 80c0c26
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Wed Jul 15 20:23:51 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Wed Jul 15 20:23:51 2015 +0300
----------------------------------------------------------------------
.../gora/tutorial/log/LogAnalyticsSpark.java | 35 ++++++++++++++++++--
1 file changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/c111e629/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index abced3f..828939e 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -19,12 +19,16 @@ package org.apache.gora.tutorial.log;
import java.util.Map;
+import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.mapreduce.GoraOutputFormat;
+import org.apache.gora.persistency.Persistent;
import org.apache.gora.spark.GoraSparkEngine;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
@@ -135,6 +139,8 @@ public class LogAnalyticsSpark {
"Gora Integration Application").setMaster("local");
// todo _fk consider alternative architectural design
+ // todo design inStore and outStore initialization parts as like LogAnalytics.java
+ // todo consider creating job and manipulating it at input part as like LogAnalytics.java
Class[] c = new Class[1];
c[0] = Pageview.class;
sparkConf.registerKryoClasses(c);
@@ -143,10 +149,10 @@ public class LogAnalyticsSpark {
Configuration hadoopConf = new Configuration();
- DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
+ DataStore<Long, Pageview> inStore = DataStoreFactory.getDataStore(
inStoreClass, Long.class, Pageview.class, hadoopConf);
- JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, dataStore);
+ JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore);
long count = goraRDD.count();
System.out.println("Total Log Count: " + count);
@@ -162,10 +168,35 @@ public class LogAnalyticsSpark {
System.out.println("MetricDatum count:" + reducedGoraRdd.count());
+ //print screen output
Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap();
for (String key : metricDatumMap.keySet()) {
System.out.println(key);
}
+ //
+
+ //write output to datastore
+ DataStore<String, MetricDatum> outStore = DataStoreFactory.getDataStore(
+ outStoreClass, String.class, MetricDatum.class, hadoopConf);
+
+ GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
+
+ Job job = Job.getInstance(hadoopConf);
+ job.setOutputFormatClass(GoraOutputFormat.class);
+ job.setOutputKeyClass(outStore.getKeyClass());
+ job.setOutputValueClass(outStore.getPersistentClass());
+
+ job.getConfiguration().setClass(GoraOutputFormat.DATA_STORE_CLASS, outStore.getClass(),
+ DataStore.class);
+ job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_KEY_CLASS, outStore.getKeyClass(), Object.class);
+ job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_VALUE_CLASS,
+ outStore.getPersistentClass(), Persistent.class);
+
+ reducedGoraRdd.saveAsNewAPIHadoopDataset(job.getConfiguration());
+ //
+
+ inStore.close();
+ outStore.close();
return 1;
}
[05/25] gora git commit: * The map function was not collecting the counts;
this is fixed.
Posted by le...@apache.org.
* The map function was not collecting the counts; it now emits a count of 1 for each (URL, day) key.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/445edb12
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/445edb12
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/445edb12
Branch: refs/heads/master
Commit: 445edb12c35f867e7b81919ddfb509dd8c179ba3
Parents: 5644a21
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 02:26:41 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 02:26:41 2015 +0300
----------------------------------------------------------------------
.../apache/gora/tutorial/log/LogAnalyticsSpark.java | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/445edb12/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 43acba0..0ad3e57 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -40,12 +40,14 @@ public class LogAnalyticsSpark {
private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
//todo _fk consider using Kyro serialization
- private static Function<Pageview, Tuple2<String, Long>> s = new Function<Pageview, Tuple2<String, Long>>() {
+ private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> s = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> () {
@Override
- public Tuple2<String, Long> call(Pageview pageview) throws Exception {
- String key = pageview.getUrl().toString();
- Long value = getDay(pageview.getTimestamp());
- return new Tuple2<>(key, value);
+ public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview) throws Exception {
+ String url = pageview.getUrl().toString();
+ Long day = getDay(pageview.getTimestamp());
+ Tuple2<String, Long> keyTuple =new Tuple2<>(url, day);
+
+ return new Tuple2<>(keyTuple, 1L);
}
};
@@ -100,7 +102,7 @@ public class LogAnalyticsSpark {
String firstOneURL = goraRDD.first()._2().getUrl().toString();
System.out.println(firstOneURL);
- JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(s);
+ JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD.values().map(s);
return 1;
}
[09/25] gora git commit: * Added dependency to write output to Solr.
Posted by le...@apache.org.
* Added dependency to write output to Solr.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/cf6e7658
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/cf6e7658
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/cf6e7658
Branch: refs/heads/master
Commit: cf6e76585b18ff926f8e7e4928f2d7cd28ac7c6f
Parents: c111e62
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Thu Jul 16 19:38:37 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Thu Jul 16 19:38:37 2015 +0300
----------------------------------------------------------------------
gora-tutorial/pom.xml | 5 +++++
.../java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java | 6 +++---
2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/cf6e7658/gora-tutorial/pom.xml
----------------------------------------------------------------------
diff --git a/gora-tutorial/pom.xml b/gora-tutorial/pom.xml
index 1a13083..5a775b3 100644
--- a/gora-tutorial/pom.xml
+++ b/gora-tutorial/pom.xml
@@ -108,6 +108,11 @@
<artifactId>gora-cassandra</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora-solr</artifactId>
+ </dependency>
+
<!--dependency> <groupId>org.apache.gora</groupId> <artifactId>gora-sql</artifactId>
</dependency> <dependency> <groupId>org.apache.gora</groupId> <artifactId>gora-solr</artifactId>
</dependency -->
http://git-wip-us.apache.org/repos/asf/gora/blob/cf6e7658/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 828939e..372c124 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -17,8 +17,6 @@
*/
package org.apache.gora.tutorial.log;
-import java.util.Map;
-
import org.apache.gora.mapreduce.GoraMapReduceUtils;
import org.apache.gora.mapreduce.GoraOutputFormat;
import org.apache.gora.persistency.Persistent;
@@ -158,7 +156,7 @@ public class LogAnalyticsSpark {
System.out.println("Total Log Count: " + count);
String firstOneURL = goraRDD.first()._2().getUrl().toString();
- System.out.println(firstOneURL);
+ System.out.println("First entry's first URL:" + firstOneURL);
JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD
.values().map(mapFunc);
@@ -169,10 +167,12 @@ public class LogAnalyticsSpark {
System.out.println("MetricDatum count:" + reducedGoraRdd.count());
//print screen output
+ /*
Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap();
for (String key : metricDatumMap.keySet()) {
System.out.println(key);
}
+ */
//
//write output to datastore
[25/25] gora git commit: GORA-386 Gora Spark Backend Support - Final
update of LogAnalyticsSpark to comply with forbidden-api plugin
Posted by le...@apache.org.
GORA-386 Gora Spark Backend Support - Final update of LogAnalyticsSpark to comply with the forbidden-apis plugin
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ea44388f
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ea44388f
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ea44388f
Branch: refs/heads/master
Commit: ea44388f9883218f7314ed4bec51e3f2115cf33f
Parents: 7c6a63e
Author: Lewis John McGibbney <le...@jpl.nasa.gov>
Authored: Thu Sep 3 00:29:30 2015 -0700
Committer: Lewis John McGibbney <le...@jpl.nasa.gov>
Committed: Thu Sep 3 00:29:30 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java | 6 +++---
2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/ea44388f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86d78b2..3f7e94d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@
Current Development
+* GORA-386 Gora Spark Backend Support (Furkan KAMACI, talat, lewismc)
+
* GORA-429 Implement Maven forbidden-apis plugin in Gora (lewismc, rmarroquin)
* GORA-228 java.util.ConcurrentModificationException when using MemStore for concurrent tests (Yasin Kılınç, cihad güzel, lewismc)
http://git-wip-us.apache.org/repos/asf/gora/blob/ea44388f/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 84bb693..cdd245c 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -107,7 +107,7 @@ public class LogAnalyticsSpark {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
- System.err.println(USAGE);
+ log.error(USAGE);
System.exit(1);
}
@@ -166,7 +166,7 @@ public class LogAnalyticsSpark {
JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore);
long count = goraRDD.count();
- System.out.println("Total Log Count: " + count);
+ log.info("Total Log Count: {}", count);
JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD
.values().map(mapFunc);
@@ -174,7 +174,7 @@ public class LogAnalyticsSpark {
JavaPairRDD<String, MetricDatum> reducedGoraRdd = JavaPairRDD
.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc).mapToPair(metricFunc);
- System.out.println("MetricDatum count:" + reducedGoraRdd.count());
+ log.info("MetricDatum count: {}", reducedGoraRdd.count());
//Print output for debug purpose
/*
[18/25] gora git commit: Spark version should be placed at parent pom
and it's done.
Posted by le...@apache.org.
The Spark version is now defined in the parent pom.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/888c899f
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/888c899f
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/888c899f
Branch: refs/heads/master
Commit: 888c899f966c27dec1fb173611812f8574737da1
Parents: 8f1acc6
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Thu Aug 27 23:46:34 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Thu Aug 27 23:46:34 2015 +0300
----------------------------------------------------------------------
gora-core/pom.xml | 2 +-
pom.xml | 1 +
2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/888c899f/gora-core/pom.xml
----------------------------------------------------------------------
diff --git a/gora-core/pom.xml b/gora-core/pom.xml
index 5f147fb..55312d1 100644
--- a/gora-core/pom.xml
+++ b/gora-core/pom.xml
@@ -145,7 +145,7 @@
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
- <version>1.3.1</version>
+ <version>${spark.version}</version>
</dependency>
<!-- Logging Dependencies -->
http://git-wip-us.apache.org/repos/asf/gora/blob/888c899f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a6261ca..307fa1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -644,6 +644,7 @@
<commons-io.version>1.3.2</commons-io.version>
<restlet.version>2.3.1</restlet.version>
+ <spark.version>1.3.1</spark.version>
<!-- Misc Dependencies -->
<guava.version>13.0</guava.version>
<commons-lang.version>2.6</commons-lang.version>
[14/25] gora git commit: Unnecessary imports are removed from
LogAnalyticsSpark.java
Posted by le...@apache.org.
Unnecessary imports are removed from LogAnalyticsSpark.java
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/8893fd5c
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/8893fd5c
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/8893fd5c
Branch: refs/heads/master
Commit: 8893fd5c6c53b19925c99668d24df23c52e70b1e
Parents: 92b71a6
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Fri Aug 21 15:58:21 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Fri Aug 21 15:58:21 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java | 4 ----
1 file changed, 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/8893fd5c/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 8c37f8c..59e7c79 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -17,16 +17,12 @@
*/
package org.apache.gora.tutorial.log;
-import org.apache.gora.mapreduce.GoraMapReduceUtils;
-import org.apache.gora.mapreduce.GoraOutputFormat;
-import org.apache.gora.persistency.Persistent;
import org.apache.gora.spark.GoraSparkEngine;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
[23/25] gora git commit: GORA-386 Gora Spark Backend Support addition
to make forbidden api compliant
Posted by le...@apache.org.
GORA-386 Gora Spark Backend Support: changes to make the code forbidden-apis compliant
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/2ce2fda5
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/2ce2fda5
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/2ce2fda5
Branch: refs/heads/master
Commit: 2ce2fda5b7f688f1c5fa9e10f7c8ff64ce99be12
Parents: 965c449
Author: Lewis John McGibbney <le...@jpl.nasa.gov>
Authored: Tue Sep 1 15:26:59 2015 -0700
Committer: Lewis John McGibbney <le...@jpl.nasa.gov>
Committed: Tue Sep 1 15:26:59 2015 -0700
----------------------------------------------------------------------
.../apache/gora/examples/spark/SparkWordCount.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/2ce2fda5/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
----------------------------------------------------------------------
diff --git a/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
index 0a84d58..4f91fb2 100644
--- a/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
+++ b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
@@ -17,6 +17,8 @@
*/
package org.apache.gora.examples.spark;
+import java.nio.charset.Charset;
+
import org.apache.gora.examples.generated.TokenDatum;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.spark.GoraSparkEngine;
@@ -52,7 +54,7 @@ public class SparkWordCount {
@Override
public Tuple2<String, Long> call(WebPage webPage)
throws Exception {
- String content = new String(webPage.getContent().array());
+ String content = new String(webPage.getContent().array(), Charset.defaultCharset());
return new Tuple2<>(content, 1L);
}
};
@@ -86,20 +88,20 @@ public class SparkWordCount {
JavaPairRDD<String, WebPage> goraRDD = goraSparkEngine.initialize(sc, inStore);
long count = goraRDD.count();
- System.out.println("Total Web page count: " + count);
+ log.info("Total Web page count: {}", count);
JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(mapFunc);
JavaPairRDD<String, Long> reducedGoraRdd = JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc);
//Print output for debug purpose
- System.out.println("SparkWordCount debug purpose TokenDatum print starts:");
+ log.info("SparkWordCount debug purpose TokenDatum print starts:");
Map<String, Long> tokenDatumMap = reducedGoraRdd.collectAsMap();
for (String key : tokenDatumMap.keySet()) {
- System.out.println(key);
- System.out.println(tokenDatumMap.get(key));
+ log.info(key);
+ log.info(tokenDatumMap.get(key).toString());
}
- System.out.println("SparkWordCount debug purpose TokenDatum print ends:");
+ log.info("SparkWordCount debug purpose TokenDatum print ends:");
//
//write output to datastore
@@ -134,7 +136,7 @@ public class SparkWordCount {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
- System.err.println(USAGE);
+ log.info(USAGE);
System.exit(1);
}
[17/25] gora git commit: Minor change at SparkWordCount.java
Posted by le...@apache.org.
Minor change in SparkWordCount.java
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/8f1acc6d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/8f1acc6d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/8f1acc6d
Branch: refs/heads/master
Commit: 8f1acc6d4ef6c192e8fc06287558b7bc7c39b040
Parents: e817a7f
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sat Aug 22 22:06:42 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sat Aug 22 22:06:42 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/gora/examples/spark/SparkWordCount.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/8f1acc6d/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
----------------------------------------------------------------------
diff --git a/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
index 837da13..0a84d58 100644
--- a/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
+++ b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
@@ -86,7 +86,7 @@ public class SparkWordCount {
JavaPairRDD<String, WebPage> goraRDD = goraSparkEngine.initialize(sc, inStore);
long count = goraRDD.count();
- System.out.println("Total Log Count: " + count);
+ System.out.println("Total Web page count: " + count);
JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(mapFunc);
[07/25] gora git commit: Organizing codes Documentation
Posted by le...@apache.org.
Organizing code
Documentation
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/80c0c26d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/80c0c26d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/80c0c26d
Branch: refs/heads/master
Commit: 80c0c26d8fb6a9a84ea39f5aa96cc343b4546266
Parents: 81af4d3
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 19:26:28 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 19:26:28 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/gora/spark/GoraSpark.java | 81 -----------------
.../org/apache/gora/spark/GoraSparkEngine.java | 96 ++++++++++++++++++++
.../gora/tutorial/log/LogAnalyticsSpark.java | 40 +++++++-
3 files changed, 131 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/80c0c26d/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
deleted file mode 100644
index 02b5b39..0000000
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gora.spark;
-
-import java.io.IOException;
-
-import org.apache.gora.mapreduce.GoraInputFormat;
-import org.apache.gora.mapreduce.GoraMapReduceUtils;
-import org.apache.gora.persistency.Persistent;
-import org.apache.gora.store.DataStore;
-import org.apache.gora.util.IOUtils;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-/**
- * Base class for Spark integration
- */
-public class GoraSpark<K, V extends Persistent> {
- Class<K> clazzK;
- Class<V> clazzV;
-
- public GoraSpark(Class<K> clazzK, Class<V> clazzV) {
- this.clazzK = clazzK;
- this.clazzV = clazzV;
- }
-
- public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
- Configuration conf, DataStore<K, V> dataStore) {
- GoraMapReduceUtils.setIOSerializations(conf, true);
-
- try {
- IOUtils
- .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY);
- } catch (IOException ioex) {
- throw new RuntimeException(ioex.getMessage());
- }
-
- return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK,
- clazzV);
- }
-
- public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
- DataStore<K, V> dataStore) {
- Configuration hadoopConf;
-
- if ((dataStore instanceof Configurable) && ((Configurable) dataStore).getConf() != null) {
- hadoopConf = ((Configurable) dataStore).getConf();
- } else {
- hadoopConf = new Configuration();
- }
-
- GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
-
- try {
- IOUtils.storeToConf(dataStore.newQuery(), hadoopConf,
- GoraInputFormat.QUERY_KEY);
- } catch (IOException ioex) {
- throw new RuntimeException(ioex.getMessage());
- }
-
- return sparkContext.newAPIHadoopRDD(hadoopConf, GoraInputFormat.class,
- clazzK, clazzV);
- }
-}
http://git-wip-us.apache.org/repos/asf/gora/blob/80c0c26d/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
new file mode 100644
index 0000000..ced44be
--- /dev/null
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.spark;
+
+import java.io.IOException;
+
+import org.apache.gora.mapreduce.GoraInputFormat;
+import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Base class for Gora - Spark integration: builds Spark RDDs backed by a Gora {@link DataStore}.
+ */
+public class GoraSparkEngine<K, V extends Persistent> {
+ Class<K> clazzK; // key class handed to newAPIHadoopRDD
+ Class<V> clazzV; // persistent value class handed to newAPIHadoopRDD
+
+ public GoraSparkEngine(Class<K> clazzK, Class<V> clazzV) {
+ this.clazzK = clazzK;
+ this.clazzV = clazzV;
+ }
+
+ /**
+ * Initializes a {@link JavaPairRDD} from the given Spark context, Hadoop
+ * configuration and data store. {@code conf} is modified in place.
+ *
+ * @param sparkContext
+ * Spark context
+ * @param conf
+ * Hadoop configuration; receives IO serializations and a new store query
+ * @param dataStore
+ * Data store
+ * @return initialized rdd
+ */
+ public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+ Configuration conf, DataStore<K, V> dataStore) {
+ GoraMapReduceUtils.setIOSerializations(conf, true);
+
+ try {
+ IOUtils
+ .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY);
+ } catch (IOException ioex) {
+ throw new RuntimeException(ioex.getMessage()); // NOTE(review): drops the original IOException cause
+ }
+
+ return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK,
+ clazzV);
+ }
+
+ /**
+ * Initializes a {@link JavaPairRDD} from the given Spark context and data store.
+ * If the data store is {@link Configurable} and already has a configuration,
+ * that configuration is reused (and modified in place); otherwise a new
+ * Hadoop configuration is created.
+ *
+ * @param sparkContext
+ * Spark context
+ * @param dataStore
+ * Data store
+ * @return initialized rdd
+ */
+ public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+ DataStore<K, V> dataStore) {
+ Configuration hadoopConf;
+
+ if ((dataStore instanceof Configurable)
+ && ((Configurable) dataStore).getConf() != null) {
+ hadoopConf = ((Configurable) dataStore).getConf(); // the store's own conf object; initialize(...) writes into it
+ } else {
+ hadoopConf = new Configuration();
+ }
+
+ return initialize(sparkContext, hadoopConf, dataStore);
+ }
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/80c0c26d/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index d69649e..abced3f 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -17,7 +17,9 @@
*/
package org.apache.gora.tutorial.log;
-import org.apache.gora.spark.GoraSpark;
+import java.util.Map;
+
+import org.apache.gora.spark.GoraSparkEngine;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.MetricDatum;
@@ -33,6 +35,21 @@ import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
+/**
+ * LogAnalyticsSpark is the tutorial class to illustrate Gora Spark API. The
+ * Spark job reads the web access data stored earlier by the {@link LogManager},
+ * and calculates the aggregate daily pageviews. The output of the job is stored
+ * in a Gora compatible data store.
+ *
+ * This class illustrates the same functionality with {@link LogAnalytics} via
+ * Spark.
+ *
+ * <p>
+ * See the tutorial.html file in docs or go to the <a
+ * href="http://incubator.apache.org/gora/docs/current/tutorial.html"> web
+ * site</a> for more information.
+ * </p>
+ */
public class LogAnalyticsSpark {
private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
@@ -41,6 +58,9 @@ public class LogAnalyticsSpark {
private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
// todo _fk consider using Kyro serialization
+ /**
+ * map function used in calculation
+ */
private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>>() {
@Override
public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview)
@@ -53,6 +73,9 @@ public class LogAnalyticsSpark {
}
};
+ /**
+ * reduce function used in calculation
+ */
private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) throws Exception {
@@ -60,6 +83,9 @@ public class LogAnalyticsSpark {
}
};
+ /**
+ * metric function used after map phase
+ */
private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc = new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum>() {
@Override
public Tuple2<String, MetricDatum> call(
@@ -102,7 +128,7 @@ public class LogAnalyticsSpark {
}
public int run(String inStoreClass, String outStoreClass) throws Exception {
- GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class,
+ GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class,
Pageview.class);
SparkConf sparkConf = new SparkConf().setAppName(
@@ -120,11 +146,10 @@ public class LogAnalyticsSpark {
DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
inStoreClass, Long.class, Pageview.class, hadoopConf);
- JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initialize(sc,
- dataStore);
+ JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, dataStore);
long count = goraRDD.count();
- System.out.println("Total Count: " + count);
+ System.out.println("Total Log Count: " + count);
String firstOneURL = goraRDD.first()._2().getUrl().toString();
System.out.println(firstOneURL);
@@ -137,6 +162,11 @@ public class LogAnalyticsSpark {
System.out.println("MetricDatum count:" + reducedGoraRdd.count());
+ Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap();
+ for (String key : metricDatumMap.keySet()) {
+ System.out.println(key);
+ }
+
return 1;
}
}
[19/25] gora git commit: servlet dependency is excluded from spark,
not ant. Without excluding it throws a java security exception.
Motivation for it: https://issues.apache.org/jira/browse/SPARK-1693 and
Posted by le...@apache.org.
The servlet dependency is now excluded from Spark instead of Ant; without this exclusion a Java SecurityException is thrown. Motivation: https://issues.apache.org/jira/browse/SPARK-1693
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ad4f5601
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ad4f5601
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ad4f5601
Branch: refs/heads/master
Commit: ad4f5601ebd52e182bf79ba4032dcb85d918b925
Parents: 888c899
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Fri Aug 28 00:15:42 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Fri Aug 28 00:15:42 2015 +0300
----------------------------------------------------------------------
gora-core/pom.xml | 6 ++++++
pom.xml | 4 ----
2 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/ad4f5601/gora-core/pom.xml
----------------------------------------------------------------------
diff --git a/gora-core/pom.xml b/gora-core/pom.xml
index 55312d1..16fe808 100644
--- a/gora-core/pom.xml
+++ b/gora-core/pom.xml
@@ -146,6 +146,12 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.eclipse.jetty.orbit</groupId>
+ <artifactId>javax.servlet</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- Logging Dependencies -->
http://git-wip-us.apache.org/repos/asf/gora/blob/ad4f5601/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 307fa1d..cb55db4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -894,10 +894,6 @@
<groupId>ant</groupId>
<artifactId>ant</artifactId>
</exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
</exclusions>
</dependency>
[12/25] gora git commit: GoraSparkEngine.java setOutput method
renamed to generateOutputConf. Added a new method to GoraSparkEngine.java
which creates a job and returns necessary conf for Spark to use.
Posted by le...@apache.org.
GoraSparkEngine.java setOutput method renamed to generateOutputConf.
Added a new method to GoraSparkEngine.java that creates a job and returns the configuration Spark needs.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/85849111
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/85849111
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/85849111
Branch: refs/heads/master
Commit: 8584911176712f63e316707ace23a12060588cdc
Parents: 62be0c3
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Aug 17 23:25:15 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Aug 17 23:25:15 2015 +0300
----------------------------------------------------------------------
.../org/apache/gora/spark/GoraSparkEngine.java | 28 +++++++++++++++-----
.../gora/tutorial/log/LogAnalyticsSpark.java | 5 +---
2 files changed, 23 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/85849111/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
index 9c64542..7a819fd 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
@@ -97,19 +97,35 @@ public class GoraSparkEngine<K, V extends Persistent> {
}
/**
- * Sets the output parameters for the job
+ * Creates a job and sets the output parameters for the conf that Spark will use
+ * @param dataStore the datastore as the output
+ * @param reuseObjects whether to reuse objects in serialization
+ */
+ public <K, V extends Persistent> Configuration generateOutputConf(DataStore<K, V> dataStore,
+ boolean reuseObjects) throws IOException {
+
+ Configuration hadoopConf = new Configuration();
+ GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
+ Job job = Job.getInstance(hadoopConf);
+
+ return generateOutputConf(job, dataStore.getClass(), dataStore.getKeyClass(),
+ dataStore.getPersistentClass(), reuseObjects);
+ }
+
+ /**
+ * Sets the output parameters for the conf that Spark will use
* @param job the job to set the properties for
* @param dataStore the datastore as the output
* @param reuseObjects whether to reuse objects in serialization
*/
- public <K, V extends Persistent> Configuration setOutput(Job job,
+ public <K, V extends Persistent> Configuration generateOutputConf(Job job,
DataStore<K, V> dataStore, boolean reuseObjects) {
- return setOutput(job, dataStore.getClass(), dataStore.getKeyClass(),
- dataStore.getPersistentClass(), reuseObjects);
+ return generateOutputConf(job, dataStore.getClass(), dataStore.getKeyClass(),
+ dataStore.getPersistentClass(), reuseObjects);
}
/**
- * Sets the output parameters for the job
+ * Sets the output parameters for the conf that Spark will use
*
* @param job the job to set the properties for
* @param dataStoreClass the datastore class
@@ -118,7 +134,7 @@ public class GoraSparkEngine<K, V extends Persistent> {
* @param reuseObjects whether to reuse objects in serialization
*/
@SuppressWarnings("rawtypes")
- public <K, V extends Persistent> Configuration setOutput(Job job,
+ public <K, V extends Persistent> Configuration generateOutputConf(Job job,
Class<? extends DataStore> dataStoreClass,
Class<K> keyClass, Class<V> persistentClass,
boolean reuseObjects) {
http://git-wip-us.apache.org/repos/asf/gora/blob/85849111/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index d441de7..c66f7e4 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -190,10 +190,7 @@ public class LogAnalyticsSpark {
//
//write output to datastore
- GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
- Job job = Job.getInstance(hadoopConf);
-
- Configuration sparkHadoopConf = goraSparkEngine.setOutput(job, outStore, true);
+ Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore, true);
reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
//
[11/25] gora git commit: * GoraSparkEngine.java architecture is
improved.
Posted by le...@apache.org.
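* Improved the GoraSparkEngine.java architecture: output configuration moved from LogAnalyticsSpark.java into a new GoraSparkEngine.setOutput method.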
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/62be0c31
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/62be0c31
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/62be0c31
Branch: refs/heads/master
Commit: 62be0c312927e3ea4962eb966689b49dc1fcebce
Parents: 2195570
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Aug 17 22:25:12 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Aug 17 22:25:12 2015 +0300
----------------------------------------------------------------------
.../org/apache/gora/spark/GoraSparkEngine.java | 41 ++++++++++++++++++++
.../gora/tutorial/log/LogAnalyticsSpark.java | 19 ++-------
2 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/62be0c31/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
index ced44be..9c64542 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
@@ -21,11 +21,13 @@ import java.io.IOException;
import org.apache.gora.mapreduce.GoraInputFormat;
import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.mapreduce.GoraOutputFormat;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.store.DataStore;
import org.apache.gora.util.IOUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -93,4 +95,43 @@ public class GoraSparkEngine<K, V extends Persistent> {
return initialize(sparkContext, hadoopConf, dataStore);
}
+
+ /**
+ * Sets the output parameters for the job
+ * @param job the job to set the properties for
+ * @param dataStore the datastore as the output
+ * @param reuseObjects whether to reuse objects in serialization
+ */
+ public <K, V extends Persistent> Configuration setOutput(Job job,
+ DataStore<K, V> dataStore, boolean reuseObjects) {
+ return setOutput(job, dataStore.getClass(), dataStore.getKeyClass(),
+ dataStore.getPersistentClass(), reuseObjects);
+ }
+
+ /**
+ * Sets the output parameters for the job
+ *
+ * @param job the job to set the properties for
+ * @param dataStoreClass the datastore class
+ * @param keyClass output key class
+ * @param persistentClass output value class
+ * @param reuseObjects whether to reuse objects in serialization
+ */
+ @SuppressWarnings("rawtypes")
+ public <K, V extends Persistent> Configuration setOutput(Job job,
+ Class<? extends DataStore> dataStoreClass,
+ Class<K> keyClass, Class<V> persistentClass,
+ boolean reuseObjects) {
+
+ job.setOutputFormatClass(GoraOutputFormat.class);
+ job.setOutputKeyClass(keyClass);
+ job.setOutputValueClass(persistentClass);
+
+ job.getConfiguration().setClass(GoraOutputFormat.DATA_STORE_CLASS, dataStoreClass,
+ DataStore.class);
+ job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_KEY_CLASS, keyClass, Object.class);
+ job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_VALUE_CLASS,
+ persistentClass, Persistent.class);
+ return job.getConfiguration();
+ }
}
http://git-wip-us.apache.org/repos/asf/gora/blob/62be0c31/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 327bf7f..d441de7 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -63,7 +63,6 @@ public class LogAnalyticsSpark {
/** The number of milliseconds in a day */
private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
- // todo _fk consider using Kyro serialization
/**
* map function used in calculation
*/
@@ -173,9 +172,6 @@ public class LogAnalyticsSpark {
long count = goraRDD.count();
System.out.println("Total Log Count: " + count);
- String firstOneURL = goraRDD.first()._2().getUrl().toString();
- System.out.println("First entry's first URL:" + firstOneURL);
-
JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD
.values().map(mapFunc);
@@ -184,7 +180,7 @@ public class LogAnalyticsSpark {
System.out.println("MetricDatum count:" + reducedGoraRdd.count());
- //print screen output
+ //Print output for debug purpose
/*
Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap();
for (String key : metricDatumMap.keySet()) {
@@ -195,19 +191,10 @@ public class LogAnalyticsSpark {
//write output to datastore
GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
-
Job job = Job.getInstance(hadoopConf);
- job.setOutputFormatClass(GoraOutputFormat.class);
- job.setOutputKeyClass(outStore.getKeyClass());
- job.setOutputValueClass(outStore.getPersistentClass());
-
- job.getConfiguration().setClass(GoraOutputFormat.DATA_STORE_CLASS, outStore.getClass(),
- DataStore.class);
- job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_KEY_CLASS, outStore.getKeyClass(), Object.class);
- job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_VALUE_CLASS,
- outStore.getPersistentClass(), Persistent.class);
- reducedGoraRdd.saveAsNewAPIHadoopDataset(job.getConfiguration());
+ Configuration sparkHadoopConf = goraSparkEngine.setOutput(job, outStore, true);
+ reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
//
inStore.close();
[15/25] gora git commit: Minor improvements for LogAnalyticsSpark.java
Posted by le...@apache.org.
Minor improvements to LogAnalyticsSpark.java
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/9cff3359
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/9cff3359
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/9cff3359
Branch: refs/heads/master
Commit: 9cff33598231e9cf54060cecf1ccacdf62e64c5b
Parents: 8893fd5
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sat Aug 22 16:46:55 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sat Aug 22 16:46:55 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/9cff3359/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 59e7c79..84bb693 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -155,10 +155,10 @@ public class LogAnalyticsSpark {
Pageview.class);
SparkConf sparkConf = new SparkConf().setAppName(
- "Gora Integration Application").setMaster("local");
+ "Gora Spark Integration Application").setMaster("local");
Class[] c = new Class[1];
- c[0] = Pageview.class;
+ c[0] = inStore.getPersistentClass();
sparkConf.registerKryoClasses(c);
//
JavaSparkContext sc = new JavaSparkContext(sparkConf);
[22/25] gora git commit: Merge branch 'master' of
https://github.com/kamaci/gora into GORA-386
Posted by le...@apache.org.
Merge branch 'master' of https://github.com/kamaci/gora into GORA-386
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/965c4499
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/965c4499
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/965c4499
Branch: refs/heads/master
Commit: 965c44999d0afde2b6379b035c568d8539b0c384
Parents: a303ff7 3cf5502
Author: Lewis John McGibbney <le...@jpl.nasa.gov>
Authored: Tue Sep 1 13:49:50 2015 -0700
Committer: Lewis John McGibbney <le...@jpl.nasa.gov>
Committed: Tue Sep 1 13:49:50 2015 -0700
----------------------------------------------------------------------
bin/gora | 5 +
gora-core/pom.xml | 13 ++
.../gora/examples/spark/SparkWordCount.java | 150 ++++++++++++++
.../org/apache/gora/spark/GoraSparkEngine.java | 150 ++++++++++++++
.../gora/mapreduce/MapReduceTestUtils.java | 31 +++
.../mapreduce/TestHBaseStoreWordCount.java | 8 +
.../mapreduce/TestMongoStoreWordCount.java | 9 +
gora-tutorial/pom.xml | 5 +
.../gora/tutorial/log/LogAnalyticsSpark.java | 200 +++++++++++++++++++
pom.xml | 1 +
10 files changed, 572 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/965c4499/gora-core/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/965c4499/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index e8ed87d,2b5821c..8197371
--- a/pom.xml
+++ b/pom.xml
@@@ -669,9 -641,10 +669,10 @@@
<jetty.version>8.1.8.v20121106</jetty.version>
<tika.version>1.7</tika.version>
<httpcomponents.version>4.3.1</httpcomponents.version>
- <commons-io.version>1.3.2</commons-io.version>
+ <commons-io.version>2.4</commons-io.version>
<restlet.version>2.3.1</restlet.version>
+ <spark.version>1.4.1</spark.version>
<!-- Misc Dependencies -->
<guava.version>13.0</guava.version>
<commons-lang.version>2.6</commons-lang.version>
[10/25] gora git commit: Code is organized at LogAnalyticsSpark.java.
Posted by le...@apache.org.
Reorganized the code in LogAnalyticsSpark.java.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/21955700
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/21955700
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/21955700
Branch: refs/heads/master
Commit: 219557002f0e33ef6d0e2bb49471d24fb867b0ac
Parents: cf6e765
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Aug 17 21:21:43 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Aug 17 21:21:43 2015 +0300
----------------------------------------------------------------------
.../gora/tutorial/log/LogAnalyticsSpark.java | 51 +++++++++++++-------
1 file changed, 34 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/21955700/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 372c124..327bf7f 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -35,6 +35,8 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -54,6 +56,8 @@ import scala.Tuple2;
*/
public class LogAnalyticsSpark {
+ private static final Logger log = LoggerFactory.getLogger(LogAnalyticsSpark.class);
+
private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
/** The number of milliseconds in a day */
@@ -112,13 +116,15 @@ public class LogAnalyticsSpark {
System.exit(1);
}
- String inStoreClass = args[0];
- String outStoreClass = args[1];
-
LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
- int ret = logAnalyticsSpark.run(inStoreClass, outStoreClass);
- System.exit(ret);
+ try {
+ int ret = logAnalyticsSpark.run(args);
+ System.exit(ret);
+ } catch (Exception ex){
+ log.error("Error occurred!");
+ }
+
}
/**
@@ -129,27 +135,39 @@ public class LogAnalyticsSpark {
return (timeStamp / DAY_MILIS) * DAY_MILIS;
}
- public int run(String inStoreClass, String outStoreClass) throws Exception {
+ public int run(String[] args) throws Exception {
+
+ DataStore<Long, Pageview> inStore;
+ DataStore<String, MetricDatum> outStore;
+ Configuration hadoopConf = new Configuration();
+
+ if (args.length > 0) {
+ String dataStoreClass = args[0];
+ inStore = DataStoreFactory.getDataStore(
+ dataStoreClass, Long.class, Pageview.class, hadoopConf);
+ if (args.length > 1) {
+ dataStoreClass = args[1];
+ }
+ outStore = DataStoreFactory.getDataStore(
+ dataStoreClass, String.class, MetricDatum.class, hadoopConf);
+ } else {
+ inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, hadoopConf);
+ outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, hadoopConf);
+ }
+
+ //Spark engine initialization
GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class,
Pageview.class);
SparkConf sparkConf = new SparkConf().setAppName(
"Gora Integration Application").setMaster("local");
- // todo _fk consider alternative architectural design
- // todo design inStore and outStore initialization parts as like LogAnalytics.java
- // todo consider creating job and manipulating it at input part as like LogAnalytics.java
Class[] c = new Class[1];
c[0] = Pageview.class;
sparkConf.registerKryoClasses(c);
//
JavaSparkContext sc = new JavaSparkContext(sparkConf);
- Configuration hadoopConf = new Configuration();
-
- DataStore<Long, Pageview> inStore = DataStoreFactory.getDataStore(
- inStoreClass, Long.class, Pageview.class, hadoopConf);
-
JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore);
long count = goraRDD.count();
@@ -176,9 +194,6 @@ public class LogAnalyticsSpark {
//
//write output to datastore
- DataStore<String, MetricDatum> outStore = DataStoreFactory.getDataStore(
- outStoreClass, String.class, MetricDatum.class, hadoopConf);
-
GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
Job job = Job.getInstance(hadoopConf);
@@ -198,6 +213,8 @@ public class LogAnalyticsSpark {
inStore.close();
outStore.close();
+ log.info("Log completed with success");
+
return 1;
}
}
[24/25] gora git commit: Merge branch 'GORA-386'
Posted by le...@apache.org.
Merge branch 'GORA-386'
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/7c6a63e0
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/7c6a63e0
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/7c6a63e0
Branch: refs/heads/master
Commit: 7c6a63e0dfbbc223353bab64cc6218b03bbff526
Parents: 1302573 2ce2fda
Author: Lewis John McGibbney <le...@jpl.nasa.gov>
Authored: Tue Sep 1 15:27:17 2015 -0700
Committer: Lewis John McGibbney <le...@jpl.nasa.gov>
Committed: Tue Sep 1 15:27:17 2015 -0700
----------------------------------------------------------------------
bin/gora | 5 +
gora-core/pom.xml | 13 ++
.../gora/examples/spark/SparkWordCount.java | 152 ++++++++++++++
.../org/apache/gora/spark/GoraSparkEngine.java | 150 ++++++++++++++
.../gora/mapreduce/MapReduceTestUtils.java | 31 +++
.../mapreduce/TestHBaseStoreWordCount.java | 8 +
.../mapreduce/TestMongoStoreWordCount.java | 9 +
gora-tutorial/pom.xml | 5 +
.../gora/tutorial/log/LogAnalyticsSpark.java | 200 +++++++++++++++++++
pom.xml | 1 +
10 files changed, 574 insertions(+)
----------------------------------------------------------------------
[06/25] gora git commit: * GoraSpark.java initializeInput method
renamed to initialize * reduce part is added to example.
Posted by le...@apache.org.
* Renamed the GoraSpark.java initializeInput method to initialize.
* Added a reduce step to the example.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/81af4d3a
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/81af4d3a
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/81af4d3a
Branch: refs/heads/master
Commit: 81af4d3afcba4633d0c5d06ead9b4256ea60862f
Parents: 445edb1
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 18:33:38 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 18:33:38 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/gora/spark/GoraSpark.java | 4 +-
.../gora/tutorial/log/LogAnalyticsSpark.java | 49 ++++++++++++++++----
2 files changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/81af4d3a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
index 4279cfb..02b5b39 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
@@ -41,7 +41,7 @@ public class GoraSpark<K, V extends Persistent> {
this.clazzV = clazzV;
}
- public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
+ public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
Configuration conf, DataStore<K, V> dataStore) {
GoraMapReduceUtils.setIOSerializations(conf, true);
@@ -56,7 +56,7 @@ public class GoraSpark<K, V extends Persistent> {
clazzV);
}
- public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
+ public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
DataStore<K, V> dataStore) {
Configuration hadoopConf;
http://git-wip-us.apache.org/repos/asf/gora/blob/81af4d3a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 0ad3e57..d69649e 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -20,6 +20,7 @@ package org.apache.gora.tutorial.log;
import org.apache.gora.spark.GoraSpark;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
@@ -27,11 +28,11 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
-import java.util.concurrent.TimeUnit;
-
public class LogAnalyticsSpark {
private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
@@ -39,18 +40,44 @@ public class LogAnalyticsSpark {
/** The number of milliseconds in a day */
private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
- //todo _fk consider using Kyro serialization
- private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> s = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> () {
+ // todo _fk consider using Kyro serialization
+ private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>>() {
@Override
- public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview) throws Exception {
+ public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview)
+ throws Exception {
String url = pageview.getUrl().toString();
Long day = getDay(pageview.getTimestamp());
- Tuple2<String, Long> keyTuple =new Tuple2<>(url, day);
+ Tuple2<String, Long> keyTuple = new Tuple2<>(url, day);
return new Tuple2<>(keyTuple, 1L);
}
};
+ private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() {
+ @Override
+ public Long call(Long aLong, Long aLong2) throws Exception {
+ return aLong + aLong2;
+ }
+ };
+
+ private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc = new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum>() {
+ @Override
+ public Tuple2<String, MetricDatum> call(
+ Tuple2<Tuple2<String, Long>, Long> tuple2LongTuple2) throws Exception {
+ String dimension = tuple2LongTuple2._1()._1();
+ long timestamp = tuple2LongTuple2._1()._2();
+
+ MetricDatum metricDatum = new MetricDatum();
+ metricDatum.setMetricDimension(dimension);
+ metricDatum.setTimestamp(timestamp);
+
+ String key = metricDatum.getMetricDimension().toString();
+ key += "_" + Long.toString(timestamp);
+ metricDatum.setMetric(tuple2LongTuple2._2());
+ return new Tuple2<>(key, metricDatum);
+ }
+ };
+
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println(USAGE);
@@ -93,7 +120,7 @@ public class LogAnalyticsSpark {
DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
inStoreClass, Long.class, Pageview.class, hadoopConf);
- JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc,
+ JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initialize(sc,
dataStore);
long count = goraRDD.count();
@@ -102,7 +129,13 @@ public class LogAnalyticsSpark {
String firstOneURL = goraRDD.first()._2().getUrl().toString();
System.out.println(firstOneURL);
- JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD.values().map(s);
+ JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD
+ .values().map(mapFunc);
+
+ JavaPairRDD<String, MetricDatum> reducedGoraRdd = JavaPairRDD
+ .fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc).mapToPair(metricFunc);
+
+ System.out.println("MetricDatum count:" + reducedGoraRdd.count());
return 1;
}
[04/25] gora git commit: * map function is implemented as like in
LogAnalytics.java
Posted by le...@apache.org.
* Implemented the map function as in LogAnalytics.java.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/5644a21c
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/5644a21c
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/5644a21c
Branch: refs/heads/master
Commit: 5644a21c5c5678611c7cef4e2c922643088951b8
Parents: 8fbdef7
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 02:10:26 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 02:10:26 2015 +0300
----------------------------------------------------------------------
.../gora/tutorial/log/LogAnalyticsSpark.java | 38 +++++++++++++++++---
1 file changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/5644a21c/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 0194c3b..43acba0 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -24,12 +24,31 @@ import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import scala.Tuple2;
+
+import java.util.concurrent.TimeUnit;
public class LogAnalyticsSpark {
private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
+ /** The number of milliseconds in a day */
+ private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
+
+ //todo _fk consider using Kyro serialization
+ private static Function<Pageview, Tuple2<String, Long>> s = new Function<Pageview, Tuple2<String, Long>>() {
+ @Override
+ public Tuple2<String, Long> call(Pageview pageview) throws Exception {
+ String key = pageview.getUrl().toString();
+ Long value = getDay(pageview.getTimestamp());
+ return new Tuple2<>(key, value);
+ }
+ };
+
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println(USAGE);
@@ -45,14 +64,22 @@ public class LogAnalyticsSpark {
System.exit(ret);
}
+ /**
+ * Rolls up the given timestamp to the day cardinality, so that data can be
+ * aggregated daily
+ */
+ private static long getDay(long timeStamp) {
+ return (timeStamp / DAY_MILIS) * DAY_MILIS;
+ }
+
public int run(String inStoreClass, String outStoreClass) throws Exception {
GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class,
Pageview.class);
SparkConf sparkConf = new SparkConf().setAppName(
"Gora Integration Application").setMaster("local");
-
- //todo _fk change architectural desigm
+
+ // todo _fk consider alternative architectural design
Class[] c = new Class[1];
c[0] = Pageview.class;
sparkConf.registerKryoClasses(c);
@@ -64,9 +91,8 @@ public class LogAnalyticsSpark {
DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
inStoreClass, Long.class, Pageview.class, hadoopConf);
- JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, dataStore);
- // JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview>
- // cachedGoraRdd = goraRDD.cache();
+ JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc,
+ dataStore);
long count = goraRDD.count();
System.out.println("Total Count: " + count);
@@ -74,6 +100,8 @@ public class LogAnalyticsSpark {
String firstOneURL = goraRDD.first()._2().getUrl().toString();
System.out.println(firstOneURL);
+ JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(s);
+
return 1;
}
}
[02/25] gora git commit: * GoraSpark.java initialize method renamed
to initializeInput. * Architectural change is made.
Posted by le...@apache.org.
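* Renamed the GoraSpark.java initialize method to initializeInput.
* Architectural change: added an initializeInput overload that takes the Hadoop configuration from the data store when it is Configurable.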
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ef68cead
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ef68cead
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ef68cead
Branch: refs/heads/master
Commit: ef68cead273324797cf292dbe6da18ee3fd819cb
Parents: 9c2d225
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sun Jun 28 17:17:52 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sun Jun 28 17:17:52 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/gora/spark/GoraSpark.java | 50 +++++++++++++++-----
.../gora/tutorial/log/LogAnalyticsSpark.java | 44 ++++++++---------
2 files changed, 60 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/ef68cead/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
index 690e32c..4279cfb 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
@@ -24,6 +24,7 @@ import org.apache.gora.mapreduce.GoraMapReduceUtils;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.store.DataStore;
import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -37,19 +38,44 @@ public class GoraSpark<K, V extends Persistent> {
public GoraSpark(Class<K> clazzK, Class<V> clazzV) {
this.clazzK = clazzK;
- this.clazzV = clazzV;
+ this.clazzV = clazzV;
}
- public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+ public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
Configuration conf, DataStore<K, V> dataStore) {
- GoraMapReduceUtils.setIOSerializations(conf, true);
- try {
- IOUtils.storeToConf(dataStore.newQuery(), conf,
- GoraInputFormat.QUERY_KEY);
- } catch (IOException ioex) {
- throw new RuntimeException(ioex.getMessage());
- }
- return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class,
- clazzK, clazzV);
- }
+ GoraMapReduceUtils.setIOSerializations(conf, true);
+
+ try {
+ IOUtils
+ .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY);
+ } catch (IOException ioex) {
+ throw new RuntimeException(ioex.getMessage());
+ }
+
+ return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK,
+ clazzV);
+ }
+
+ public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
+ DataStore<K, V> dataStore) {
+ Configuration hadoopConf;
+
+ if ((dataStore instanceof Configurable) && ((Configurable) dataStore).getConf() != null) {
+ hadoopConf = ((Configurable) dataStore).getConf();
+ } else {
+ hadoopConf = new Configuration();
+ }
+
+ GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
+
+ try {
+ IOUtils.storeToConf(dataStore.newQuery(), hadoopConf,
+ GoraInputFormat.QUERY_KEY);
+ } catch (IOException ioex) {
+ throw new RuntimeException(ioex.getMessage());
+ }
+
+ return sparkContext.newAPIHadoopRDD(hadoopConf, GoraInputFormat.class,
+ clazzK, clazzV);
+ }
}
http://git-wip-us.apache.org/repos/asf/gora/blob/ef68cead/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 214b130..9b28fd7 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -21,44 +21,44 @@ import org.apache.gora.spark.GoraSpark;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.Pageview;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
-public class LogAnalyticsSpark extends Configured implements Tool {
+public class LogAnalyticsSpark {
private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
- private static LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
public static void main(String[] args) throws Exception {
- if (args.length < 2) {
- System.err.println(USAGE);
- System.exit(1);
+ if (args.length < 2) {
+ System.err.println(USAGE);
+ System.exit(1);
}
- // run as any other MR job
- int ret = ToolRunner.run(logAnalyticsSpark, args);
- System.exit(ret);
+
+ String inStoreClass = args[0];
+ String outStoreClass = args[1];
+
+ LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
+ int ret = logAnalyticsSpark.run(inStoreClass, outStoreClass);
+
+ System.exit(ret);
}
- @Override
- public int run(String[] args) throws Exception {
- GoraSpark<Long, Pageview> goraSpark = new GoraSpark<Long, Pageview>(
- Long.class, Pageview.class);
+ public int run(String inStoreClass, String outStoreClass) throws Exception {
+ GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class,
+ Pageview.class);
- SparkConf conf = new SparkConf().setAppName(
+ SparkConf sparkConf = new SparkConf().setAppName(
"Gora Integration Application").setMaster("local");
- JavaSparkContext sc = new JavaSparkContext(conf);
+ JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+ Configuration hadoopConf = new Configuration();
- String dataStoreClass = args[0];
DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
- dataStoreClass, Long.class, Pageview.class,
- logAnalyticsSpark.getConf());
+ inStoreClass, Long.class, Pageview.class, hadoopConf);
- JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview> goraRDD = goraSpark
- .initialize(sc, logAnalyticsSpark.getConf(), dataStore);
+ JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, dataStore);
// JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview>
// cachedGoraRdd = goraRDD.cache();
[21/25] gora git commit: Spark version changed from 1.3.1 to 1.4.1.
Posted by le...@apache.org.
Spark version changed from 1.3.1 to 1.4.1.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/3cf55026
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/3cf55026
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/3cf55026
Branch: refs/heads/master
Commit: 3cf550264d782bcffa15fdf62c584eca355cab29
Parents: 6252edd
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Fri Aug 28 00:57:26 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Fri Aug 28 00:57:26 2015 +0300
----------------------------------------------------------------------
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/3cf55026/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cb55db4..2b5821c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -644,7 +644,7 @@
<commons-io.version>1.3.2</commons-io.version>
<restlet.version>2.3.1</restlet.version>
- <spark.version>1.3.1</spark.version>
+ <spark.version>1.4.1</spark.version>
<!-- Misc Dependencies -->
<guava.version>13.0</guava.version>
<commons-lang.version>2.6</commons-lang.version>
[20/25] gora git commit: LogAnalyticsSpark can now be run from the command
line by passing the "loganalyticsspark" argument.
Posted by le...@apache.org.
LogAnalyticsSpark can now be run from the command line by passing the "loganalyticsspark" argument to bin/gora.
Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/6252eddf
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/6252eddf
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/6252eddf
Branch: refs/heads/master
Commit: 6252eddfecf5a9cba028e1d7529b6dee3ec5ef33
Parents: ad4f560
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Fri Aug 28 00:21:31 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Fri Aug 28 00:21:31 2015 +0300
----------------------------------------------------------------------
bin/gora | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/gora/blob/6252eddf/bin/gora
----------------------------------------------------------------------
diff --git a/bin/gora b/bin/gora
index 7126972..8c37272 100755
--- a/bin/gora
+++ b/bin/gora
@@ -49,6 +49,7 @@ if [ $# = 0 ]; then
echo " goracisetup Run the GoraCI Rackspace orchestration setup"
echo " logmanager Run the tutorial log manager"
echo " loganalytics Run the tutorial log analytics"
+ echo " loganalyticsspark Run the tutorial log analytics spark"
echo " junit Run the given JUnit test"
echo " version Print Gora version to terminal"
echo " or"
@@ -134,6 +135,10 @@ elif [ "$COMMAND" = "loganalytics" ] ; then
MODULE=gora-tutorial
CLASS=org.apache.gora.tutorial.log.LogAnalytics
CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/classes/
+elif [ "$COMMAND" = "loganalyticsspark" ] ; then
+ MODULE=gora-tutorial
+ CLASS=org.apache.gora.tutorial.log.LogAnalyticsSpark
+ CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/classes/
elif [ "$COMMAND" = "junit" ] ; then
MODULE=*
CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/test-classes/