You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by le...@apache.org on 2015/09/03 09:28:43 UTC

[07/25] gora git commit: Organizing codes Documentation

Organizing codes
Documentation


Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/80c0c26d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/80c0c26d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/80c0c26d

Branch: refs/heads/master
Commit: 80c0c26d8fb6a9a84ea39f5aa96cc343b4546266
Parents: 81af4d3
Author: Furkan KAMACI <fu...@gmail.com>
Authored: Mon Jun 29 19:26:28 2015 +0300
Committer: Furkan KAMACI <fu...@gmail.com>
Committed: Mon Jun 29 19:26:28 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/gora/spark/GoraSpark.java   | 81 -----------------
 .../org/apache/gora/spark/GoraSparkEngine.java  | 96 ++++++++++++++++++++
 .../gora/tutorial/log/LogAnalyticsSpark.java    | 40 +++++++-
 3 files changed, 131 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/80c0c26d/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
deleted file mode 100644
index 02b5b39..0000000
--- a/gora-core/src/main/java/org/apache/gora/spark/GoraSpark.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gora.spark;
-
-import java.io.IOException;
-
-import org.apache.gora.mapreduce.GoraInputFormat;
-import org.apache.gora.mapreduce.GoraMapReduceUtils;
-import org.apache.gora.persistency.Persistent;
-import org.apache.gora.store.DataStore;
-import org.apache.gora.util.IOUtils;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-
-/**
- * Base class for Spark integration
- */
-public class GoraSpark<K, V extends Persistent> {
-  Class<K> clazzK;
-  Class<V> clazzV;
-
-  public GoraSpark(Class<K> clazzK, Class<V> clazzV) {
-    this.clazzK = clazzK;
-    this.clazzV = clazzV;
-  }
-
-  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
-      Configuration conf, DataStore<K, V> dataStore) {
-    GoraMapReduceUtils.setIOSerializations(conf, true);
-
-    try {
-      IOUtils
-          .storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY);
-    } catch (IOException ioex) {
-      throw new RuntimeException(ioex.getMessage());
-    }
-
-    return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK,
-        clazzV);
-  }
-
-  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
-      DataStore<K, V> dataStore) {
-    Configuration hadoopConf;
-
-      if ((dataStore instanceof Configurable) && ((Configurable) dataStore).getConf() != null) {
-          hadoopConf = ((Configurable) dataStore).getConf();
-      } else {
-          hadoopConf = new Configuration();
-      }
-
-      GoraMapReduceUtils.setIOSerializations(hadoopConf, true);
-
-    try {
-      IOUtils.storeToConf(dataStore.newQuery(), hadoopConf,
-              GoraInputFormat.QUERY_KEY);
-    } catch (IOException ioex) {
-      throw new RuntimeException(ioex.getMessage());
-    }
-
-    return sparkContext.newAPIHadoopRDD(hadoopConf, GoraInputFormat.class,
-        clazzK, clazzV);
-  }
-}

http://git-wip-us.apache.org/repos/asf/gora/blob/80c0c26d/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
----------------------------------------------------------------------
diff --git a/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
new file mode 100644
index 0000000..ced44be
--- /dev/null
+++ b/gora-core/src/main/java/org/apache/gora/spark/GoraSparkEngine.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.spark;
+
+import java.io.IOException;
+
+import org.apache.gora.mapreduce.GoraInputFormat;
+import org.apache.gora.mapreduce.GoraMapReduceUtils;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.util.IOUtils;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Base class for Gora - Spark integration. Builds a Spark {@link JavaPairRDD}
+ * over a Gora {@link DataStore}, read through {@link GoraInputFormat}.
+ *
+ * @param <K>
+ *          key type of the data store
+ * @param <V>
+ *          persistent value type of the data store
+ */
+public class GoraSparkEngine<K, V extends Persistent> {
+  /** Key class passed to Spark when the RDD is created. */
+  final Class<K> clazzK;
+  /** Value class passed to Spark when the RDD is created. */
+  final Class<V> clazzV;
+
+  /**
+   * Creates an engine for the given key and value classes.
+   *
+   * @param clazzK
+   *          key class of the data store
+   * @param clazzV
+   *          value class of the data store
+   */
+  public GoraSparkEngine(Class<K> clazzK, Class<V> clazzV) {
+    this.clazzK = clazzK;
+    this.clazzV = clazzV;
+  }
+
+  /**
+   * Initializes a {@link JavaPairRDD} from the given Spark context, Hadoop
+   * configuration and data store.
+   * <p>
+   * Note that {@code conf} is modified in place: IO serializations are set on
+   * it and the query returned by {@code dataStore.newQuery()} is stored in it
+   * under {@link GoraInputFormat#QUERY_KEY}.
+   *
+   * @param sparkContext
+   *          Spark context
+   * @param conf
+   *          Hadoop configuration; modified by this call
+   * @param dataStore
+   *          data store to read from
+   * @return an RDD of key/value pairs read via {@link GoraInputFormat}
+   * @throws RuntimeException
+   *           if the query cannot be stored in the configuration; the
+   *           underlying {@link IOException} is kept as the cause
+   */
+  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+      Configuration conf, DataStore<K, V> dataStore) {
+    GoraMapReduceUtils.setIOSerializations(conf, true);
+
+    try {
+      IOUtils.storeToConf(dataStore.newQuery(), conf, GoraInputFormat.QUERY_KEY);
+    } catch (IOException ioex) {
+      // Chain the IOException so its type and stack trace are not lost.
+      throw new RuntimeException(ioex.getMessage(), ioex);
+    }
+
+    return sparkContext.newAPIHadoopRDD(conf, GoraInputFormat.class, clazzK,
+        clazzV);
+  }
+
+  /**
+   * Initializes a {@link JavaPairRDD} from the given Spark context and data
+   * store. If the data store is {@link Configurable} and already has a
+   * configuration, that configuration is used (and modified in place, see
+   * {@link #initialize(JavaSparkContext, Configuration, DataStore)});
+   * otherwise a new Hadoop {@link Configuration} is created.
+   *
+   * @param sparkContext
+   *          Spark context
+   * @param dataStore
+   *          data store to read from
+   * @return an RDD of key/value pairs read via {@link GoraInputFormat}
+   * @throws RuntimeException
+   *           if the query cannot be stored in the configuration
+   */
+  public JavaPairRDD<K, V> initialize(JavaSparkContext sparkContext,
+      DataStore<K, V> dataStore) {
+    Configuration hadoopConf = null;
+    if (dataStore instanceof Configurable) {
+      // Read the store's configuration once instead of calling getConf() twice.
+      hadoopConf = ((Configurable) dataStore).getConf();
+    }
+    if (hadoopConf == null) {
+      hadoopConf = new Configuration();
+    }
+
+    return initialize(sparkContext, hadoopConf, dataStore);
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/80c0c26d/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
----------------------------------------------------------------------
diff --git a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
index d69649e..abced3f 100644
--- a/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
+++ b/gora-tutorial/src/main/java/org/apache/gora/tutorial/log/LogAnalyticsSpark.java
@@ -17,7 +17,9 @@
  */
 package org.apache.gora.tutorial.log;
 
-import org.apache.gora.spark.GoraSpark;
+import java.util.Map;
+
+import org.apache.gora.spark.GoraSparkEngine;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.tutorial.log.generated.MetricDatum;
@@ -33,6 +35,21 @@ import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
+/**
+ * LogAnalyticsSpark is the tutorial class that illustrates the Gora Spark API.
+ * The Spark job reads the web access data stored earlier by the
+ * {@link LogManager} and calculates the aggregate daily pageviews. The output
+ * of the job is stored in a Gora-compatible data store.
+ *
+ * <p>
+ * This class illustrates the same functionality as {@link LogAnalytics}, but
+ * via Spark.
+ * </p>
+ *
+ * <p>
+ * See the tutorial.html file in docs or go to the <a
+ * href="http://incubator.apache.org/gora/docs/current/tutorial.html">web
+ * site</a> for more information.
+ * </p>
+ */
 public class LogAnalyticsSpark {
 
   private static final String USAGE = "LogAnalyticsSpark <input_data_store> <output_data_store>";
@@ -41,6 +58,9 @@ public class LogAnalyticsSpark {
   private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
 
   // todo _fk consider using Kryo serialization
+  /**
+   * map function used in calculation
+   */
   private static Function<Pageview, Tuple2<Tuple2<String, Long>, Long>> mapFunc = new Function<Pageview, Tuple2<Tuple2<String, Long>, Long>>() {
     @Override
     public Tuple2<Tuple2<String, Long>, Long> call(Pageview pageview)
@@ -53,6 +73,9 @@ public class LogAnalyticsSpark {
     }
   };
 
+  /**
+   * Reduce function used in the calculation.
+   */
   private static Function2<Long, Long, Long> redFunc = new Function2<Long, Long, Long>() {
     @Override
     public Long call(Long aLong, Long aLong2) throws Exception {
@@ -60,6 +83,9 @@ public class LogAnalyticsSpark {
     }
   };
 
+  /**
+   * Metric function applied after the reduce phase.
+   */
   private static PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum> metricFunc = new PairFunction<Tuple2<Tuple2<String, Long>, Long>, String, MetricDatum>() {
     @Override
     public Tuple2<String, MetricDatum> call(
@@ -102,7 +128,7 @@ public class LogAnalyticsSpark {
   }
 
   public int run(String inStoreClass, String outStoreClass) throws Exception {
-    GoraSpark<Long, Pageview> goraSpark = new GoraSpark<>(Long.class,
+    GoraSparkEngine<Long, Pageview> goraSparkEngine = new GoraSparkEngine<>(Long.class,
         Pageview.class);
 
     SparkConf sparkConf = new SparkConf().setAppName(
@@ -120,11 +146,10 @@ public class LogAnalyticsSpark {
     DataStore<Long, Pageview> dataStore = DataStoreFactory.getDataStore(
         inStoreClass, Long.class, Pageview.class, hadoopConf);
 
-    JavaPairRDD<Long, Pageview> goraRDD = goraSpark.initialize(sc,
-        dataStore);
+    JavaPairRDD<Long, Pageview> goraRDD = goraSparkEngine.initialize(sc, dataStore);
 
     long count = goraRDD.count();
-    System.out.println("Total Count: " + count);
+    System.out.println("Total Log Count: " + count);
 
     String firstOneURL = goraRDD.first()._2().getUrl().toString();
     System.out.println(firstOneURL);
@@ -137,6 +162,11 @@ public class LogAnalyticsSpark {
 
     System.out.println("MetricDatum count:" + reducedGoraRdd.count());
 
+    Map<String, MetricDatum> metricDatumMap = reducedGoraRdd.collectAsMap();
+    for (String key : metricDatumMap.keySet()) {
+      System.out.println(key);
+    }
+
     return 1;
   }
 }