You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2015/09/03 09:28:37 UTC

[01/25] gora git commit: * JavaPairRDD support for GoraInputFormat.

Repository: gora
Updated Branches:
  refs/heads/master 130257370 -> ea44388f9


* JavaPairRDD support for GoraInputFormat.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/9c2d225d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/9c2d225d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/9c2d225d

Branch: refs/heads/master
Commit: 9c2d225d04cfa746244373fa661a1aa6f03250bb
Parents: bb09d89
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sun Jun 28 01:57:51 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sun Jun 28 01:57:51 2015 +0300

----------------------------------------------------------------------
 gora-core/pom.xml                               |  7 ++
 .../java/org/apache/gora/spark/GoraSpark.java   | 55 ++++++++++++++++
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 69 ++++++++++++++++++++
 pom.xml                                         |  4 ++
 4 files changed, 135 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/gora-core/pom.xml
----------------------------------------------------------------------
diff --git a/gora-core/pom.xml b/gora-core/pom.xml
index eab5330..5f147fb 100644
--- a/gora-core/pom.xml
+++ b/gora-core/pom.xml
@@ -141,6 +141,13 @@
       <artifactId>guava</artifactId>
     </dependency>
 
+    <!-- Spark dependency -->
+    <dependency> <!-- Spark dependency -->
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.10</artifactId>
+      <version>1.3.1</version>
+    </dependency>
+
     <!-- Logging Dependencies -->
     <dependency>
       <groupId>log4j</groupId>

http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
new file mode 100644
index 0000000..690e32c
--- /dev/null
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.spark;
+
+import java.io.IOException;
+
+import org.apache.gora.mapreduce.GoraInputFormat;
+import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Base class for Spark integration
+ */
+public class GoraSpark<K, V extends Persistent> {
+  Class<K> clazzK;
+  Class<V> clazzV;
+
+  public GoraSpark(Class<K> clazzK, Class<V> clazzV) {
+    this.clazzK = clazzK;
+	this.clazzV = clazzV;
+  }
+
+  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+      Configuration conf, DataStore<K, V> dataStore) {
+	  GoraMapReduceUtils.setIOSerializations(conf, true);
+      try {
+	    IOUtils.storeToConf(dataStore.newQuery(), conf,
+		    GoraInputFormat.QUERY_KEY);
+      } catch (IOException ioex) {
+        throw new RuntimeException(ioex.getMessage());
+	  }
+       return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class,
+	     clazzK, clazzV);
+	}
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
new file mode 100644
index 0000000..214b130
--- /dev/null
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.tutorial.log;
+
+import org.apache.gora.spark.GoraSpark;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.tutorial.log.generated.Pageview;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class LogAnalyticsSpark extends Configured implements Tool {
+
+  private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
+  private static LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
+
+  public static void main(String[] args) throws Exception {
+      if (args.length < 2) {
+        System.err.println(USAGE);
+        System.exit(1);
+    }
+      // run as any other MR job
+      int ret = ToolRunner.run(logAnalyticsSpark, args);
+      System.exit(ret);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    GoraSpark<Long, Pageview> goraSpark = new GoraSpark<Long, Pageview>(
+      Long.class, Pageview.class);
+
+    SparkConf conf = new SparkConf().setAppName(
+        "Gora Integration Application").setMaster("local");
+    JavaSparkContext sc = new JavaSparkContext(conf);
+
+    String dataStoreClass = args[0];
+    DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
+        dataStoreClass, Long.class, Pageview.class,
+    logAnalyticsSpark.getConf());
+
+    JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview> goraRDD = goraSpark
+        .initialize(sc, logAnalyticsSpark.getConf(), dataStore);
+    // JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview>
+    // cachedGoraRdd = goraRDD.cache();
+
+    long count = goraRDD.count();
+    System.out.println("Total Count: " + count);
+    return 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/9c2d225d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 01a7fee..a6261ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -893,6 +893,10 @@
             <groupId>ant</groupId>
             <artifactId>ant</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>org.mortbay.jetty</groupId>
+            <artifactId>servlet-api</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
 


[03/25] gora git commit: * Serialization support via Kryo.

Posted by le...@apache.org.
* Serialization support via Kryo.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/8fbdef7d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/8fbdef7d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/8fbdef7d

Branch: refs/heads/master
Commit: 8fbdef7de963defb1387f0fbbe9e56506b627ba3
Parents: ef68cea
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sun Jun 28 18:49:13 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sun Jun 28 18:49:13 2015 +0300

----------------------------------------------------------------------
 .../org/apache/gora/tutorial/log/LogAnalyticsSpark.java   | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/8fbdef7d/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 9b28fd7..0194c3b 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -51,6 +51,12 @@ public class LogAnalyticsSpark {
 
     SparkConf sparkConf = new SparkConf().setAppName(
         "Gora Integration Application").setMaster("local");
+      
+    //todo _fk change architectural desigm
+    Class[] c = new Class[1];
+    c[0] = Pageview.class;
+    sparkConf.registerKryoClasses(c);
+    //
     JavaSparkContext sc = new JavaSparkContext(sparkConf);
 
     Configuration hadoopConf = new Configuration();
@@ -64,6 +70,10 @@ public class LogAnalyticsSpark {
 
     long count = goraRDD.count();
     System.out.println("Total Count: " + count);
+
+    String firstOneURL = goraRDD.first()._2().getUrl().toString();
+    System.out.println(firstOneURL);
+
     return 1;
   }
 }


[13/25] gora git commit: Unnecessary reuseObjects is removed.

Posted by le...@apache.org.
Unnecessary reuseObjects is removed.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/92b71a6d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/92b71a6d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/92b71a6d

Branch: refs/heads/master
Commit: 92b71a6d2c95492621a56e48470e951e982cc34f
Parents: 8584911
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Aug 17 23:34:40 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Aug 17 23:34:40 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/gora/spark/GoraSparkEngine.java    | 13 +++++--------
 .../apache/gora/tutorial/log/LogAnalyticsSpark.java    |  2 +-
 2 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/92b71a6d/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
index 7a819fd..98dafea 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
@@ -99,17 +99,16 @@ public class GoraSparkEngine<K, V extends Persistent> {
     /**
      * Creates a job and sets the output parameters for the conf that Spark will use
      * @param dataStore the datastore as the output
-     * @param reuseObjects whether to reuse objects in serialization
      */
-    public <K, V extends Persistent> Configuration generateOutputConf(DataStore<K, V> dataStore,
-        boolean reuseObjects) throws IOException {
+    public <K, V extends Persistent> Configuration generateOutputConf(DataStore<K, V> dataStore)
+       throws IOException {
 
       Configuration hadoopConf = new Configuration();
       GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
       Job job = Job.getInstance(hadoopConf);
 
       return generateOutputConf(job, dataStore.getClass(), dataStore.getKeyClass(),
-           dataStore.getPersistentClass(), reuseObjects);
+           dataStore.getPersistentClass());
     }
 
     /**
@@ -121,7 +120,7 @@ public class GoraSparkEngine<K, V extends Persistent> {
     public <K, V extends Persistent> Configuration generateOutputConf(Job job,
         DataStore<K, V> dataStore, boolean reuseObjects) {
       return generateOutputConf(job, dataStore.getClass(), dataStore.getKeyClass(),
-              dataStore.getPersistentClass(), reuseObjects);
+              dataStore.getPersistentClass());
     }
 
     /**
@@ -131,13 +130,11 @@ public class GoraSparkEngine<K, V extends Persistent> {
      * @param dataStoreClass  the datastore class
      * @param keyClass        output key class
      * @param persistentClass output value class
-     * @param reuseObjects    whether to reuse objects in serialization
      */
     @SuppressWarnings("rawtypes")
     public <K, V extends Persistent> Configuration generateOutputConf(Job job,
         Class<? extends DataStore> dataStoreClass,
-        Class<K> keyClass, Class<V> persistentClass,
-        boolean reuseObjects) {
+        Class<K> keyClass, Class<V> persistentClass) {
 
       job.setOutputFormatClass(GoraOutputFormat.class);
       job.setOutputKeyClass(keyClass);

http://git-wip-us.apache.org/repos/asf/gora/blob/92b71a6d/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index c66f7e4..8c37f8c 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -190,7 +190,7 @@ public class LogAnalyticsSpark {
     //
 
     //write output to datastore
-    Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore, true);
+    Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore);
     reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
     //
 


[16/25] gora git commit: Spark engine word count tests are implemented. Due to a connection problem, spark engine word count tests are ignored.

Posted by le...@apache.org.
Spark engine word count tests are implemented.
Due to a connection problem, spark engine word count tests are ignored.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/e817a7f0
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/e817a7f0
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/e817a7f0

Branch: refs/heads/master
Commit: e817a7f0eeb2efb98937efb92701705ec80891b9
Parents: 9cff335
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sat Aug 22 17:31:57 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sat Aug 22 17:31:57 2015 +0300

----------------------------------------------------------------------
 .../gora/examples/spark/SparkWordCount.java     | 150 +++++++++++++++++++
 .../gora/mapreduce/MapReduceTestUtils.java      |  31 ++++
 .../mapreduce/TestHBaseStoreWordCount.java      |   8 +
 .../mapreduce/TestMongoStoreWordCount.java      |   9 ++
 4 files changed, 198 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/e817a7f0/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
----------------------------------------------------------------------
diff --git a/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
new file mode 100644
index 0000000..837da13
--- /dev/null
+++ b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.examples.spark;
+
+import org.apache.gora.examples.generated.TokenDatum;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.spark.GoraSparkEngine;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Classic word count example in Gora with Spark.
+ */
+public class SparkWordCount {
+  private static final Logger log = LoggerFactory.getLogger(SparkWordCount.class);
+
+  private static final String USAGE = "SparkWordCount <input_data_store> <output_data_store>";
+
+  /**
+   * map function used in calculation
+   */
+  private static Function<WebPage, Tuple2<String, Long>> mapFunc =
+    new Function<WebPage, Tuple2<String, Long>>() {
+        @Override
+        public Tuple2<String, Long> call(WebPage webPage)
+                throws Exception {
+          String content = new String(webPage.getContent().array());
+          return new Tuple2<>(content, 1L);
+        }
+  };
+
+  /**
+   * reduce function used in calculation
+   */
+  private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() {
+    @Override
+    public Long call(Long aLong, Long aLong2) throws Exception {
+      return aLong + aLong2;
+    }
+  };
+
+  public int wordCount(DataStore<String,WebPage> inStore,
+    DataStore<String, TokenDatum> outStore) throws IOException {
+
+    //Spark engine initialization
+    GoraSparkEngine<String, WebPage> goraSparkEngine = new GoraSparkEngine<>(String.class,
+       WebPage.class);
+
+    SparkConf sparkConf = new SparkConf().setAppName(
+      "Gora Spark Word Count Application").setMaster("local");
+
+    Class[] c = new Class[1];
+    c[0] = inStore.getPersistentClass();
+    sparkConf.registerKryoClasses(c);
+    //
+    JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+    JavaPairRDD<String, WebPage> goraRDD = goraSparkEngine.initialize(sc, inStore);
+
+    long count = goraRDD.count();
+    System.out.println("Total Log Count: " + count);
+
+    JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(mapFunc);
+
+    JavaPairRDD<String, Long> reducedGoraRdd = JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc);
+
+    //Print output for debug purpose
+    System.out.println("SparkWordCount debug purpose TokenDatum print starts:");
+    Map<String, Long> tokenDatumMap = reducedGoraRdd.collectAsMap();
+    for (String key : tokenDatumMap.keySet()) {
+      System.out.println(key);
+      System.out.println(tokenDatumMap.get(key));
+    }
+    System.out.println("SparkWordCount debug purpose TokenDatum print ends:");
+    //
+
+    //write output to datastore
+    Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore);
+    reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
+    //
+
+    return 1;
+  }
+
+  public int run(String[] args) throws Exception {
+
+    DataStore<String,WebPage> inStore;
+    DataStore<String, TokenDatum> outStore;
+    Configuration hadoopConf = new Configuration();
+    if(args.length > 0) {
+      String dataStoreClass = args[0];
+      inStore = DataStoreFactory.getDataStore(dataStoreClass, String.class, WebPage.class, hadoopConf);
+      if(args.length > 1) {
+        dataStoreClass = args[1];
+      }
+      outStore = DataStoreFactory.getDataStore(dataStoreClass,
+        String.class, TokenDatum.class, hadoopConf);
+      } else {
+        inStore = DataStoreFactory.getDataStore(String.class, WebPage.class, hadoopConf);
+        outStore = DataStoreFactory.getDataStore(String.class, TokenDatum.class, hadoopConf);
+      }
+
+      return wordCount(inStore, outStore);
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    if (args.length < 2) {
+      System.err.println(USAGE);
+      System.exit(1);
+    }
+
+    SparkWordCount sparkWordCount = new SparkWordCount();
+
+    try {
+      int ret = sparkWordCount.run(args);
+      System.exit(ret);
+    } catch (Exception ex){
+      log.error("Error occurred!");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/e817a7f0/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
----------------------------------------------------------------------
diff --git a/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java b/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
index 12ccea4..c04f615 100644
--- a/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
+++ b/gora-core/src/test/java/org/apache/gora/mapreduce/MapReduceTestUtils.java
@@ -28,6 +28,7 @@ import org.apache.gora.examples.generated.TokenDatum;
 import org.apache.gora.examples.generated.WebPage;
 import org.apache.gora.examples.mapreduce.QueryCounter;
 import org.apache.gora.examples.mapreduce.WordCount;
+import org.apache.gora.examples.spark.SparkWordCount;
 import org.apache.gora.query.Query;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.impl.DataStoreBase;
@@ -103,6 +104,36 @@ public class MapReduceTestUtils {
       assertTokenCount(outStore, entry.getKey(), entry.getValue()); 
     }
   }
+
+  public static void testSparkWordCount(Configuration conf, DataStore<String,WebPage> inStore, DataStore<String,
+      TokenDatum> outStore) throws Exception {
+    //Datastore now has to be a Hadoop based datastore
+    ((DataStoreBase<String,WebPage>)inStore).setConf(conf);
+    ((DataStoreBase<String,TokenDatum>)outStore).setConf(conf);
+
+    //create input
+    WebPageDataCreator.createWebPageData(inStore);
+
+    //run Spark
+    SparkWordCount wordCount = new SparkWordCount();
+    wordCount.wordCount(inStore, outStore);
+
+    //assert results
+    HashMap<String, Integer> actualCounts = new HashMap<String, Integer>();
+    for(String content : WebPageDataCreator.CONTENTS) {
+      if (content != null) {
+        for(String token:content.split(" ")) {
+          Integer count = actualCounts.get(token);
+          if(count == null)
+            count = 0;
+          actualCounts.put(token, ++count);
+        }
+      }
+    }
+    for(Map.Entry<String, Integer> entry:actualCounts.entrySet()) {
+      assertTokenCount(outStore, entry.getKey(), entry.getValue());
+    }
+  }
   
   private static void assertTokenCount(DataStore<String, TokenDatum> outStore,
       String token, int count) throws Exception {

http://git-wip-us.apache.org/repos/asf/gora/blob/e817a7f0/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java
----------------------------------------------------------------------
diff --git a/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java b/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java
index b42b0c0..cb4ef03 100644
--- a/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java
+++ b/gora-hbase/src/test/java/org/apache/gora/hbase/mapreduce/TestHBaseStoreWordCount.java
@@ -27,6 +27,7 @@ import org.apache.gora.store.DataStoreFactory;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -58,4 +59,11 @@ public class TestHBaseStoreWordCount {
     MapReduceTestUtils.testWordCount(cluster.getConf(), webPageStore, tokenStore);
   }
 
+  //todo fix config
+  @Ignore
+  @Test
+  public void testSparkWordCount() throws Exception {
+    MapReduceTestUtils.testSparkWordCount(cluster.getConf(), webPageStore, tokenStore);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/e817a7f0/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java
----------------------------------------------------------------------
diff --git a/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java b/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java
index d6a51a4..dd76263 100644
--- a/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java
+++ b/gora-mongodb/src/test/java/org/apache/gora/mongodb/mapreduce/TestMongoStoreWordCount.java
@@ -24,6 +24,7 @@ import org.apache.gora.mongodb.store.MongoStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -55,4 +56,12 @@ public class TestMongoStoreWordCount extends GoraMongoMapredTest {
         webPageStore, tokenStore);
   }
 
+  //todo fix config
+  @Ignore
+  @Test
+  public void testSparkWordCount() throws Exception {
+    MapReduceTestUtils.testSparkWordCount(testDriver.getConfiguration(),
+        webPageStore, tokenStore);
+  }
+
 }


[08/25] gora git commit: * Writing to HBase via Spark is implemented.

Posted by le...@apache.org.
* Writing to HBase via Spark is implemented.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/c111e629
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/c111e629
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/c111e629

Branch: refs/heads/master
Commit: c111e6290fa16d2cc560eb29dbc07eb2f8b7734b
Parents: 80c0c26
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Wed Jul 15 20:23:51 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Wed Jul 15 20:23:51 2015 +0300

----------------------------------------------------------------------
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 35 ++++++++++++++++++--
 1 file changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/c111e629/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index abced3f..828939e 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -19,12 +19,16 @@ package org.apache.gora.tutorial.log;
 
 import java.util.Map;
 
+import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.mapreduce.GoraOutputFormat;
+import org.apache.gora.persistency.Persistent;
 import org.apache.gora.spark.GoraSparkEngine;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.tutorial.log.generated.MetricDatum;
 import org.apache.gora.tutorial.log.generated.Pageview;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
@@ -135,6 +139,8 @@ public class LogAnalyticsSpark {
         "Gora Integration Application").setMaster("local");
 
     // todo _fk consider alternative architectural design
+    // todo design inStore and outStore initialization parts as like LogAnalytics.java
+    // todo consider creating job and manipulating it at input part as like LogAnalytics.java
     Class[] c = new Class[1];
     c[0] = Pageview.class;
     sparkConf.registerKryoClasses(c);
@@ -143,10 +149,10 @@ public class LogAnalyticsSpark {
 
     Configuration hadoopConf = new Configuration();
 
-    DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
+    DataStore<Long, Pageview> inStore = DataStoreFactory.getDataStore(
         inStoreClass, Long.class, Pageview.class, hadoopConf);
 
-    JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, dataStore);
+    JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore);
 
     long count = goraRDD.count();
     System.out.println("Total Log Count: " + count);
@@ -162,10 +168,35 @@ public class LogAnalyticsSpark {
 
     System.out.println("MetricDatum count:" + reducedGoraRdd.count());
 
+    //print screen output
     Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap();
     for (String key : metricDatumMap.keySet()) {
       System.out.println(key);
     }
+    //
+
+    //write output to datastore
+    DataStore<String, MetricDatum> outStore = DataStoreFactory.getDataStore(
+            outStoreClass, String.class, MetricDatum.class, hadoopConf);
+
+    GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
+
+    Job job = Job.getInstance(hadoopConf);
+    job.setOutputFormatClass(GoraOutputFormat.class);
+    job.setOutputKeyClass(outStore.getKeyClass());
+    job.setOutputValueClass(outStore.getPersistentClass());
+
+    job.getConfiguration().setClass(GoraOutputFormat.DATA_STORE_CLASS, outStore.getClass(),
+              DataStore.class);
+    job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_KEY_CLASS, outStore.getKeyClass(), Object.class);
+    job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_VALUE_CLASS,
+            outStore.getPersistentClass(), Persistent.class);
+
+    reducedGoraRdd.saveAsNewAPIHadoopDataset(job.getConfiguration());
+    //
+
+    inStore.close();
+    outStore.close();
 
     return 1;
   }


[05/25] gora git commit: * map was not collecting the counts and it is fixed.

Posted by le...@apache.org.
* map was not collecting the counts and it is fixed.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/445edb12
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/445edb12
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/445edb12

Branch: refs/heads/master
Commit: 445edb12c35f867e7b81919ddfb509dd8c179ba3
Parents: 5644a21
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 02:26:41 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 02:26:41 2015 +0300

----------------------------------------------------------------------
 .../apache/gora/tutorial/log/LogAnalyticsSpark.java   | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/445edb12/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 43acba0..0ad3e57 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -40,12 +40,14 @@ public class LogAnalyticsSpark {
   private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
 
   //todo _fk consider using Kryo serialization
-  private static Function<Pageview, Tuple2<String, Long>> s = new Function<Pageview, Tuple2<String, Long>>() {
+  private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> s = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> () {
     @Override
-    public Tuple2<String, Long> call(Pageview pageview) throws Exception {
-      String key = pageview.getUrl().toString();
-      Long value = getDay(pageview.getTimestamp());
-      return new Tuple2<>(key, value);
+    public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview) throws Exception {
+      String url = pageview.getUrl().toString();
+      Long day = getDay(pageview.getTimestamp());
+      Tuple2<String, Long> keyTuple =new Tuple2<>(url, day);
+
+      return new Tuple2<>(keyTuple, 1L);
     }
   };
 
@@ -100,7 +102,7 @@ public class LogAnalyticsSpark {
     String firstOneURL = goraRDD.first()._2().getUrl().toString();
     System.out.println(firstOneURL);
 
-    JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(s);
+    JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD.values().map(s);
 
     return 1;
   }


[09/25] gora git commit: * Added dependency to write output to Solr.

Posted by le...@apache.org.
* Added dependency to write output to Solr.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/cf6e7658
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/cf6e7658
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/cf6e7658

Branch: refs/heads/master
Commit: cf6e76585b18ff926f8e7e4928f2d7cd28ac7c6f
Parents: c111e62
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Thu Jul 16 19:38:37 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Thu Jul 16 19:38:37 2015 +0300

----------------------------------------------------------------------
 gora-tutorial/pom.xml                                          | 5 +++++
 .../java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java   | 6 +++---
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/cf6e7658/gora-tutorial/pom.xml
----------------------------------------------------------------------
diff --git a/gora-tutorial/pom.xml b/gora-tutorial/pom.xml
index 1a13083..5a775b3 100644
--- a/gora-tutorial/pom.xml
+++ b/gora-tutorial/pom.xml
@@ -108,6 +108,11 @@
       <artifactId>gora-cassandra</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.gora</groupId>
+      <artifactId>gora-solr</artifactId>
+    </dependency>
+
     <!--dependency> <groupId>org.apache.gora</groupId> <artifactId>gora-sql</artifactId> 
       </dependency> <dependency> <groupId>org.apache.gora</groupId> <artifactId>gora-solr</artifactId> 
       </dependency -->

http://git-wip-us.apache.org/repos/asf/gora/blob/cf6e7658/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 828939e..372c124 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -17,8 +17,6 @@
  */
 package org.apache.gora.tutorial.log;
 
-import java.util.Map;
-
 import org.apache.gora.mapreduce.GoraMapReduceUtils;
 import org.apache.gora.mapreduce.GoraOutputFormat;
 import org.apache.gora.persistency.Persistent;
@@ -158,7 +156,7 @@ public class LogAnalyticsSpark {
     System.out.println("Total Log Count: " + count);
 
     String firstOneURL = goraRDD.first()._2().getUrl().toString();
-    System.out.println(firstOneURL);
+    System.out.println("First entry's first URL:" + firstOneURL);
 
     JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD
         .values().map(mapFunc);
@@ -169,10 +167,12 @@ public class LogAnalyticsSpark {
     System.out.println("MetricDatum count:" + reducedGoraRdd.count());
 
     //print screen output
+    /*
     Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap();
     for (String key : metricDatumMap.keySet()) {
       System.out.println(key);
     }
+    */
     //
 
     //write output to datastore


[25/25] gora git commit: GORA-386 Gora Spark Backend Support - Final update of LogAnalyticsSpark to comply with forbidden-api plugin

Posted by le...@apache.org.
GORA-386 Gora Spark Backend Support - Final update of LogAnalyticsSpark to comply with forbidden-api plugin


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ea44388f
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ea44388f
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ea44388f

Branch: refs/heads/master
Commit: ea44388f9883218f7314ed4bec51e3f2115cf33f
Parents: 7c6a63e
Author: Lewis John McGibbney <le...@jpl.nasa.gov>
Authored: Thu Sep 3 00:29:30 2015 -0700
Committer: Lewis John McGibbney <le...@jpl.nasa.gov>
Committed: Thu Sep 3 00:29:30 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                                    | 2 ++
 .../java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java   | 6 +++---
 2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/ea44388f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 86d78b2..3f7e94d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@
 
 Current Development
 
+* GORA-386 Gora Spark Backend Support (Furkan KAMACI, talat, lewismc)
+
 * GORA-429 Implement Maven forbidden-apis plugin in Gora (lewismc, rmarroquin)
 
 * GORA-228 java.util.ConcurrentModificationException when using MemStore for concurrent tests (Yasin Kılınç, cihad güzel, lewismc)

http://git-wip-us.apache.org/repos/asf/gora/blob/ea44388f/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 84bb693..cdd245c 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -107,7 +107,7 @@ public class LogAnalyticsSpark {
 
   public static void main(String[] args) throws Exception {
     if (args.length < 2) {
-      System.err.println(USAGE);
+      log.error(USAGE);
       System.exit(1);
     }
 
@@ -166,7 +166,7 @@ public class LogAnalyticsSpark {
     JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore);
 
     long count = goraRDD.count();
-    System.out.println("Total Log Count: " + count);
+    log.info("Total Log Count: {}", count);
 
     JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD
         .values().map(mapFunc);
@@ -174,7 +174,7 @@ public class LogAnalyticsSpark {
     JavaPairRDD<String, MetricDatum> reducedGoraRdd = JavaPairRDD
         .fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc).mapToPair(metricFunc);
 
-    System.out.println("MetricDatum count:" + reducedGoraRdd.count());
+    log.info("MetricDatum count: {}", reducedGoraRdd.count());
 
     //Print output for debug purpose
     /*


[18/25] gora git commit: Spark version should be placed at parent pom and it's done.

Posted by le...@apache.org.
Spark version should be placed at parent pom and it's done.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/888c899f
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/888c899f
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/888c899f

Branch: refs/heads/master
Commit: 888c899f966c27dec1fb173611812f8574737da1
Parents: 8f1acc6
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Thu Aug 27 23:46:34 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Thu Aug 27 23:46:34 2015 +0300

----------------------------------------------------------------------
 gora-core/pom.xml | 2 +-
 pom.xml           | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/888c899f/gora-core/pom.xml
----------------------------------------------------------------------
diff --git a/gora-core/pom.xml b/gora-core/pom.xml
index 5f147fb..55312d1 100644
--- a/gora-core/pom.xml
+++ b/gora-core/pom.xml
@@ -145,7 +145,7 @@
     <dependency> <!-- Spark dependency -->
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
-      <version>1.3.1</version>
+      <version>${spark.version}</version>
     </dependency>
 
     <!-- Logging Dependencies -->

http://git-wip-us.apache.org/repos/asf/gora/blob/888c899f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a6261ca..307fa1d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -644,6 +644,7 @@
     <commons-io.version>1.3.2</commons-io.version>
     <restlet.version>2.3.1</restlet.version>
 
+    <spark.version>1.3.1</spark.version>
     <!-- Misc Dependencies -->
     <guava.version>13.0</guava.version>
     <commons-lang.version>2.6</commons-lang.version>


[14/25] gora git commit: Unnecessary imports are removed from LogAnalyticsSpark.java

Posted by le...@apache.org.
Unnecessary imports are removed from LogAnalyticsSpark.java


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/8893fd5c
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/8893fd5c
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/8893fd5c

Branch: refs/heads/master
Commit: 8893fd5c6c53b19925c99668d24df23c52e70b1e
Parents: 92b71a6
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Fri Aug 21 15:58:21 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Fri Aug 21 15:58:21 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java     | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/8893fd5c/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 8c37f8c..59e7c79 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -17,16 +17,12 @@
  */
 package org.apache.gora.tutorial.log;
 
-import org.apache.gora.mapreduce.GoraMapReduceUtils;
-import org.apache.gora.mapreduce.GoraOutputFormat;
-import org.apache.gora.persistency.Persistent;
 import org.apache.gora.spark.GoraSparkEngine;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.tutorial.log.generated.MetricDatum;
 import org.apache.gora.tutorial.log.generated.Pageview;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;


[23/25] gora git commit: GORA-386 Gora Spark Backend Support addition to make forbidden api compliant

Posted by le...@apache.org.
GORA-386 Gora Spark Backend Support addition to make forbidden api compliant


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/2ce2fda5
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/2ce2fda5
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/2ce2fda5

Branch: refs/heads/master
Commit: 2ce2fda5b7f688f1c5fa9e10f7c8ff64ce99be12
Parents: 965c449
Author: Lewis John McGibbney <le...@jpl.nasa.gov>
Authored: Tue Sep 1 15:26:59 2015 -0700
Committer: Lewis John McGibbney <le...@jpl.nasa.gov>
Committed: Tue Sep 1 15:26:59 2015 -0700

----------------------------------------------------------------------
 .../apache/gora/examples/spark/SparkWordCount.java  | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/2ce2fda5/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
----------------------------------------------------------------------
diff --git a/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
index 0a84d58..4f91fb2 100644
--- a/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
+++ b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
@@ -17,6 +17,8 @@
  */
 package org.apache.gora.examples.spark;
 
+import java.nio.charset.Charset;
+
 import org.apache.gora.examples.generated.TokenDatum;
 import org.apache.gora.examples.generated.WebPage;
 import org.apache.gora.spark.GoraSparkEngine;
@@ -52,7 +54,7 @@ public class SparkWordCount {
         @Override
         public Tuple2<String, Long> call(WebPage webPage)
                 throws Exception {
-          String content = new String(webPage.getContent().array());
+          String content = new String(webPage.getContent().array(), Charset.defaultCharset());
           return new Tuple2<>(content, 1L);
         }
   };
@@ -86,20 +88,20 @@ public class SparkWordCount {
     JavaPairRDD<String, WebPage> goraRDD = goraSparkEngine.initialize(sc, inStore);
 
     long count = goraRDD.count();
-    System.out.println("Total Web page count: " + count);
+    log.info("Total Web page count: {}", count);
 
     JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(mapFunc);
 
     JavaPairRDD<String, Long> reducedGoraRdd = JavaPairRDD.fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc);
 
     //Print output for debug purpose
-    System.out.println("SparkWordCount debug purpose TokenDatum print starts:");
+    log.info("SparkWordCount debug purpose TokenDatum print starts:");
     Map<String, Long> tokenDatumMap = reducedGoraRdd.collectAsMap();
     for (String key : tokenDatumMap.keySet()) {
-      System.out.println(key);
-      System.out.println(tokenDatumMap.get(key));
+      log.info(key);
+      log.info(tokenDatumMap.get(key).toString());
     }
-    System.out.println("SparkWordCount debug purpose TokenDatum print ends:");
+    log.info("SparkWordCount debug purpose TokenDatum print ends:");
     //
 
     //write output to datastore
@@ -134,7 +136,7 @@ public class SparkWordCount {
   public static void main(String[] args) throws Exception {
 
     if (args.length < 2) {
-      System.err.println(USAGE);
+      log.info(USAGE);
       System.exit(1);
     }
 


[17/25] gora git commit: Minor change at SparkWordCount.java

Posted by le...@apache.org.
Minor change at SparkWordCount.java


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/8f1acc6d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/8f1acc6d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/8f1acc6d

Branch: refs/heads/master
Commit: 8f1acc6d4ef6c192e8fc06287558b7bc7c39b040
Parents: e817a7f
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sat Aug 22 22:06:42 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sat Aug 22 22:06:42 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/gora/examples/spark/SparkWordCount.java        | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/8f1acc6d/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
----------------------------------------------------------------------
diff --git a/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
index 837da13..0a84d58 100644
--- a/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
+++ b/gora-core/src/examples/java/org/apache/gora/examples/spark/SparkWordCount.java
@@ -86,7 +86,7 @@ public class SparkWordCount {
     JavaPairRDD<String, WebPage> goraRDD = goraSparkEngine.initialize(sc, inStore);
 
     long count = goraRDD.count();
-    System.out.println("Total Log Count: " + count);
+    System.out.println("Total Web page count: " + count);
 
     JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(mapFunc);
 


[07/25] gora git commit: Organizing codes Documentation

Posted by le...@apache.org.
Organizing codes
Documentation


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/80c0c26d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/80c0c26d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/80c0c26d

Branch: refs/heads/master
Commit: 80c0c26d8fb6a9a84ea39f5aa96cc343b4546266
Parents: 81af4d3
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 19:26:28 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 19:26:28 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/gora/spark/GoraSpark.java   | 81 -----------------
 .../org/apache/gora/spark/GoraSparkEngine.java  | 96 ++++++++++++++++++++
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 40 +++++++-
 3 files changed, 131 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/80c0c26d/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
deleted file mode 100644
index 02b5b39..0000000
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gora.spark;
-
-import java.io.IOException;
-
-import org.apache.gora.mapreduce.GoraInputFormat;
-import org.apache.gora.mapreduce.GoraMapReduceUtils;
-import org.apache.gora.persistency.Persistent;
-import org.apache.gora.store.DataStore;
-import org.apache.gora.util.IOUtils;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-/**
- * Base class for Spark integration
- */
-public class GoraSpark<K, V extends Persistent> {
-  Class<K> clazzK;
-  Class<V> clazzV;
-
-  public GoraSpark(Class<K> clazzK, Class<V> clazzV) {
-    this.clazzK = clazzK;
-    this.clazzV = clazzV;
-  }
-
-  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
-      Configuration conf, DataStore<K, V> dataStore) {
-    GoraMapReduceUtils.setIOSerializations(conf, true);
-
-    try {
-      IOUtils
-          .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY);
-    } catch (IOException ioex) {
-      throw new RuntimeException(ioex.getMessage());
-    }
-
-    return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK,
-        clazzV);
-  }
-
-  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
-      DataStore<K, V> dataStore) {
-    Configuration hadoopConf;
-
-      if ((dataStore instanceof Configurable) && ((Configurable) dataStore).getConf() != null) {
-          hadoopConf = ((Configurable) dataStore).getConf();
-      } else {
-          hadoopConf = new Configuration();
-      }
-
-      GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
-
-    try {
-      IOUtils.storeToConf(dataStore.newQuery(), hadoopConf,
-              GoraInputFormat.QUERY_KEY);
-    } catch (IOException ioex) {
-      throw new RuntimeException(ioex.getMessage());
-    }
-
-    return sparkContext.newAPIHadoopRDD(hadoopConf, GoraInputFormat.class,
-        clazzK, clazzV);
-  }
-}

http://git-wip-us.apache.org/repos/asf/gora/blob/80c0c26d/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
new file mode 100644
index 0000000..ced44be
--- /dev/null
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.spark;
+
+import java.io.IOException;
+
+import org.apache.gora.mapreduce.GoraInputFormat;
+import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Base class for Gora - Spark integration.
+ */
+public class GoraSparkEngine<K, V extends Persistent> {
+  Class<K> clazzK; // key class handed to newAPIHadoopRDD
+  Class<V> clazzV; // value class handed to newAPIHadoopRDD
+
+  public GoraSparkEngine(Class<K> clazzK, Class<V> clazzV) {
+    this.clazzK = clazzK;
+    this.clazzV = clazzV;
+  }
+
+  /**
+   * Initializes a {@link JavaPairRDD} from given Spark context, Hadoop
+   * configuration and data store. A new query from the data store is stored
+   * in the configuration under {@link GoraInputFormat#QUERY_KEY}.
+   *
+   * @param sparkContext Spark context
+   * @param conf Hadoop configuration, updated in place
+   * @param dataStore Data store
+   * @return initialized rdd
+   * @throws RuntimeException if the query cannot be stored in the
+   *           configuration; the original {@link IOException} is the cause
+   */
+  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+      Configuration conf, DataStore<K, V> dataStore) {
+    GoraMapReduceUtils.setIOSerializations(conf, true);
+
+    try {
+      IOUtils
+          .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY);
+    } catch (IOException ioex) {
+      throw new RuntimeException(ioex.getMessage(), ioex);
+    }
+
+    return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK,
+        clazzV);
+  }
+
+  /**
+   * Initializes a {@link JavaPairRDD} from given Spark context and data store.
+   * If the data store is {@link Configurable} and already has a configuration,
+   * that configuration is used; otherwise a new Hadoop configuration is
+   * created.
+   *
+   * @param sparkContext
+   *          Spark context
+   * @param dataStore
+   *          Data store
+   * @return initialized rdd
+   */
+  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+      DataStore<K, V> dataStore) {
+    Configuration hadoopConf;
+
+    if ((dataStore instanceof Configurable)
+        && ((Configurable) dataStore).getConf() != null) {
+      hadoopConf = ((Configurable) dataStore).getConf();
+    } else {
+      hadoopConf = new Configuration();
+    }
+
+    return initialize(sparkContext, hadoopConf, dataStore);
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/80c0c26d/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index d69649e..abced3f 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -17,7 +17,9 @@
  */
 package org.apache.gora.tutorial.log;
 
-import org.apache.gora.spark.GoraSpark;
+import java.util.Map;
+
+import org.apache.gora.spark.GoraSparkEngine;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.tutorial.log.generated.MetricDatum;
@@ -33,6 +35,21 @@ import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
+/**
+ * LogAnalyticsSpark is the tutorial class to illustrate Gora Spark API. The
+ * Spark job reads the web access data stored earlier by the {@link LogManager},
+ * and calculates the aggregate daily pageviews. The output of the job is stored
+ * in a Gora compatible data store.
+ *
+ * This class illustrates the same functionality as {@link LogAnalytics}, using
+ * Spark.
+ *
+ * <p>
+ * See the tutorial.html file in docs or go to the <a
+ * href="http://incubator.apache.org/gora/docs/current/tutorial.html"> web
+ * site</a> for more information.
+ * </p>
+ */
 public class LogAnalyticsSpark {
 
   private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
@@ -41,6 +58,9 @@ public class LogAnalyticsSpark {
   private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
 
   // todo _fk consider using Kryo serialization
+  /**
+   * map function used in calculation
+   */
   private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>>() {
     @Override
     public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview)
@@ -53,6 +73,9 @@ public class LogAnalyticsSpark {
     }
   };
 
+    /**
+     * reduce function used in calculation
+     */
   private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() {
     @Override
     public Long call(Long aLong, Long aLong2) throws Exception {
@@ -60,6 +83,9 @@ public class LogAnalyticsSpark {
     }
   };
 
+    /**
+     * metric function used after map phase
+     */
   private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc = new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum>() {
     @Override
     public Tuple2<String, MetricDatum> call(
@@ -102,7 +128,7 @@ public class LogAnalyticsSpark {
   }
 
   public int run(String inStoreClass, String outStoreClass) throws Exception {
-    GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class,
+    GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class,
         Pageview.class);
 
     SparkConf sparkConf = new SparkConf().setAppName(
@@ -120,11 +146,10 @@ public class LogAnalyticsSpark {
     DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
         inStoreClass, Long.class, Pageview.class, hadoopConf);
 
-    JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initialize(sc,
-        dataStore);
+    JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, dataStore);
 
     long count = goraRDD.count();
-    System.out.println("Total Count: " + count);
+    System.out.println("Total Log Count: " + count);
 
     String firstOneURL = goraRDD.first()._2().getUrl().toString();
     System.out.println(firstOneURL);
@@ -137,6 +162,11 @@ public class LogAnalyticsSpark {
 
     System.out.println("MetricDatum count:" + reducedGoraRdd.count());
 
+    Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap();
+    for (String key : metricDatumMap.keySet()) {
+      System.out.println(key);
+    }
+
     return 1;
   }
 }


[19/25] gora git commit: servlet dependency is excluded from spark, not ant. Without excluding it throws a java security exception. Motivation for it: https://issues.apache.org/jira/browse/SPARK-1693 and

Posted by le...@apache.org.
servlet dependency is excluded from spark, not ant. Without excluding it throws a java security exception. Motivation for it: https://issues.apache.org/jira/browse/SPARK-1693 and


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ad4f5601
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ad4f5601
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ad4f5601

Branch: refs/heads/master
Commit: ad4f5601ebd52e182bf79ba4032dcb85d918b925
Parents: 888c899
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Fri Aug 28 00:15:42 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Fri Aug 28 00:15:42 2015 +0300

----------------------------------------------------------------------
 gora-core/pom.xml | 6 ++++++
 pom.xml           | 4 ----
 2 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/ad4f5601/gora-core/pom.xml
----------------------------------------------------------------------
diff --git a/gora-core/pom.xml b/gora-core/pom.xml
index 55312d1..16fe808 100644
--- a/gora-core/pom.xml
+++ b/gora-core/pom.xml
@@ -146,6 +146,12 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.10</artifactId>
       <version>${spark.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.eclipse.jetty.orbit</groupId>
+          <artifactId>javax.servlet</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <!-- Logging Dependencies -->

http://git-wip-us.apache.org/repos/asf/gora/blob/ad4f5601/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 307fa1d..cb55db4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -894,10 +894,6 @@
             <groupId>ant</groupId>
             <artifactId>ant</artifactId>
           </exclusion>
-          <exclusion>
-            <groupId>org.mortbay.jetty</groupId>
-            <artifactId>servlet-api</artifactId>
-          </exclusion>
         </exclusions>
       </dependency>
 


[12/25] gora git commit: GoraSparkEngine.java setOutput method renamed to generateOutputConf. Added a new method to GoraSparkEngine.java which creates a job and returns necessary conf for Spark to use.

Posted by le...@apache.org.
GoraSparkEngine.java setOutput method renamed to generateOutputConf.
Added a new method to GoraSparkEngine.java which creates a job and returns necessary conf for Spark to use.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/85849111
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/85849111
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/85849111

Branch: refs/heads/master
Commit: 8584911176712f63e316707ace23a12060588cdc
Parents: 62be0c3
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Aug 17 23:25:15 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Aug 17 23:25:15 2015 +0300

----------------------------------------------------------------------
 .../org/apache/gora/spark/GoraSparkEngine.java  | 28 +++++++++++++++-----
 .../gora/tutorial/log/LogAnalyticsSpark.java    |  5 +---
 2 files changed, 23 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/85849111/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
index 9c64542..7a819fd 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
@@ -97,19 +97,35 @@ public class GoraSparkEngine<K, V extends Persistent> {
   }
 
     /**
-     * Sets the output parameters for the job
+     * Creates a job and sets the output parameters for the conf that Spark will use
+     * @param dataStore the datastore as the output
+     * @param reuseObjects whether to reuse objects in serialization
+     */
+    public <K, V extends Persistent> Configuration generateOutputConf(DataStore<K, V> dataStore,
+        boolean reuseObjects) throws IOException {
+
+      Configuration hadoopConf = new Configuration();
+      GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
+      Job job = Job.getInstance(hadoopConf);
+
+      return generateOutputConf(job, dataStore.getClass(), dataStore.getKeyClass(),
+           dataStore.getPersistentClass(), reuseObjects);
+    }
+
+    /**
+     * Sets the output parameters for the conf that Spark will use
      * @param job the job to set the properties for
      * @param dataStore the datastore as the output
      * @param reuseObjects whether to reuse objects in serialization
      */
-    public <K, V extends Persistent> Configuration setOutput(Job job,
+    public <K, V extends Persistent> Configuration generateOutputConf(Job job,
         DataStore<K, V> dataStore, boolean reuseObjects) {
-      return setOutput(job, dataStore.getClass(), dataStore.getKeyClass(),
-           dataStore.getPersistentClass(), reuseObjects);
+      return generateOutputConf(job, dataStore.getClass(), dataStore.getKeyClass(),
+              dataStore.getPersistentClass(), reuseObjects);
     }
 
     /**
-     * Sets the output parameters for the job
+     * Sets the output parameters for the conf that Spark will use
      *
      * @param job             the job to set the properties for
      * @param dataStoreClass  the datastore class
@@ -118,7 +134,7 @@ public class GoraSparkEngine<K, V extends Persistent> {
      * @param reuseObjects    whether to reuse objects in serialization
      */
     @SuppressWarnings("rawtypes")
-    public <K, V extends Persistent> Configuration setOutput(Job job,
+    public <K, V extends Persistent> Configuration generateOutputConf(Job job,
         Class<? extends DataStore> dataStoreClass,
         Class<K> keyClass, Class<V> persistentClass,
         boolean reuseObjects) {

http://git-wip-us.apache.org/repos/asf/gora/blob/85849111/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index d441de7..c66f7e4 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -190,10 +190,7 @@ public class LogAnalyticsSpark {
     //
 
     //write output to datastore
-    GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
-    Job job = Job.getInstance(hadoopConf);
-
-    Configuration sparkHadoopConf = goraSparkEngine.setOutput(job, outStore, true);
+    Configuration sparkHadoopConf = goraSparkEngine.generateOutputConf(outStore, true);
     reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
     //
 


[11/25] gora git commit: * GoraSparkEngine.java architecture is improved.

Posted by le...@apache.org.
* GoraSparkEngine.java architecture is improved.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/62be0c31
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/62be0c31
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/62be0c31

Branch: refs/heads/master
Commit: 62be0c312927e3ea4962eb966689b49dc1fcebce
Parents: 2195570
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Aug 17 22:25:12 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Aug 17 22:25:12 2015 +0300

----------------------------------------------------------------------
 .../org/apache/gora/spark/GoraSparkEngine.java  | 41 ++++++++++++++++++++
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 19 ++-------
 2 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/62be0c31/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
index ced44be..9c64542 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
@@ -21,11 +21,13 @@ import java.io.IOException;
 
 import org.apache.gora.mapreduce.GoraInputFormat;
 import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.mapreduce.GoraOutputFormat;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.util.IOUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -93,4 +95,43 @@ public class GoraSparkEngine<K, V extends Persistent> {
 
     return initialize(sparkContext, hadoopConf, dataStore);
   }
+
+    /**
+     * Sets the output parameters for the job
+     * @param job the job to set the properties for
+     * @param dataStore the datastore as the output
+     * @param reuseObjects whether to reuse objects in serialization
+     */
+    public <K, V extends Persistent> Configuration setOutput(Job job,
+        DataStore<K, V> dataStore, boolean reuseObjects) {
+      return setOutput(job, dataStore.getClass(), dataStore.getKeyClass(),
+           dataStore.getPersistentClass(), reuseObjects);
+    }
+
+    /**
+     * Sets the output parameters for the job
+     *
+     * @param job             the job to set the properties for
+     * @param dataStoreClass  the datastore class
+     * @param keyClass        output key class
+     * @param persistentClass output value class
+     * @param reuseObjects    whether to reuse objects in serialization
+     */
+    @SuppressWarnings("rawtypes")
+    public <K, V extends Persistent> Configuration setOutput(Job job,
+        Class<? extends DataStore> dataStoreClass,
+        Class<K> keyClass, Class<V> persistentClass,
+        boolean reuseObjects) {
+
+      job.setOutputFormatClass(GoraOutputFormat.class);
+      job.setOutputKeyClass(keyClass);
+      job.setOutputValueClass(persistentClass);
+
+      job.getConfiguration().setClass(GoraOutputFormat.DATA_STORE_CLASS, dataStoreClass,
+              DataStore.class);
+      job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_KEY_CLASS, keyClass, Object.class);
+      job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_VALUE_CLASS,
+              persistentClass, Persistent.class);
+      return job.getConfiguration();
+    }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/62be0c31/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 327bf7f..d441de7 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -63,7 +63,6 @@ public class LogAnalyticsSpark {
   /** The number of milliseconds in a day */
   private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
 
-  // todo _fk consider using Kyro serialization
   /**
    * map function used in calculation
    */
@@ -173,9 +172,6 @@ public class LogAnalyticsSpark {
     long count = goraRDD.count();
     System.out.println("Total Log Count: " + count);
 
-    String firstOneURL = goraRDD.first()._2().getUrl().toString();
-    System.out.println("First entry's first URL:" + firstOneURL);
-
     JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD
         .values().map(mapFunc);
 
@@ -184,7 +180,7 @@ public class LogAnalyticsSpark {
 
     System.out.println("MetricDatum count:" + reducedGoraRdd.count());
 
-    //print screen output
+    //Print output for debug purpose
     /*
     Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap();
     for (String key : metricDatumMap.keySet()) {
@@ -195,19 +191,10 @@ public class LogAnalyticsSpark {
 
     //write output to datastore
     GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
-
     Job job = Job.getInstance(hadoopConf);
-    job.setOutputFormatClass(GoraOutputFormat.class);
-    job.setOutputKeyClass(outStore.getKeyClass());
-    job.setOutputValueClass(outStore.getPersistentClass());
-
-    job.getConfiguration().setClass(GoraOutputFormat.DATA_STORE_CLASS, outStore.getClass(),
-              DataStore.class);
-    job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_KEY_CLASS, outStore.getKeyClass(), Object.class);
-    job.getConfiguration().setClass(GoraOutputFormat.OUTPUT_VALUE_CLASS,
-            outStore.getPersistentClass(), Persistent.class);
 
-    reducedGoraRdd.saveAsNewAPIHadoopDataset(job.getConfiguration());
+    Configuration sparkHadoopConf = goraSparkEngine.setOutput(job, outStore, true);
+    reducedGoraRdd.saveAsNewAPIHadoopDataset(sparkHadoopConf);
     //
 
     inStore.close();


[15/25] gora git commit: Minor improvements for LogAnalyticsSpark.java

Posted by le...@apache.org.
Minor improvements for LogAnalyticsSpark.java


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/9cff3359
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/9cff3359
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/9cff3359

Branch: refs/heads/master
Commit: 9cff33598231e9cf54060cecf1ccacdf62e64c5b
Parents: 8893fd5
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sat Aug 22 16:46:55 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sat Aug 22 16:46:55 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/9cff3359/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 59e7c79..84bb693 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -155,10 +155,10 @@ public class LogAnalyticsSpark {
         Pageview.class);
 
     SparkConf sparkConf = new SparkConf().setAppName(
-        "Gora Integration Application").setMaster("local");
+        "Gora Spark Integration Application").setMaster("local");
 
     Class[] c = new Class[1];
-    c[0] = Pageview.class;
+    c[0] = inStore.getPersistentClass();
     sparkConf.registerKryoClasses(c);
     //
     JavaSparkContext sc = new JavaSparkContext(sparkConf);


[22/25] gora git commit: Merge branch 'master' of https://github.com/kamaci/gora into GORA-386

Posted by le...@apache.org.
Merge branch 'master' of https://github.com/kamaci/gora into GORA-386


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/965c4499
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/965c4499
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/965c4499

Branch: refs/heads/master
Commit: 965c44999d0afde2b6379b035c568d8539b0c384
Parents: a303ff7 3cf5502
Author: Lewis John McGibbney <le...@jpl.nasa.gov>
Authored: Tue Sep 1 13:49:50 2015 -0700
Committer: Lewis John McGibbney <le...@jpl.nasa.gov>
Committed: Tue Sep 1 13:49:50 2015 -0700

----------------------------------------------------------------------
 bin/gora                                        |   5 +
 gora-core/pom.xml                               |  13 ++
 .../gora/examples/spark/SparkWordCount.java     | 150 ++++++++++++++
 .../org/apache/gora/spark/GoraSparkEngine.java  | 150 ++++++++++++++
 .../gora/mapreduce/MapReduceTestUtils.java      |  31 +++
 .../mapreduce/TestHBaseStoreWordCount.java      |   8 +
 .../mapreduce/TestMongoStoreWordCount.java      |   9 +
 gora-tutorial/pom.xml                           |   5 +
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 200 +++++++++++++++++++
 pom.xml                                         |   1 +
 10 files changed, 572 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/965c4499/gora-core/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/gora/blob/965c4499/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index e8ed87d,2b5821c..8197371
--- a/pom.xml
+++ b/pom.xml
@@@ -669,9 -641,10 +669,10 @@@
      <jetty.version>8.1.8.v20121106</jetty.version>
      <tika.version>1.7</tika.version>
      <httpcomponents.version>4.3.1</httpcomponents.version>
 -    <commons-io.version>1.3.2</commons-io.version>
 +    <commons-io.version>2.4</commons-io.version>
      <restlet.version>2.3.1</restlet.version>
  
+     <spark.version>1.4.1</spark.version>
      <!-- Misc Dependencies -->
      <guava.version>13.0</guava.version>
      <commons-lang.version>2.6</commons-lang.version>


[10/25] gora git commit: Code is organized in LogAnalyticsSpark.java.

Posted by le...@apache.org.
Code is organized in LogAnalyticsSpark.java.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/21955700
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/21955700
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/21955700

Branch: refs/heads/master
Commit: 219557002f0e33ef6d0e2bb49471d24fb867b0ac
Parents: cf6e765
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Aug 17 21:21:43 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Aug 17 21:21:43 2015 +0300

----------------------------------------------------------------------
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 51 +++++++++++++-------
 1 file changed, 34 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/21955700/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 372c124..327bf7f 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -35,6 +35,8 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 /**
@@ -54,6 +56,8 @@ import scala.Tuple2;
  */
 public class LogAnalyticsSpark {
 
+  private static final Logger log = LoggerFactory.getLogger(LogAnalyticsSpark.class);
+
   private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
 
   /** The number of milliseconds in a day */
@@ -112,13 +116,15 @@ public class LogAnalyticsSpark {
       System.exit(1);
     }
 
-    String inStoreClass = args[0];
-    String outStoreClass = args[1];
-
     LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
-    int ret = logAnalyticsSpark.run(inStoreClass, outStoreClass);
 
-    System.exit(ret);
+    try {
+      int ret = logAnalyticsSpark.run(args);
+      System.exit(ret);
+    } catch (Exception ex){
+      log.error("Error occurred!");
+    }
+
   }
 
   /**
@@ -129,27 +135,39 @@ public class LogAnalyticsSpark {
     return (timeStamp / DAY_MILIS) * DAY_MILIS;
   }
 
-  public int run(String inStoreClass, String outStoreClass) throws Exception {
+  public int run(String[] args) throws Exception {
+
+    DataStore<Long, Pageview> inStore;
+    DataStore<String, MetricDatum> outStore;
+    Configuration hadoopConf = new Configuration();
+
+    if (args.length > 0) {
+      String dataStoreClass = args[0];
+      inStore = DataStoreFactory.getDataStore(
+      dataStoreClass, Long.class, Pageview.class, hadoopConf);
+      if (args.length > 1) {
+        dataStoreClass = args[1];
+      }
+        outStore = DataStoreFactory.getDataStore(
+        dataStoreClass, String.class, MetricDatum.class, hadoopConf);
+      } else {
+        inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, hadoopConf);
+        outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, hadoopConf);
+    }
+
+    //Spark engine initialization
     GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class,
         Pageview.class);
 
     SparkConf sparkConf = new SparkConf().setAppName(
         "Gora Integration Application").setMaster("local");
 
-    // todo _fk consider alternative architectural design
-    // todo design inStore and outStore initialization parts as like LogAnalytics.java
-    // todo consider creating job and manipulating it at input part as like LogAnalytics.java
     Class[] c = new Class[1];
     c[0] = Pageview.class;
     sparkConf.registerKryoClasses(c);
     //
     JavaSparkContext sc = new JavaSparkContext(sparkConf);
 
-    Configuration hadoopConf = new Configuration();
-
-    DataStore<Long, Pageview> inStore = DataStoreFactory.getDataStore(
-        inStoreClass, Long.class, Pageview.class, hadoopConf);
-
     JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, inStore);
 
     long count = goraRDD.count();
@@ -176,9 +194,6 @@ public class LogAnalyticsSpark {
     //
 
     //write output to datastore
-    DataStore<String, MetricDatum> outStore = DataStoreFactory.getDataStore(
-            outStoreClass, String.class, MetricDatum.class, hadoopConf);
-
     GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
 
     Job job = Job.getInstance(hadoopConf);
@@ -198,6 +213,8 @@ public class LogAnalyticsSpark {
     inStore.close();
     outStore.close();
 
+    log.info("Log completed with success");
+
     return 1;
   }
 }


[24/25] gora git commit: Merge branch 'GORA-386'

Posted by le...@apache.org.
Merge branch 'GORA-386'


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/7c6a63e0
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/7c6a63e0
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/7c6a63e0

Branch: refs/heads/master
Commit: 7c6a63e0dfbbc223353bab64cc6218b03bbff526
Parents: 1302573 2ce2fda
Author: Lewis John McGibbney <le...@jpl.nasa.gov>
Authored: Tue Sep 1 15:27:17 2015 -0700
Committer: Lewis John McGibbney <le...@jpl.nasa.gov>
Committed: Tue Sep 1 15:27:17 2015 -0700

----------------------------------------------------------------------
 bin/gora                                        |   5 +
 gora-core/pom.xml                               |  13 ++
 .../gora/examples/spark/SparkWordCount.java     | 152 ++++++++++++++
 .../org/apache/gora/spark/GoraSparkEngine.java  | 150 ++++++++++++++
 .../gora/mapreduce/MapReduceTestUtils.java      |  31 +++
 .../mapreduce/TestHBaseStoreWordCount.java      |   8 +
 .../mapreduce/TestMongoStoreWordCount.java      |   9 +
 gora-tutorial/pom.xml                           |   5 +
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 200 +++++++++++++++++++
 pom.xml                                         |   1 +
 10 files changed, 574 insertions(+)
----------------------------------------------------------------------



[06/25] gora git commit: * GoraSpark.java initializeInput method renamed to initialize * reduce part is added to example.

Posted by le...@apache.org.
* GoraSpark.java initializeInput method renamed to initialize
* reduce part is added to example.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/81af4d3a
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/81af4d3a
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/81af4d3a

Branch: refs/heads/master
Commit: 81af4d3afcba4633d0c5d06ead9b4256ea60862f
Parents: 445edb1
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 18:33:38 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 18:33:38 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/gora/spark/GoraSpark.java   |  4 +-
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 49 ++++++++++++++++----
 2 files changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/81af4d3a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
index 4279cfb..02b5b39 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
@@ -41,7 +41,7 @@ public class GoraSpark<K, V extends Persistent> {
     this.clazzV = clazzV;
   }
 
-  public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
+  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
       Configuration conf, DataStore<K, V> dataStore) {
     GoraMapReduceUtils.setIOSerializations(conf, true);
 
@@ -56,7 +56,7 @@ public class GoraSpark<K, V extends Persistent> {
         clazzV);
   }
 
-  public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
+  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
       DataStore<K, V> dataStore) {
     Configuration hadoopConf;
 

http://git-wip-us.apache.org/repos/asf/gora/blob/81af4d3a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 0ad3e57..d69649e 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -20,6 +20,7 @@ package org.apache.gora.tutorial.log;
 import org.apache.gora.spark.GoraSpark;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.tutorial.log.generated.MetricDatum;
 import org.apache.gora.tutorial.log.generated.Pageview;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
@@ -27,11 +28,11 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
-import java.util.concurrent.TimeUnit;
-
 public class LogAnalyticsSpark {
 
   private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
@@ -39,18 +40,44 @@ public class LogAnalyticsSpark {
   /** The number of milliseconds in a day */
   private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
 
-  //todo _fk consider using Kyro serialization
-  private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> s = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> () {
+  // todo _fk consider using Kyro serialization
+  private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>>() {
     @Override
-    public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview) throws Exception {
+    public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview)
+        throws Exception {
       String url = pageview.getUrl().toString();
       Long day = getDay(pageview.getTimestamp());
-      Tuple2<String, Long> keyTuple =new Tuple2<>(url, day);
+      Tuple2<String, Long> keyTuple = new Tuple2<>(url, day);
 
       return new Tuple2<>(keyTuple, 1L);
     }
   };
 
+  private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() {
+    @Override
+    public Long call(Long aLong, Long aLong2) throws Exception {
+      return aLong + aLong2;
+    }
+  };
+
+  private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc = new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum>() {
+    @Override
+    public Tuple2<String, MetricDatum> call(
+        Tuple2<Tuple2<String, Long>, Long> tuple2LongTuple2) throws Exception {
+      String dimension = tuple2LongTuple2._1()._1();
+      long timestamp = tuple2LongTuple2._1()._2();
+
+      MetricDatum metricDatum = new MetricDatum();
+      metricDatum.setMetricDimension(dimension);
+      metricDatum.setTimestamp(timestamp);
+
+      String key = metricDatum.getMetricDimension().toString();
+      key += "_" + Long.toString(timestamp);
+      metricDatum.setMetric(tuple2LongTuple2._2());
+      return new Tuple2<>(key, metricDatum);
+    }
+  };
+
   public static void main(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println(USAGE);
@@ -93,7 +120,7 @@ public class LogAnalyticsSpark {
     DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
         inStoreClass, Long.class, Pageview.class, hadoopConf);
 
-    JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc,
+    JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initialize(sc,
         dataStore);
 
     long count = goraRDD.count();
@@ -102,7 +129,13 @@ public class LogAnalyticsSpark {
     String firstOneURL = goraRDD.first()._2().getUrl().toString();
     System.out.println(firstOneURL);
 
-    JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD.values().map(s);
+    JavaRDD<Tuple2<Tuple2<String, Long>, Long>> mappedGoraRdd = goraRDD
+        .values().map(mapFunc);
+
+    JavaPairRDD<String, MetricDatum> reducedGoraRdd = JavaPairRDD
+        .fromJavaRDD(mappedGoraRdd).reduceByKey(redFunc).mapToPair(metricFunc);
+
+    System.out.println("MetricDatum count:" + reducedGoraRdd.count());
 
     return 1;
   }


[04/25] gora git commit: * map function is implemented as in LogAnalytics.java

Posted by le...@apache.org.
* map function is implemented as in LogAnalytics.java


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/5644a21c
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/5644a21c
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/5644a21c

Branch: refs/heads/master
Commit: 5644a21c5c5678611c7cef4e2c922643088951b8
Parents: 8fbdef7
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 02:10:26 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 02:10:26 2015 +0300

----------------------------------------------------------------------
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 38 +++++++++++++++++---
 1 file changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/5644a21c/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 0194c3b..43acba0 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -24,12 +24,31 @@ import org.apache.gora.tutorial.log.generated.Pageview;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import scala.Tuple2;
+
+import java.util.concurrent.TimeUnit;
 
 public class LogAnalyticsSpark {
 
   private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
 
+  /** The number of milliseconds in a day */
+  private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
+
+  //todo _fk consider using Kyro serialization
+  private static Function<Pageview, Tuple2<String, Long>> s = new Function<Pageview, Tuple2<String, Long>>() {
+    @Override
+    public Tuple2<String, Long> call(Pageview pageview) throws Exception {
+      String key = pageview.getUrl().toString();
+      Long value = getDay(pageview.getTimestamp());
+      return new Tuple2<>(key, value);
+    }
+  };
+
   public static void main(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println(USAGE);
@@ -45,14 +64,22 @@ public class LogAnalyticsSpark {
     System.exit(ret);
   }
 
+  /**
+   * Rolls up the given timestamp to the day cardinality, so that data can be
+   * aggregated daily
+   */
+  private static long getDay(long timeStamp) {
+    return (timeStamp / DAY_MILIS) * DAY_MILIS;
+  }
+
   public int run(String inStoreClass, String outStoreClass) throws Exception {
     GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class,
         Pageview.class);
 
     SparkConf sparkConf = new SparkConf().setAppName(
         "Gora Integration Application").setMaster("local");
-      
-    //todo _fk change architectural desigm
+
+    // todo _fk consider alternative architectural design
     Class[] c = new Class[1];
     c[0] = Pageview.class;
     sparkConf.registerKryoClasses(c);
@@ -64,9 +91,8 @@ public class LogAnalyticsSpark {
     DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
         inStoreClass, Long.class, Pageview.class, hadoopConf);
 
-    JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, dataStore);
-    // JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview>
-    // cachedGoraRdd = goraRDD.cache();
+    JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc,
+        dataStore);
 
     long count = goraRDD.count();
     System.out.println("Total Count: " + count);
@@ -74,6 +100,8 @@ public class LogAnalyticsSpark {
     String firstOneURL = goraRDD.first()._2().getUrl().toString();
     System.out.println(firstOneURL);
 
+    JavaRDD<Tuple2<String, Long>> mappedGoraRdd = goraRDD.values().map(s);
+
     return 1;
   }
 }


[02/25] gora git commit: * GoraSpark.java initialize method renamed to initializeInput. * Architectural change is made.

Posted by le...@apache.org.
* GoraSpark.java initialize method renamed to initializeInput.
* Architectural change is made.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/ef68cead
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/ef68cead
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/ef68cead

Branch: refs/heads/master
Commit: ef68cead273324797cf292dbe6da18ee3fd819cb
Parents: 9c2d225
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Sun Jun 28 17:17:52 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Sun Jun 28 17:17:52 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/gora/spark/GoraSpark.java   | 50 +++++++++++++++-----
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 44 ++++++++---------
 2 files changed, 60 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/ef68cead/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
index 690e32c..4279cfb 100644
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
@@ -24,6 +24,7 @@ import org.apache.gora.mapreduce.GoraMapReduceUtils;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -37,19 +38,44 @@ public class GoraSpark<K, V extends Persistent> {
 
   public GoraSpark(Class<K> clazzK, Class<V> clazzV) {
     this.clazzK = clazzK;
-	this.clazzV = clazzV;
+    this.clazzV = clazzV;
   }
 
-  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+  public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
       Configuration conf, DataStore<K, V> dataStore) {
-	  GoraMapReduceUtils.setIOSerializations(conf, true);
-      try {
-	    IOUtils.storeToConf(dataStore.newQuery(), conf,
-		    GoraInputFormat.QUERY_KEY);
-      } catch (IOException ioex) {
-        throw new RuntimeException(ioex.getMessage());
-	  }
-       return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class,
-	     clazzK, clazzV);
-	}
+    GoraMapReduceUtils.setIOSerializations(conf, true);
+
+    try {
+      IOUtils
+          .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY);
+    } catch (IOException ioex) {
+      throw new RuntimeException(ioex.getMessage());
+    }
+
+    return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK,
+        clazzV);
+  }
+
+  public JavaPairRDD<K, V> initializeInput(JavaSparkContext sparkContext,
+      DataStore<K, V> dataStore) {
+    Configuration hadoopConf;
+
+      if ((dataStore instanceof Configurable) && ((Configurable) dataStore).getConf() != null) {
+          hadoopConf = ((Configurable) dataStore).getConf();
+      } else {
+          hadoopConf = new Configuration();
+      }
+
+      GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
+
+    try {
+      IOUtils.storeToConf(dataStore.newQuery(), hadoopConf,
+              GoraInputFormat.QUERY_KEY);
+    } catch (IOException ioex) {
+      throw new RuntimeException(ioex.getMessage());
+    }
+
+    return sparkContext.newAPIHadoopRDD(hadoopConf, GoraInputFormat.class,
+        clazzK, clazzV);
+  }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/ef68cead/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index 214b130..9b28fd7 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -21,44 +21,44 @@ import org.apache.gora.spark.GoraSpark;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.tutorial.log.generated.Pageview;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
-public class LogAnalyticsSpark extends Configured implements Tool {
+public class LogAnalyticsSpark {
 
   private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
-  private static LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
 
   public static void main(String[] args) throws Exception {
-      if (args.length < 2) {
-        System.err.println(USAGE);
-        System.exit(1);
+    if (args.length < 2) {
+      System.err.println(USAGE);
+      System.exit(1);
     }
-      // run as any other MR job
-      int ret = ToolRunner.run(logAnalyticsSpark, args);
-      System.exit(ret);
+
+    String inStoreClass = args[0];
+    String outStoreClass = args[1];
+
+    LogAnalyticsSpark logAnalyticsSpark = new LogAnalyticsSpark();
+    int ret = logAnalyticsSpark.run(inStoreClass, outStoreClass);
+
+    System.exit(ret);
   }
 
-  @Override
-  public int run(String[] args) throws Exception {
-    GoraSpark<Long, Pageview> goraSpark = new GoraSpark<Long, Pageview>(
-      Long.class, Pageview.class);
+  public int run(String inStoreClass, String outStoreClass) throws Exception {
+    GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class,
+        Pageview.class);
 
-    SparkConf conf = new SparkConf().setAppName(
+    SparkConf sparkConf = new SparkConf().setAppName(
         "Gora Integration Application").setMaster("local");
-    JavaSparkContext sc = new JavaSparkContext(conf);
+    JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+    Configuration hadoopConf = new Configuration();
 
-    String dataStoreClass = args[0];
     DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
-        dataStoreClass, Long.class, Pageview.class,
-    logAnalyticsSpark.getConf());
+        inStoreClass, Long.class, Pageview.class, hadoopConf);
 
-    JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview> goraRDD = goraSpark
-        .initialize(sc, logAnalyticsSpark.getConf(), dataStore);
+    JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initializeInput(sc, dataStore);
     // JavaPairRDD<Long, org.apache.gora.tutorial.log.generated.Pageview>
     // cachedGoraRdd = goraRDD.cache();
 


[21/25] gora git commit: Spark version is changed to 1.4.1 from 1.3.1.

Posted by le...@apache.org.
Spark version is upgraded from 1.3.1 to 1.4.1.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/3cf55026
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/3cf55026
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/3cf55026

Branch: refs/heads/master
Commit: 3cf550264d782bcffa15fdf62c584eca355cab29
Parents: 6252edd
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Fri Aug 28 00:57:26 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Fri Aug 28 00:57:26 2015 +0300

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/3cf55026/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index cb55db4..2b5821c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -644,7 +644,7 @@
     <commons-io.version>1.3.2</commons-io.version>
     <restlet.version>2.3.1</restlet.version>
 
-    <spark.version>1.3.1</spark.version>
+    <spark.version>1.4.1</spark.version>
     <!-- Misc Dependencies -->
     <guava.version>13.0</guava.version>
     <commons-lang.version>2.6</commons-lang.version>


[20/25] gora git commit: LogAnalyticsSpark is let to run from command line by passing "loganalyticsspark" argument.

Posted by le...@apache.org.
LogAnalyticsSpark can now be run from the command line by passing the "loganalyticsspark" argument.


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/6252eddf
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/6252eddf
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/6252eddf

Branch: refs/heads/master
Commit: 6252eddfecf5a9cba028e1d7529b6dee3ec5ef33
Parents: ad4f560
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Fri Aug 28 00:21:31 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Fri Aug 28 00:21:31 2015 +0300

----------------------------------------------------------------------
 bin/gora | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/6252eddf/bin/gora
----------------------------------------------------------------------
diff --git a/bin/gora b/bin/gora
index 7126972..8c37272 100755
--- a/bin/gora
+++ b/bin/gora
@@ -49,6 +49,7 @@ if [ $# = 0 ]; then
   echo "  goracisetup                Run the GoraCI Rackspace orchestration setup"
   echo "  logmanager                 Run the tutorial log manager"
   echo "  loganalytics               Run the tutorial log analytics"
+  echo "  loganalyticsspark          Run the tutorial log analytics spark"
   echo "  junit         	     Run the given JUnit test"
   echo "  version         	     Print Gora version to terminal"
   echo " or"
@@ -134,6 +135,10 @@ elif [ "$COMMAND" = "loganalytics" ] ; then
   MODULE=gora-tutorial
   CLASS=org.apache.gora.tutorial.log.LogAnalytics
   CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/classes/
+elif [ "$COMMAND" = "loganalyticsspark" ] ; then
+  MODULE=gora-tutorial
+  CLASS=org.apache.gora.tutorial.log.LogAnalyticsSpark
+  CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/classes/
 elif [ "$COMMAND" = "junit" ] ; then
   MODULE=*
   CLASSPATH=$CLASSPATH:$GORA_HOME/$MODULE/target/test-classes/