You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/03 22:53:25 UTC
[7/9] incubator-tinkerpop git commit: syncing with 1120.
syncing with 1120.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e8bba359
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e8bba359
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e8bba359
Branch: refs/heads/TINKERPOP-1288
Commit: e8bba359be7a773a138b023929c0a2c02a9d6de8
Parents: 77480a2
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue May 3 14:48:27 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue May 3 14:48:27 2016 -0600
----------------------------------------------------------------------
.../spark/process/computer/SparkExecutor.java | 138 ++++++++++---------
1 file changed, 70 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e8bba359/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index aadb70d..3e6a816 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -21,8 +21,6 @@ package org.apache.tinkerpop.gremlin.spark.process.computer;
import com.google.common.base.Optional;
import org.apache.commons.configuration.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
@@ -62,7 +60,7 @@ public final class SparkExecutor {
// DATA LOADING //
//////////////////
- public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
+ public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(final JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
return graphRDD.mapPartitionsToPair(partitionIterator -> {
final GraphFilter gFilter = graphFilter.clone();
return () -> IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
@@ -80,17 +78,16 @@ public final class SparkExecutor {
final SparkMemory memory,
final Configuration apacheConfiguration) {
- final boolean partitionedGraphRDD = graphRDD.partitioner().isPresent();
- if (partitionedGraphRDD && null != viewIncomingRDD) // the graphRDD and the viewRDD must have the same partitioner
+ if (null != viewIncomingRDD) // the graphRDD and the viewRDD must have the same partitioner
assert graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get());
- final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
+ final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
// for each partition of vertices emit a view and their outgoing messages
.mapPartitionsToPair(partitionIterator -> {
HadoopPools.initialize(apacheConfiguration);
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
- final String[] elementComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
+ final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
final SparkMessenger<M> messenger = new SparkMessenger<>();
workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
@@ -98,70 +95,67 @@ public final class SparkExecutor {
final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : memory.isInitialIteration() ? new ArrayList<>() : Collections.emptyList();
// revive compute properties if they already exist
- if (memory.isInitialIteration() && elementComputeKeysArray.length > 0) {
- vertex.properties(elementComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
- }
+ if (memory.isInitialIteration() && vertexComputeKeysArray.length > 0)
+ vertex.properties(vertexComputeKeysArray).forEachRemaining(vertexProperty -> previousView.add(DetachedFactory.detach(vertexProperty, true)));
// drop any computed properties that are cached in memory
- if (elementComputeKeysArray.length > 0)
- vertex.dropVertexProperties(elementComputeKeysArray);
+ vertex.dropVertexProperties(vertexComputeKeysArray);
final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
- previousView.forEach(property -> property.attach(Attachable.Method.create(vertex))); // attach the view to the vertex
- // previousView.clear(); // no longer needed so kill it from memory
- ///
+ IteratorUtils.removeOnNext(previousView.iterator()).forEachRemaining(property -> property.attach(Attachable.Method.create(vertex))); // attach the view to the vertex
+ assert previousView.isEmpty();
+ // do the vertex's vertex program iteration
messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
- // incomingMessages.clear(); // no longer needed so kill it from memory
- ///
- final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ? // not all vertex programs have compute keys
+ // assert incomingMessages.isEmpty(); // maybe the program didn't read all the messages
+ incomingMessages.clear();
+ // detach the compute property view from the vertex
+ final List<DetachedVertexProperty<Object>> nextView = vertexComputeKeysArray.length == 0 ? // not all vertex programs have compute keys
Collections.emptyList() :
- IteratorUtils.list(IteratorUtils.map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
- final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
+ IteratorUtils.list(IteratorUtils.map(vertex.properties(vertexComputeKeysArray), vertexProperty -> DetachedFactory.detach(vertexProperty, true)));
+ // drop compute property view as it has now been detached from the vertex
+ vertex.dropVertexProperties(vertexComputeKeysArray);
+ final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages being sent by this vertex
if (!partitionIterator.hasNext())
workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
- return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
+ return (nextView.isEmpty() && outgoingMessages.isEmpty()) ?
+ null : // if there is no view nor outgoing messages, emit nothing
+ new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages)); // else, emit the vertex id, its view, and its outgoing messages
});
- }, true)); // true means that the partition is preserved
+ }, true) // true means that the partition is preserved
+ .filter(tuple -> null != tuple); // if there are no messages or views, then the tuple is null (memory optimization)
// the graphRDD and the viewRDD must have the same partitioner
- if (partitionedGraphRDD)
- assert graphRDD.partitioner().get().equals(viewOutgoingRDD.partitioner().get());
+ assert graphRDD.partitioner().get().equals(viewOutgoingRDD.partitioner().get());
// "message pass" by reducing on the vertex object id of the view and message payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
-
- /////////////////////////////////////////////////////////////
- /////////////////////////////////////////////////////////////
- final PairFlatMapFunction<Tuple2<Object, ViewOutgoingPayload<M>>, Object, Payload> messageFunction =
- tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
- IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())), // emit the view payload
- IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))));
- final Function2<Payload, Payload, Payload> reducerFunction = (a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
- if (a instanceof ViewIncomingPayload) {
- ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
- return a;
- } else if (b instanceof ViewIncomingPayload) {
- ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
- return b;
- } else {
- final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
- c.mergePayload(a, messageCombiner);
- c.mergePayload(b, messageCombiner);
- return c;
- }
- };
- /////////////////////////////////////////////////////////////
- /////////////////////////////////////////////////////////////
-
- final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD =
- (partitionedGraphRDD ?
- viewOutgoingRDD.flatMapToPair(messageFunction).reduceByKey(graphRDD.partitioner().get(), reducerFunction) :
- viewOutgoingRDD.flatMapToPair(messageFunction).reduceByKey(reducerFunction))
- .filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
- .filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
- .mapValues(payload -> payload instanceof ViewIncomingPayload ?
- (ViewIncomingPayload<M>) payload : // this happens if there is a vertex with incoming messages
- new ViewIncomingPayload<>((ViewPayload) payload)); // this happens if there is a vertex with no incoming messages
+ final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
+ .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
+ // emit the view payload
+ IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),
+ // emit the outgoing message payloads one by one
+ IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
+ .reduceByKey(graphRDD.partitioner().get(), (a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
+ if (a instanceof ViewIncomingPayload) {
+ ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
+ return a;
+ } else if (b instanceof ViewIncomingPayload) {
+ ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
+ return b;
+ } else {
+ final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
+ c.mergePayload(a, messageCombiner);
+ c.mergePayload(b, messageCombiner);
+ return c;
+ }
+ })
+ .mapValues(payload -> { // handle various corner cases of when views don't exist, messages don't exist, or neither exists.
+ if (payload instanceof ViewIncomingPayload) // this happens if there is a vertex view with incoming messages
+ return (ViewIncomingPayload<M>) payload;
+ else if (payload instanceof ViewPayload) // this happens if there is a vertex view with no incoming messages
+ return new ViewIncomingPayload<>((ViewPayload) payload);
+ else // this happens when there is a single message to a vertex that has no view or outgoing messages
+ return new ViewIncomingPayload<>((MessagePayload<M>) payload);
+ });
// the graphRDD and the viewRDD must have the same partitioner
- if (partitionedGraphRDD)
- assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
+ assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
newViewIncomingRDD
.foreachPartition(partitionIterator -> {
HadoopPools.initialize(apacheConfiguration);
@@ -169,17 +163,20 @@ public final class SparkExecutor {
return newViewIncomingRDD;
}
- public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final Set<VertexComputeKey> vertexComputeKeys) {
+ public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(
+ final JavaPairRDD<Object, VertexWritable> graphRDD,
+ final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
+ final Set<VertexComputeKey> vertexComputeKeys) {
// the graphRDD and the viewRDD must have the same partitioner
- if (graphRDD.partitioner().isPresent())
- assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
- // attach the final computed view to the cached graph
+ assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
+ final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(vertexComputeKeys); // the compute keys as an array
return graphRDD.leftOuterJoin(viewIncomingRDD)
.mapValues(tuple -> {
final StarGraph.StarVertex vertex = tuple._1().get();
+ vertex.dropVertexProperties(vertexComputeKeysArray); // drop all existing compute keys
+ // attach the final computed view to the cached graph
final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
for (final DetachedVertexProperty<Object> property : view) {
- vertex.dropVertexProperties(property.key());
if (!VertexProgramHelper.isTransientVertexComputeKey(property.key(), vertexComputeKeys))
property.attach(Attachable.Method.create(vertex));
}
@@ -191,7 +188,9 @@ public final class SparkExecutor {
// MAP REDUCE //
////////////////
- public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+ public static <K, V> JavaPairRDD<K, V> executeMap(
+ final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce,
+ final Configuration apacheConfiguration) {
JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
HadoopPools.initialize(apacheConfiguration);
return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
@@ -201,14 +200,17 @@ public final class SparkExecutor {
return mapRDD;
}
- public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD, final Configuration apacheConfiguration) {
+ public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD,
+ final Configuration apacheConfiguration) {
return mapRDD.mapPartitionsToPair(partitionIterator -> {
HadoopPools.initialize(apacheConfiguration);
return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
});
}
- public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+ public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(
+ final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce,
+ final Configuration apacheConfiguration) {
JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
HadoopPools.initialize(apacheConfiguration);
return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
@@ -217,4 +219,4 @@ public final class SparkExecutor {
reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);
return reduceRDD;
}
-}
+}
\ No newline at end of file