You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/04 15:42:05 UTC
[10/20] incubator-tinkerpop git commit: Spark jar cache supported for
sending HADOOP_GREMLIN_LIBS jars to the cluster.
Spark jar cache supported for sending HADOOP_GREMLIN_LIBS jars to the cluster.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3ed0fa6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3ed0fa6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3ed0fa6c
Branch: refs/heads/master
Commit: 3ed0fa6cfcfb00e68e288059da188ee34337caf4
Parents: 3855bdc
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 3 09:46:38 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 3 09:46:38 2015 -0700
----------------------------------------------------------------------
hadoop-gremlin/conf/spark-kryo.properties | 2 +-
.../computer/giraph/GiraphGraphComputer.java | 6 ++---
.../computer/spark/SparkGraphComputer.java | 25 ++++++++++++++++++--
3 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3ed0fa6c/hadoop-gremlin/conf/spark-kryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/spark-kryo.properties b/hadoop-gremlin/conf/spark-kryo.properties
index de4df3b..ec8b393 100644
--- a/hadoop-gremlin/conf/spark-kryo.properties
+++ b/hadoop-gremlin/conf/spark-kryo.properties
@@ -35,4 +35,4 @@ gremlin.vertexProgram=org.apache.tinkerpop.gremlin.process.computer.ranking.page
spark.master=local[4]
spark.executor.memory=1024m
spark.eventLog.enabled=true
-#spark.serializer=org.apache.spark.serializer.KryoSerializer
\ No newline at end of file
+spark.serializer=org.apache.spark.serializer.JavaSerializer
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3ed0fa6c/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
index 7a5e362..589c22c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/giraph/GiraphGraphComputer.java
@@ -191,11 +191,11 @@ public class GiraphGraphComputer extends Configured implements GraphComputer, To
private void loadJars(final FileSystem fs) {
final String hadoopGremlinLibsRemote = "hadoop-gremlin-libs";
if (this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
- final String giraphGremlinLibsLocal = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
- if (null == giraphGremlinLibsLocal)
+ final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
+ if (null == hadoopGremlinLocalLibs)
LOGGER.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
else {
- final String[] paths = giraphGremlinLibsLocal.split(":");
+ final String[] paths = hadoopGremlinLocalLibs.split(":");
for (final String path : paths) {
final File file = new File(path);
if (file.exists()) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3ed0fa6c/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 774c3c7..4b30e16 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -54,6 +55,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
+import java.util.stream.Stream;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -125,8 +127,9 @@ public class SparkGraphComputer implements GraphComputer {
hadoopConfiguration.set("mapred.input.dir", hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
// set up the input format
- final JavaSparkContext sc = new JavaSparkContext(sparkConfiguration);
- final JavaPairRDD<NullWritable, VertexWritable> rdd = sc.newAPIHadoopRDD(hadoopConfiguration,
+ final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
+ SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
+ final JavaPairRDD<NullWritable, VertexWritable> rdd = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
(Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
NullWritable.class,
VertexWritable.class);
@@ -183,6 +186,24 @@ public class SparkGraphComputer implements GraphComputer {
// a cheap action
}
+    /**
+     * Registers the jars found in the HADOOP_GREMLIN_LIBS directories with the Spark context.
+     * <p>
+     * Nothing is done when {@link Constants#GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE} is set to
+     * {@code false} in the Hadoop configuration (it defaults to {@code true}). The value of the
+     * {@link Constants#HADOOP_GREMLIN_LIBS} environment variable is split on {@code ':'} and each
+     * entry is treated as a directory; every file in it whose name ends with
+     * {@link Constants#DOT_JAR} is handed to {@link JavaSparkContext#addJar(String)}.
+     * An unset variable or an entry that is not a listable directory only produces a warning.
+     *
+     * @param sparkContext        the Spark context the jars are added to
+     * @param hadoopConfiguration the configuration consulted for the distributed-cache flag
+     */
+    private static void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
+        if (!hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true))
+            return;
+        final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
+        if (null == hadoopGremlinLocalLibs) {
+            LOGGER.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
+            return;
+        }
+        for (final String path : hadoopGremlinLocalLibs.split(":")) {
+            // File.listFiles() returns null when the path is missing, is a regular file rather than a
+            // directory, or cannot be read; checking only exists() would let a null reach Stream.of()
+            // and fail with a NullPointerException instead of the intended warning.
+            final File[] candidates = new File(path).listFiles();
+            if (null == candidates) {
+                LOGGER.warn(path + " does not reference a valid directory -- proceeding regardless");
+                continue;
+            }
+            Stream.of(candidates)
+                    .filter(f -> f.getName().endsWith(Constants.DOT_JAR))
+                    .forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
+        }
+    }
+
/////////////////
public static void main(final String[] args) throws Exception {