You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/03 15:49:21 UTC
incubator-tinkerpop git commit: some last-minute cleanups,
comments before PR. integration tests passed overnight. Spark
integration tests passed for these changes right now.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1120 79ebaf9f9 -> 8fd950216
some last-minute cleanups, comments before PR. integration tests passed overnight. Spark integration tests passed for these changes right now.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8fd95021
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8fd95021
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8fd95021
Branch: refs/heads/TINKERPOP-1120
Commit: 8fd9502160b7940a806247a16406663ff4b27826
Parents: 79ebaf9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue May 3 07:49:14 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue May 3 07:49:14 2016 -0600
----------------------------------------------------------------------
.../spark/process/computer/SparkExecutor.java | 41 ++++++++++----------
1 file changed, 20 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8fd95021/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 3ebcb01..c216510 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -36,7 +36,6 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
@@ -61,7 +60,7 @@ public final class SparkExecutor {
// DATA LOADING //
//////////////////
- public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
+ public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(final JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
return graphRDD.mapPartitionsToPair(partitionIterator -> {
final GraphFilter gFilter = graphFilter.clone();
return () -> IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
@@ -99,29 +98,27 @@ public final class SparkExecutor {
if (memory.isInitialIteration() && vertexComputeKeysArray.length > 0)
vertex.properties(vertexComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
// drop any computed properties that are cached in memory
- if (vertexComputeKeysArray.length > 0) vertex.dropVertexProperties(vertexComputeKeysArray);
+ vertex.dropVertexProperties(vertexComputeKeysArray);
final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
IteratorUtils.removeOnNext(previousView.iterator()).forEachRemaining(property -> property.attach(Attachable.Method.create(vertex))); // attach the view to the vertex
assert previousView.isEmpty();
- ///
+ // do the vertex's vertex program iteration
messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
// assert incomingMessages.isEmpty(); // maybe the program didn't read all the messages
incomingMessages.clear();
- ///
+ // detach the compute property view from the vertex
final List<DetachedVertexProperty<Object>> nextView = vertexComputeKeysArray.length == 0 ? // not all vertex programs have compute keys
Collections.emptyList() :
- IteratorUtils.list(IteratorUtils.map(
- IteratorUtils.filter(
- vertex.properties(vertexComputeKeysArray),
- VertexProperty::isPresent),
- property -> DetachedFactory.detach(property, true)));
- // drop any computed properties that are cached in memory
- if (vertexComputeKeysArray.length > 0) vertex.dropVertexProperties(vertexComputeKeysArray);
- final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
+ IteratorUtils.list(IteratorUtils.map(vertex.properties(vertexComputeKeysArray), vertexProperty -> DetachedFactory.detach(vertexProperty, true)));
+ // drop compute property view as it has now been detached from the vertex
+ vertex.dropVertexProperties(vertexComputeKeysArray);
+ final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages being sent by this vertex
if (!partitionIterator.hasNext())
workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
- return (nextView.isEmpty() && outgoingMessages.isEmpty()) ? null : new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
+ return (nextView.isEmpty() && outgoingMessages.isEmpty()) ?
+ null : // if there is no view nor outgoing messages, emit nothing
+ new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages)); // else, emit the vertex id, its view, and its outgoing messages
});
}, true) // true means that the partition is preserved
.filter(tuple -> null != tuple); // if there are no messages or views, then the tuple is null (memory optimization)
@@ -131,9 +128,11 @@ public final class SparkExecutor {
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
.flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
- IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())), // emit the view payload
- IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))))) // emit the outgoing message payloads one by one
- .reduceByKey(graphRDD.partitioner().get(), (a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
+ // emit the view payload
+ IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),
+ // emit the outgoing message payloads one by one
+ IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
+ .reduceByKey(graphRDD.partitioner().get(), (a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
if (a instanceof ViewIncomingPayload) {
((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
return a;
@@ -147,10 +146,10 @@ public final class SparkExecutor {
return c;
}
})
- .mapValues(payload -> {
- if (payload instanceof ViewIncomingPayload) // this happens if there is a vertex with incoming messages
+ .mapValues(payload -> { // handle various corner cases of when views don't exist, messages don't exist, or neither exists.
+ if (payload instanceof ViewIncomingPayload) // this happens if there is a vertex view with incoming messages
return (ViewIncomingPayload<M>) payload;
- else if (payload instanceof ViewPayload) // this happens if there is a vertex with no incoming messages
+ else if (payload instanceof ViewPayload) // this happens if there is a vertex view with no incoming messages
return new ViewIncomingPayload<>((ViewPayload) payload);
else // this happens when there is a single message to a vertex that has no view or outgoing messages
return new ViewIncomingPayload<>((MessagePayload<M>) payload);
@@ -170,12 +169,12 @@ public final class SparkExecutor {
final Set<VertexComputeKey> vertexComputeKeys) {
// the graphRDD and the viewRDD must have the same partitioner
assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
- // attach the final computed view to the cached graph
final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys); // the compute keys as an array
return graphRDD.leftOuterJoin(viewIncomingRDD)
.mapValues(tuple -> {
final StarGraph.StarVertex vertex = tuple._1().get();
vertex.dropVertexProperties(vertexComputeKeysArray); // drop all existing compute keys
+ // attach the final computed view to the cached graph
final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
for (final DetachedVertexProperty<Object> property : view) {
if (!VertexProgramHelper.isTransientVertexComputeKey(property.key(), vertexComputeKeys))