You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/10 17:48:59 UTC
incubator-tinkerpop git commit: There is a bug in Spark 1.2.1 around
Kryo registration of classes and user-submitted jars -- i.e. distributed
cache jars. This is fixed in 1.3.0. The code is staged (commented out with
a TODO) for when we bump to 1.3.0.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 26e734446 -> 24fd909ac
There is a bug in Spark 1.2.1 around Kryo registration of classes and user-submitted jars -- i.e. distributed cache jars. This is fixed in 1.3.0. The code is staged (commented out with a TODO) for when we bump to 1.3.0.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/24fd909a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/24fd909a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/24fd909a
Branch: refs/heads/master
Commit: 24fd909ac110d945a4c8d94aa28d675ee03d655e
Parents: 26e7344
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Apr 10 09:49:13 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Apr 10 09:49:13 2015 -0600
----------------------------------------------------------------------
.../process/computer/spark/SparkExecutor.java | 4 +-
.../computer/spark/SparkGraphComputer.java | 16 +++---
.../gremlin/hadoop/structure/io/IOClasses.java | 52 ++++++++++++++++++++
.../gremlin/hadoop/HadoopGraphProvider.java | 1 +
4 files changed, 65 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/24fd909a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index f493c23..5203b21 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -99,7 +99,7 @@ public final class SparkExecutor {
workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
});
- }));
+ })).setName("viewOutgoingRDD");
// "message pass" by reducing on the vertex object id of the view and message payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
@@ -127,7 +127,7 @@ public final class SparkExecutor {
(ViewIncomingPayload<M>) payload : // this happens if there is a vertex with incoming messages
new ViewIncomingPayload<>((ViewPayload) payload)); // this happens if there is a vertex with no incoming messages
- newViewIncomingRDD
+ newViewIncomingRDD.setName("viewIncomingRDD")
.foreachPartition(partitionIterator -> {
}); // need to complete a task so its BSP and the memory for this iteration is updated
return newViewIncomingRDD;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/24fd909a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index fc79c64..1b51690 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -43,7 +43,6 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +50,6 @@ import scala.Tuple2;
import java.io.File;
import java.util.HashSet;
-import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -152,8 +150,14 @@ public final class SparkGraphComputer implements GraphComputer {
// wire up a spark context
final SparkConf sparkConfiguration = new SparkConf();
sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
- //final List<Class> classes = GryoMapper.build().create().getRegisteredClasses();
- //sparkConfiguration.registerKryoClasses(classes.toArray(new Class[classes.size()])); //TODO
+ /*final List<Class> classes = new ArrayList<>();
+ classes.addAll(IOClasses.getGryoClasses(GryoMapper.build().create()));
+ classes.addAll(IOClasses.getSharedHadoopClasses());
+ classes.add(ViewPayload.class);
+ classes.add(MessagePayload.class);
+ classes.add(ViewIncomingPayload.class);
+ classes.add(ViewOutgoingPayload.class);
+ sparkConfiguration.registerKryoClasses(classes.toArray(new Class[classes.size()]));*/ // TODO: fix for user submitted jars in Spark 1.3.0
hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
@@ -216,10 +220,10 @@ public final class SparkGraphComputer implements GraphComputer {
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
mapReduce.storeState(newApacheConfiguration);
// map
- final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration);
+ final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
// combine TODO? is this really needed
// reduce
- final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+ final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
// write the map reduce output back to disk (memory)
SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/24fd909a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/IOClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/IOClasses.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/IOClasses.java
new file mode 100644
index 0000000..4efd98c
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/IOClasses.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class IOClasses {
+
+ public static List<Class> getGryoClasses(final GryoMapper mapper) {
+ return mapper.getRegisteredClasses();
+ }
+
+ public static List<Class> getSharedHadoopClasses() {
+ final List<Class> hadoopClasses = new ArrayList<>();
+ hadoopClasses.add(VertexWritable.class);
+ hadoopClasses.add(ObjectWritable.class);
+ hadoopClasses.add(StarGraph.class);
+ hadoopClasses.add(StarGraph.StarVertex.class);
+ hadoopClasses.add(StarGraph.StarVertexProperty.class);
+ hadoopClasses.add(StarGraph.StarProperty.class);
+ hadoopClasses.add(StarGraph.StarOutEdge.class);
+ hadoopClasses.add(StarGraph.StarInEdge.class);
+ hadoopClasses.add(StarGraph.StarAdjacentVertex.class);
+ return hadoopClasses;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/24fd909a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index b8ba425..cd33070 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -122,6 +122,7 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
/// spark configuration
put("spark.master", "local[4]");
put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ // put("spark.kryo.registrationRequired",true);
}};
}