You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/04/09 16:12:05 UTC
[08/15] incubator-tinkerpop git commit: fixed a bug in
TraverserSet.remove() around iterator.remove(). Added setName() calls to RDDs
in SparkGraphComputer and SparkExecutor. However,
I don't see any difference in the Spark Server WebUI.
Fixed a bug in TraverserSet.remove(): the popped traverser is now removed with this.map.remove(next) instead of iterator.remove(). Also added setName() calls to the RDDs in SparkGraphComputer and SparkExecutor; however, I don't see any difference in the Spark web UI.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/fecf6e6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/fecf6e6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/fecf6e6d
Branch: refs/heads/TINKERPOP3-581
Commit: fecf6e6db49d25ce49cdba546a16156f7a20a0e0
Parents: 97d2bad
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Apr 8 13:08:18 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Apr 8 13:08:18 2015 -0600
----------------------------------------------------------------------
.../process/traversal/traverser/util/TraverserSet.java | 6 ++----
.../gremlin/hadoop/process/computer/spark/SparkExecutor.java | 6 +++---
.../hadoop/process/computer/spark/SparkGraphComputer.java | 7 ++++---
3 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fecf6e6d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/TraverserSet.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/TraverserSet.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/TraverserSet.java
index 9b6562d..f598a40 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/TraverserSet.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/TraverserSet.java
@@ -97,11 +97,9 @@ public class TraverserSet<S> extends AbstractSet<Traverser.Admin<S>> implements
@Override
public Traverser.Admin<S> remove() { // pop, exception if empty
final Iterator<Traverser.Admin<S>> iterator = this.map.values().iterator();
- if (!iterator.hasNext()) {
- throw FastNoSuchElementException.instance();
- }
+ if (!iterator.hasNext()) throw FastNoSuchElementException.instance();
Traverser.Admin<S> next = iterator.next();
- iterator.remove();
+ this.map.remove(next);
return next;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fecf6e6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 1ff6552..af5a9ff 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -71,7 +71,7 @@ public final class SparkExecutor {
final SparkMemory memory,
final Configuration apacheConfiguration) {
- final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
+ final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
// for each partition of vertices
@@ -98,7 +98,7 @@ public final class SparkExecutor {
workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
});
- });
+ })).setName("viewOutgoingRDD");
// "message pass" by reducing on the vertex object id of the view and message payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
@@ -126,7 +126,7 @@ public final class SparkExecutor {
(ViewIncomingPayload<M>) payload : // this happens if there is a vertex with incoming messages
new ViewIncomingPayload<>((ViewPayload) payload)); // this happens if there is a vertex with no incoming messages
- newViewIncomingRDD
+ newViewIncomingRDD.setName("viewIncomingRDD")
.foreachPartition(partitionIterator -> {
}); // need to complete a task so its BSP and the memory for this iteration is updated
return newViewIncomingRDD;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fecf6e6d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 1df99d3..9f3fd78 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -164,6 +164,7 @@ public final class SparkGraphComputer implements GraphComputer {
VertexWritable.class)
.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
.reduceByKey((a, b) -> a) // TODO: why is this necessary?
+ .setName("graphRDD")
.cache(); // partition the graph across the cluster
JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
@@ -204,7 +205,7 @@ public final class SparkGraphComputer implements GraphComputer {
//////////////////////////////
if (!this.mapReducers.isEmpty()) {
// drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
- final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD).cache();
+ final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD).setName("mapReduceGraphRDD").cache();
// TODO: boolean first = true;
for (final MapReduce mapReduce : this.mapReducers) {
// TODO: if (first) first = false;
@@ -213,10 +214,10 @@ public final class SparkGraphComputer implements GraphComputer {
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
mapReduce.storeState(newApacheConfiguration);
// map
- final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration);
+ final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
// combine TODO? is this really needed
// reduce
- final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+ final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
// write the map reduce output back to disk (memory)
SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
}