You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/04 15:42:05 UTC

[10/20] incubator-tinkerpop git commit: Spark jar cache supported for sending HADOOP_GREMLIN_LIBS jars to the cluster.

Spark jar cache supported for sending HADOOP_GREMLIN_LIBS jars to the cluster.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3ed0fa6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3ed0fa6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3ed0fa6c

Branch: refs/heads/master
Commit: 3ed0fa6cfcfb00e68e288059da188ee34337caf4
Parents: 3855bdc
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 3 09:46:38 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 3 09:46:38 2015 -0700

----------------------------------------------------------------------
 hadoop-gremlin/conf/spark-kryo.properties       |  2 +-
 .../computer/giraph/GiraphGraphComputer.java    |  6 ++---
 .../computer/spark/SparkGraphComputer.java      | 25 ++++++++++++++++++--
 3 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3ed0fa6c/hadoop-gremlin/conf/spark-kryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/spark-kryo.properties b/hadoop-gremlin/conf/spark-kryo.properties
index de4df3b..ec8b393 100644
--- a/hadoop-gremlin/conf/spark-kryo.properties
+++ b/hadoop-gremlin/conf/spark-kryo.properties
@@ -35,4 +35,4 @@ gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.ranking.page
 spark.master=local[4]
 spark.executor.memory=1024m
 spark.eventLog.enabled=true
-#spark.serializer=org.apache.spark.serializer.KryoSerializer
\ No newline at end of file
+spark.serializer=org.apache.spark.serializer.JavaSerializer
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3ed0fa6c/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
index 7a5e362..589c22c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
@@ -191,11 +191,11 @@ public class GiraphGraphComputer extends Configured implements GraphComputer, To
     private void loadJars(final FileSystem fs) {
         final String hadoopGremlinLibsRemote = "hadoop-gremlin-libs";
         if (this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
-            final String giraphGremlinLibsLocal = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
-            if (null == giraphGremlinLibsLocal)
+            final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
+            if (null == hadoopGremlinLocalLibs)
                 LOGGER.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
             else {
-                final String[] paths = giraphGremlinLibsLocal.split(":");
+                final String[] paths = hadoopGremlinLocalLibs.split(":");
                 for (final String path : paths) {
                     final File file = new File(path);
                     if (file.exists()) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3ed0fa6c/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 774c3c7..4b30e16 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -54,6 +55,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
+import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -125,8 +127,9 @@ public class SparkGraphComputer implements GraphComputer {
                             hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
 
                         // set up the input format
-                        final JavaSparkContext sc = new JavaSparkContext(sparkConfiguration);
-                        final JavaPairRDD<NullWritable, VertexWritable> rdd = sc.newAPIHadoopRDD(hadoopConfiguration,
+                        final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
+                        SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
+                        final JavaPairRDD<NullWritable, VertexWritable> rdd = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
                                 (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                 NullWritable.class,
                                 VertexWritable.class);
@@ -183,6 +186,38 @@ public class SparkGraphComputer implements GraphComputer {
         // a cheap action
     }
 
+    /**
+     * Adds the jars found in the local Hadoop-Gremlin library directories to the given Spark context.
+     * <p>
+     * Nothing is added when {@link Constants#GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE} is configured as false
+     * (it defaults to true). Otherwise the environment variable named by {@link Constants#HADOOP_GREMLIN_LIBS}
+     * is split on ':' and each entry is treated as a directory; every file in it whose name ends with
+     * {@link Constants#DOT_JAR} is passed to {@code sparkContext.addJar(...)} by absolute path. An unset
+     * variable, or an entry that cannot be listed as a directory, is logged as a warning and skipped.
+     *
+     * @param sparkContext        the Spark context that receives the jars
+     * @param hadoopConfiguration the configuration consulted for the distributed-cache flag
+     */
+    private static void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
+        if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
+            final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
+            if (null == hadoopGremlinLocalLibs)
+                LOGGER.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
+            else {
+                final String[] paths = hadoopGremlinLocalLibs.split(":");
+                for (final String path : paths) {
+                    // listFiles() returns null when the path is missing, is a regular file, or cannot be read;
+                    // checking its result (rather than exists()) avoids a NullPointerException in Stream.of
+                    final File[] files = new File(path).listFiles();
+                    if (null != files)
+                        Stream.of(files).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
+                    else
+                        LOGGER.warn(path + " does not reference a valid directory -- proceeding regardless");
+                }
+            }
+        }
+    }
+
     /////////////////
 
     public static void main(final String[] args) throws Exception {