You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/10 17:48:59 UTC

incubator-tinkerpop git commit: there is a bug in Spark 1.2.1 around Kryo registration of classes and the user-submitted jars -- i.e. distributed cache jars. This is fixed in 1.3.0. Have the code staged (commented out with TODO) for when we bump to 1.3.0

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 26e734446 -> 24fd909ac


There is a bug in Spark 1.2.1 around Kryo registration of classes and the user-submitted jars -- i.e. distributed cache jars. This is fixed in 1.3.0. Have the code staged (commented out with TODO) for when we bump to 1.3.0.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/24fd909a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/24fd909a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/24fd909a

Branch: refs/heads/master
Commit: 24fd909ac110d945a4c8d94aa28d675ee03d655e
Parents: 26e7344
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Apr 10 09:49:13 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Apr 10 09:49:13 2015 -0600

----------------------------------------------------------------------
 .../process/computer/spark/SparkExecutor.java   |  4 +-
 .../computer/spark/SparkGraphComputer.java      | 16 +++---
 .../gremlin/hadoop/structure/io/IOClasses.java  | 52 ++++++++++++++++++++
 .../gremlin/hadoop/HadoopGraphProvider.java     |  1 +
 4 files changed, 65 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/24fd909a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index f493c23..5203b21 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -99,7 +99,7 @@ public final class SparkExecutor {
                             workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
                         return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
                     });
-                }));
+                })).setName("viewOutgoingRDD");
 
         // "message pass" by reducing on the vertex object id of the view and message payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
@@ -127,7 +127,7 @@ public final class SparkExecutor {
                         (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
                         new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
 
-        newViewIncomingRDD
+        newViewIncomingRDD.setName("viewIncomingRDD")
                 .foreachPartition(partitionIterator -> {
                 }); // need to complete a task so its BSP and the memory for this iteration is updated
         return newViewIncomingRDD;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/24fd909a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index fc79c64..1b51690 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -43,7 +43,6 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,7 +50,6 @@ import scala.Tuple2;
 
 import java.io.File;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -152,8 +150,14 @@ public final class SparkGraphComputer implements GraphComputer {
                     // wire up a spark context
                     final SparkConf sparkConfiguration = new SparkConf();
                     sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
-                    //final List<Class> classes = GryoMapper.build().create().getRegisteredClasses();
-                    //sparkConfiguration.registerKryoClasses(classes.toArray(new Class[classes.size()]));     //TODO
+                    /*final List<Class> classes = new ArrayList<>();
+                    classes.addAll(IOClasses.getGryoClasses(GryoMapper.build().create()));
+                    classes.addAll(IOClasses.getSharedHadoopClasses());
+                    classes.add(ViewPayload.class);
+                    classes.add(MessagePayload.class);
+                    classes.add(ViewIncomingPayload.class);
+                    classes.add(ViewOutgoingPayload.class);
+                    sparkConfiguration.registerKryoClasses(classes.toArray(new Class[classes.size()]));*/ // TODO: fix for user submitted jars in Spark 1.3.0
 
                     hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
                     if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
@@ -216,10 +220,10 @@ public final class SparkGraphComputer implements GraphComputer {
                                 final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
                                 mapReduce.storeState(newApacheConfiguration);
                                 // map
-                                final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration);
+                                final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
                                 // combine TODO? is this really needed
                                 // reduce
-                                final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+                                final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
                                 // write the map reduce output back to disk (memory)
                                 SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
                             }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/24fd909a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/IOClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/IOClasses.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/IOClasses.java
new file mode 100644
index 0000000..4efd98c
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/IOClasses.java
@@ -0,0 +1,52 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class IOClasses {
+
+    public static List<Class> getGryoClasses(final GryoMapper mapper) {
+        return mapper.getRegisteredClasses();
+    }
+
+    public static List<Class> getSharedHadoopClasses() {
+        final List<Class> hadoopClasses = new ArrayList<>();
+        hadoopClasses.add(VertexWritable.class);
+        hadoopClasses.add(ObjectWritable.class);
+        hadoopClasses.add(StarGraph.class);
+        hadoopClasses.add(StarGraph.StarVertex.class);
+        hadoopClasses.add(StarGraph.StarVertexProperty.class);
+        hadoopClasses.add(StarGraph.StarProperty.class);
+        hadoopClasses.add(StarGraph.StarOutEdge.class);
+        hadoopClasses.add(StarGraph.StarInEdge.class);
+        hadoopClasses.add(StarGraph.StarAdjacentVertex.class);
+        return hadoopClasses;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/24fd909a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index b8ba425..cd33070 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -122,6 +122,7 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
             /// spark configuration
             put("spark.master", "local[4]");
             put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+            // put("spark.kryo.registrationRequired",true);
         }};
     }