Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/04/09 16:12:05 UTC

[08/15] incubator-tinkerpop git commit: Fixed a bug in TraverserSet.remove() around iterator.remove(). Added setName() calls to RDDs in SparkGraphComputer and SparkExecutor. However, I don't see any difference in the Spark server web UI.

Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/fecf6e6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/fecf6e6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/fecf6e6d

Branch: refs/heads/TINKERPOP3-581
Commit: fecf6e6db49d25ce49cdba546a16156f7a20a0e0
Parents: 97d2bad
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Apr 8 13:08:18 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Apr 8 13:08:18 2015 -0600

----------------------------------------------------------------------
 .../process/traversal/traverser/util/TraverserSet.java        | 6 ++----
 .../gremlin/hadoop/process/computer/spark/SparkExecutor.java  | 6 +++---
 .../hadoop/process/computer/spark/SparkGraphComputer.java     | 7 ++++---
 3 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fecf6e6d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/TraverserSet.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/TraverserSet.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/TraverserSet.java
index 9b6562d..f598a40 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/TraverserSet.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/TraverserSet.java
@@ -97,11 +97,9 @@ public class TraverserSet<S> extends AbstractSet<Traverser.Admin<S>> implements
     @Override
     public Traverser.Admin<S> remove() {  // pop, exception if empty
         final Iterator<Traverser.Admin<S>> iterator = this.map.values().iterator();
-        if (!iterator.hasNext()) {
-            throw FastNoSuchElementException.instance();
-        }
+        if (!iterator.hasNext()) throw FastNoSuchElementException.instance();
         Traverser.Admin<S> next = iterator.next();
-        iterator.remove();
+        this.map.remove(next);
         return next;
     }
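
For context, a minimal self-contained sketch of the pop-by-key pattern the fix switches to. The PopByKey class below is a hypothetical stand-in, assuming (as TraverserSet does) a backing map keyed by the element itself, so map.remove(element) deletes exactly the entry the iterator just yielded. Removing through the map instead of the values()-view iterator is a plausible guard against iterator.remove() misbehaving on a wrapped (e.g. synchronized) map, though the commit message does not say which failure mode was hit.

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.NoSuchElementException;

    // Hypothetical sketch: a set-like structure whose backing map uses the
    // element itself as the key, mirroring TraverserSet's layout.
    final class PopByKey<E> {
        private final Map<E, E> map = new LinkedHashMap<>();

        void add(final E element) {
            this.map.put(element, element);
        }

        E pop() { // pop, exception if empty
            final Iterator<E> iterator = this.map.values().iterator();
            if (!iterator.hasNext()) throw new NoSuchElementException();
            final E next = iterator.next();
            this.map.remove(next); // remove via the map, not the iterator
            return next;
        }
    }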
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fecf6e6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 1ff6552..af5a9ff 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -71,7 +71,7 @@ public final class SparkExecutor {
             final SparkMemory memory,
             final Configuration apacheConfiguration) {
 
-        final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
+        final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
                 graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
                 graphRDD.leftOuterJoin(viewIncomingRDD))                                                                        // every other iteration may have views and messages
                 // for each partition of vertices
@@ -98,7 +98,7 @@ public final class SparkExecutor {
                             workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
                         return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
                     });
-                });
+                })).setName("viewOutgoingRDD");
 
         // "message pass" by reducing on the vertex object id of the view and message payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
@@ -126,7 +126,7 @@ public final class SparkExecutor {
                         (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
                         new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
 
-        newViewIncomingRDD
+        newViewIncomingRDD.setName("viewIncomingRDD")
                 .foreachPartition(partitionIterator -> {
                 }); // need to complete a task so its BSP and the memory for this iteration is updated
         return newViewIncomingRDD;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fecf6e6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 1df99d3..9f3fd78 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -164,6 +164,7 @@ public final class SparkGraphComputer implements GraphComputer {
                                 VertexWritable.class)
                                 .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
                                 .reduceByKey((a, b) -> a) // TODO: why is this necessary?
+                                .setName("graphRDD")
                                 .cache(); // partition the graph across the cluster
                         JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
 
@@ -204,7 +205,7 @@ public final class SparkGraphComputer implements GraphComputer {
                         //////////////////////////////
                         if (!this.mapReducers.isEmpty()) {
                             // drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
-                            final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD).cache();
+                            final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD).setName("mapReduceGraphRDD").cache();
                             // TODO: boolean first = true;
                             for (final MapReduce mapReduce : this.mapReducers) {
                                 // TODO: if (first) first = false;
@@ -213,10 +214,10 @@ public final class SparkGraphComputer implements GraphComputer {
                                 final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
                                 mapReduce.storeState(newApacheConfiguration);
                                 // map
-                                final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration);
+                                final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
                                 // combine TODO? is this really needed
                                 // reduce
-                                final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+                                final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
                                 // write the map reduce output back to disk (memory)
                                 SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
                             }
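
On the "// TODO: why is this necessary?" next to reduceByKey((a, b) -> a) in the graphRDD pipeline above: reducing with a function that keeps one argument collapses duplicate keys to a single representative, so the call acts as a dedup on vertex ids. A small sketch of the idiom (the DedupeByKey class and sample pairs are illustrative only):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;

    // Hypothetical sketch: duplicate keys collapse to one (arbitrary) value.
    public final class DedupeByKey {
        public static void main(final String[] args) {
            final SparkConf conf = new SparkConf().setAppName("dedupe-by-key").setMaster("local[*]");
            try (final JavaSparkContext sc = new JavaSparkContext(conf)) {
                final JavaPairRDD<Integer, String> pairs = sc.parallelizePairs(Arrays.asList(
                        new Tuple2<>(1, "a"), new Tuple2<>(1, "b"), new Tuple2<>(2, "c")));
                final JavaPairRDD<Integer, String> deduped = pairs
                        .reduceByKey((a, b) -> a) // one value per key survives
                        .setName("dedupedRDD")    // label for the web UI
                        .cache();
                System.out.println(deduped.collect()); // e.g. [(1,a), (2,c)]
            }
        }
    }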