You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/06/13 19:37:51 UTC

[33/42] tinkerpop git commit: found a bug that was introduced during the KryoShim work earlier this week. I made things super explicit in SparkGraphComputer as to what is the GraphComputer configuration and what is the VertexProgram configuration so that these issues don't pop up again. Simple fix -- CTR.

Found a bug that was introduced during the KryoShim work earlier this week. I made things super explicit in SparkGraphComputer as to which configuration is the GraphComputer configuration and which is the VertexProgram configuration, so that these issues don't pop up again. Simple fix -- CTR (commit-then-review).


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b36b42f6
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b36b42f6
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b36b42f6

Branch: refs/heads/TINKERPOP-1278
Commit: b36b42f6cd8cf74bc6bf267e732d79d55a0cd719
Parents: ff52eb6
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Jun 10 12:10:17 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jun 10 12:10:34 2016 -0600

----------------------------------------------------------------------
 .../spark/process/computer/SparkExecutor.java   | 31 ++++++-------
 .../process/computer/SparkGraphComputer.java    | 46 ++++++++++----------
 2 files changed, 39 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b36b42f6/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 4db8086..8dd2381 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -78,7 +78,8 @@ public final class SparkExecutor {
             final JavaPairRDD<Object, VertexWritable> graphRDD,
             final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
             final SparkMemory memory,
-            final Configuration apacheConfiguration) {
+            final Configuration graphComputerConfiguration,    // has the Graph/GraphComputer.configuration() information
+            final Configuration vertexProgramConfiguration) { // has the VertexProgram.loadState() information
 
         boolean partitionedGraphRDD = graphRDD.partitioner().isPresent();
 
@@ -89,8 +90,8 @@ public final class SparkExecutor {
                 graphRDD.leftOuterJoin(viewIncomingRDD))                                                   // every other iteration may have views and messages
                 // for each partition of vertices emit a view and their outgoing messages
                 .mapPartitionsToPair(partitionIterator -> {
-                    KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
-                    final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
+                    KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+                    final VertexProgram<M> workerVertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(graphComputerConfiguration), vertexProgramConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
                     final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();
                     workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
@@ -132,10 +133,10 @@ public final class SparkExecutor {
         /////////////////////////////////////////////////////////////
         /////////////////////////////////////////////////////////////
         final PairFlatMapFunction<Tuple2<Object, ViewOutgoingPayload<M>>, Object, Payload> messageFunction =
-                tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
+                tuple -> () -> IteratorUtils.concat(
                         IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),      // emit the view payload
                         IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))));
-        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
+        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(vertexProgramConfiguration), vertexProgramConfiguration).getMessageCombiner().orElse(null);
         final Function2<Payload, Payload, Payload> reducerFunction = (a, b) -> {      // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
             if (a instanceof ViewIncomingPayload) {
                 ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
@@ -170,7 +171,7 @@ public final class SparkExecutor {
             assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
         newViewIncomingRDD
                 .foreachPartition(partitionIterator -> {
-                    KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
+                    KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
                 }); // need to complete a task so its BSP and the memory for this iteration is updated
         return newViewIncomingRDD;
     }
@@ -203,10 +204,10 @@ public final class SparkExecutor {
 
     public static <K, V> JavaPairRDD<K, V> executeMap(
             final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce,
-            final Configuration apacheConfiguration) {
+            final Configuration graphComputerConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
-            KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
-            return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+            KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+            return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
         if (mapReduce.getMapKeySort().isPresent())
             mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get(), true, 1);
@@ -214,19 +215,19 @@ public final class SparkExecutor {
     }
 
     public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD,
-                                                                    final Configuration apacheConfiguration) {
+                                                                    final Configuration graphComputerConfiguration) {
         return mapRDD.mapPartitionsToPair(partitionIterator -> {
-            KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
-            return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+            KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+            return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
     }
 
     public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(
             final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce,
-            final Configuration apacheConfiguration) {
+            final Configuration graphComputerConfiguration) {
         JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
-            KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
-            return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+            KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+            return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
         if (mapReduce.getReduceKeySort().isPresent())
             reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b36b42f6/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 9e05e53..5178225 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -135,19 +135,19 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
         return computerService.submit(() -> {
             final long startTime = System.currentTimeMillis();
             // apache and hadoop configurations that are used throughout the graph computer computation
-            final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
-            if (!apacheConfiguration.containsKey(Constants.SPARK_SERIALIZER))
-                apacheConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
-            apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
-            final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
+            final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
+            if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
+                graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+            graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
+            final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
             final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
-            final Storage sparkContextStorage = SparkContextStorage.open(apacheConfiguration);
+            final Storage sparkContextStorage = SparkContextStorage.open(graphComputerConfiguration);
             final boolean inputFromHDFS = FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
             final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
             final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
             final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
-            final boolean skipPartitioner = apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
-            final boolean skipPersist = apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
+            final boolean skipPartitioner = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
+            final boolean skipPersist = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
             String inputLocation = null;
             if (inputFromSpark)
                 inputLocation = Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION), sparkContextStorage).orElse(null);
@@ -158,7 +158,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
             if (null != inputLocation && inputFromHDFS) {
                 try {
-                    apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
+                    graphComputerConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
                     hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
                 } catch (final IOException e) {
                     throw new IllegalStateException(e.getMessage(), e);
@@ -176,7 +176,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         OutputFormatRDD.class.newInstance();
                 // if the input class can filter on load, then set the filters
                 if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class, InputFormat.class))) {
-                    GraphFilterAware.storeGraphFilter(apacheConfiguration, hadoopConfiguration, this.graphFilter);
+                    GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
                     filtered = false;
                 } else if (inputRDD instanceof GraphFilterAware) {
                     ((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
@@ -214,7 +214,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 updateLocalConfiguration(sparkContext, sparkConfiguration);
                 // create a message-passing friendly rdd from the input rdd
                 boolean partitioned = false;
-                JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(apacheConfiguration, sparkContext);
+                JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(graphComputerConfiguration, sparkContext);
                 // if there are vertex or edge filters, filter the loaded graph rdd prior to partitioning and persisting
                 if (filtered) {
                     this.logger.debug("Filtering the loaded graphRDD: " + this.graphFilter);
@@ -255,10 +255,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
                     /////////////////
                     // if there is a registered VertexProgramInterceptor, use it to bypass the GraphComputer semantics
-                    if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
+                    if (graphComputerConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
                         try {
                             final SparkVertexProgramInterceptor<VertexProgram> interceptor =
-                                    (SparkVertexProgramInterceptor) Class.forName(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
+                                    (SparkVertexProgramInterceptor) Class.forName(graphComputerConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
                             computedGraphRDD = interceptor.apply(this.vertexProgram, loadedGraphRDD, memory);
                         } catch (final ClassNotFoundException | IllegalAccessException | InstantiationException e) {
                             throw new IllegalStateException(e.getMessage());
@@ -278,7 +278,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                                 throw new TraversalInterruptedException();
                             }
                             memory.setInExecute(true);
-                            viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
+                            viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, graphComputerConfiguration, vertexProgramConfiguration);
                             memory.setInExecute(false);
                             if (this.vertexProgram.terminate(memory))
                                 break;
@@ -301,7 +301,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     // write the computed graph to the respective output (rdd or output format)
                     if (null != outputRDD && !this.persist.equals(Persist.NOTHING)) {
                         assert null != computedGraphRDD; // the logic holds that a computeGraphRDD must be created at this point
-                        outputRDD.writeGraphRDD(apacheConfiguration, computedGraphRDD);
+                        outputRDD.writeGraphRDD(graphComputerConfiguration, computedGraphRDD);
                     }
                 }
 
@@ -330,7 +330,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
 
                     for (final MapReduce mapReduce : this.mapReducers) {
                         // execute the map reduce job
-                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
+                        final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(graphComputerConfiguration);
                         mapReduce.storeState(newApacheConfiguration);
                         // map
                         final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceRDD, mapReduce, newApacheConfiguration);
@@ -340,7 +340,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                         final JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(combineRDD, mapReduce, newApacheConfiguration) : combineRDD;
                         // write the map reduce output back to disk and computer result memory
                         if (null != outputRDD)
-                            mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(apacheConfiguration, mapReduce.getMemoryKey(), reduceRDD));
+                            mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(graphComputerConfiguration, mapReduce.getMemoryKey(), reduceRDD));
                     }
                     // if the mapReduceRDD is not simply the computed graph, unpersist the mapReduceRDD
                     if (computedGraphCreated && !outputToSpark) {
@@ -370,13 +370,13 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // update runtime and return the newly computed graph
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                 // clear properties that should not be propagated in an OLAP chain
-                apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
-                apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
-                apacheConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE);
-                apacheConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
-                return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
+                graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+                graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
+                graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE);
+                graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
+                return new DefaultComputerResult(InputOutputHelper.getOutputGraph(graphComputerConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
             } finally {
-                if (!apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+                if (!graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
                     Spark.close();
             }
         });