Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/31 23:21:50 UTC
incubator-tinkerpop git commit: added new SparkGraphComputer message passing algorithm that caches the graph structure and has dynamic views and messages.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 1d3e1cc7e -> 63959c4ac
added new SparkGraphComputer message passing algorithm that caches the graph structure and has dynamic views and messages.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/63959c4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/63959c4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/63959c4a
Branch: refs/heads/master
Commit: 63959c4acfefd68dec42ce290f843d30a690df35
Parents: 1d3e1cc
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 31 15:21:29 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 31 15:21:46 2015 -0600
----------------------------------------------------------------------
.../util/detached/DetachedVertexProperty.java | 6 +
.../process/computer/GraphComputerTest.java | 2 +-
.../process/computer/spark/SparkExecutor.java | 166 ++++++++-----------
.../computer/spark/SparkGraphComputer.java | 38 +++--
.../process/computer/spark/SparkMessenger.java | 6 +-
5 files changed, 106 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
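For orientation: the commit replaces the mutable SparkVertexPayload with two RDDs, a cached graph RDD (JavaPairRDD<Object, VertexWritable>) and a per-iteration view-and-message RDD (JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>>). The standalone sketch below is not part of the commit; it only illustrates the shape of the per-vertex value that now travels between supersteps, with String standing in for DetachedVertexProperty<Object>, Long for the message type M, and scala-library assumed on the classpath for Tuple2.

import java.util.ArrayList;
import java.util.List;

import scala.Tuple2;

public final class ViewAndMessagesShape {
    public static void main(final String[] args) {
        // the "view": detached compute-key properties recomputed each superstep
        // (String stands in for DetachedVertexProperty<Object>)
        final List<String> view = new ArrayList<>();
        view.add("pageRank=0.15"); // illustrative value only
        // the messages bound for this vertex in the next superstep (Long stands in for M)
        final List<Long> messages = new ArrayList<>();
        messages.add(1L);
        final Tuple2<List<String>, List<Long>> viewAndMessages = new Tuple2<>(view, messages);
        System.out.println(viewAndMessages._1() + " / " + viewAndMessages._2());
    }
}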
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/63959c4a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertexProperty.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertexProperty.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertexProperty.java
index fd8fd9f..253b647 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertexProperty.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/detached/DetachedVertexProperty.java
@@ -126,4 +126,10 @@ public class DetachedVertexProperty<V> extends DetachedElement<Property<V>> impl
public <U> Iterator<Property<U>> properties(final String... propertyKeys) {
return (Iterator) super.properties(propertyKeys);
}
+
+ public static <V> VertexProperty<V> addTo(final Vertex vertex, final DetachedVertexProperty<V> detachedVertexProperty) {
+ final VertexProperty<V> vertexProperty = vertex.property(detachedVertexProperty.key(), detachedVertexProperty.value());
+ detachedVertexProperty.properties().forEachRemaining(property -> vertexProperty.property(property.key(), property.value()));
+ return vertexProperty;
+ }
}
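A minimal usage sketch of the new addTo helper, assuming TinkerGraph on the classpath; the vertex and property names are illustrative, and only DetachedVertexProperty.addTo itself comes from this commit:

import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;

public final class AddToSketch {
    public static void main(final String[] args) {
        final Graph graph = TinkerGraph.open();
        final Vertex vertex = graph.addVertex("person");
        // detach a property (with its meta-properties), as SparkExecutor does when building the view
        final VertexProperty<Double> rank = vertex.property("rank", 0.15d);
        final DetachedVertexProperty<Double> detached = DetachedFactory.detach(rank, true);
        rank.remove();
        // ...and re-attach it to the live vertex, as done when the view is joined back onto the graph
        DetachedVertexProperty.addTo(vertex, detached);
        System.out.println(vertex.value("rank")); // 0.15
    }
}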
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/63959c4a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 96ab10d..e2708b1 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -597,7 +597,7 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Override
public Set<String> getElementComputeKeys() {
- return null;
+ return Collections.emptySet();
}
@Override
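The switch from null to an empty set matters downstream: the reworked SparkExecutor (next file) converts the element compute keys straight into an array with no null check. A small standalone sketch of that guard, mirroring the executor's logic rather than quoting it:

import java.util.Collections;
import java.util.Set;

public final class ComputeKeysGuard {
    public static void main(final String[] args) {
        final Set<String> elementComputeKeys = Collections.emptySet(); // a null return here would NPE
        final String[] elementComputeKeysArray =
                elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
        System.out.println(null == elementComputeKeysArray ? "no compute keys" : elementComputeKeysArray.length + " keys");
    }
}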
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/63959c4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 03de47a..fbbc9d0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -18,7 +18,6 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-import com.google.common.base.Optional;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,7 +36,6 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -58,121 +56,93 @@ public final class SparkExecutor {
private SparkExecutor() {
}
- public static <M> JavaPairRDD<Object, SparkVertexPayload<M>> executeVertexProgramIteration(final JavaPairRDD<Object, SparkVertexPayload<M>> graphRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
+ public static <M> JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> executeVertexProgramIteration(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndMessageRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
+
// execute vertex program iteration
- final JavaPairRDD<Object, SparkVertexPayload<M>> verticesWithOutgoingMessages = graphRDD
- .mapPartitionsToPair(partitionIterator -> { // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
+ final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Tuple2<Object, M>>>> viewAndOutgoingMessagesRDD = null == viewAndMessageRDD ?
+ graphRDD.mapPartitionsToPair(partitionIterator -> { // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
+ final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
workerVertexProgram.workerIterationStart(memory);
- return () -> IteratorUtils.map(partitionIterator, vertex -> {
- vertex._2().getOutgoingMessages().clear(); // there should be no outgoing messages at this point
- workerVertexProgram.execute(ComputerGraph.of(vertex._2().getVertex(), elementComputeKeys), vertex._2(), memory);
+ return () -> IteratorUtils.map(partitionIterator, vertexWritable -> {
+ final Vertex vertex = vertexWritable._2().get();
+ final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, Collections.emptyList());
+ workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory);
+ final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages();
+ final List<DetachedVertexProperty<Object>> newView = new ArrayList<>();
+ if (null != elementComputeKeysArray)
+ vertex.properties(elementComputeKeysArray).forEachRemaining(property -> newView.add(DetachedFactory.detach(property, true)));
if (!partitionIterator.hasNext())
workerVertexProgram.workerIterationEnd(memory);
- vertex._2().getMessages().clear(); // there should be no incoming messages at this point (only outgoing messages)
- return vertex;
+ return new Tuple2<>(vertex.id(), new Tuple2<>(newView, outgoingMessages));
});
- }).cache();
+ }) :
+ graphRDD.leftOuterJoin(viewAndMessageRDD)
+ .mapPartitionsToPair(partitionIterator -> { // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
+ final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
+ final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
+ final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
+ workerVertexProgram.workerIterationStart(memory);
+ return () -> IteratorUtils.map(partitionIterator, vertexWritableAndIncomingMessages -> {
+ final Vertex vertex = vertexWritableAndIncomingMessages._2()._1().get();
+ final List<M> incomingMessages = vertexWritableAndIncomingMessages._2()._2().isPresent() ? vertexWritableAndIncomingMessages._2()._2().get()._2() : Collections.emptyList();
+ final List<DetachedVertexProperty<Object>> view = vertexWritableAndIncomingMessages._2()._2().isPresent() ? vertexWritableAndIncomingMessages._2()._2().get()._1() : Collections.emptyList();
+ view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));
+ final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages);
+ workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory);
+ final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages();
+ final List<DetachedVertexProperty<Object>> newView = new ArrayList<>();
+ if (null != elementComputeKeysArray)
+ vertex.properties(elementComputeKeysArray).forEachRemaining(property -> newView.add(DetachedFactory.detach(property, true)));
+ if (!partitionIterator.hasNext())
+ workerVertexProgram.workerIterationEnd(memory);
+ return new Tuple2<>(vertex.id(), new Tuple2<>(newView, outgoingMessages));
+ });
+ });
+ viewAndOutgoingMessagesRDD.cache();
// "message pass" by reducing on the vertex object id of the message payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
- final JavaPairRDD<Object, SparkMessagePayload<M>> incomingMessages = verticesWithOutgoingMessages
- .flatMapToPair(vertexPayload -> () ->
- IteratorUtils.map(vertexPayload._2().detachOutgoingMessages(), // this removes all outgoing messages once they have been iterated by this step
- message -> new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2()))))
- .reduceByKey((messagePayloadA, messagePayloadB) -> {
- messagePayloadA.addMessages(messagePayloadB.getMessages(), messageCombiner);
- return messagePayloadA;
- });
-
- // join the incoming messages with the vertices
- final JavaPairRDD<Object, SparkVertexPayload<M>> verticesWithIncomingMessages = verticesWithOutgoingMessages
- .leftOuterJoin(incomingMessages)
- .mapValues(tuple -> {
- final SparkVertexPayload<M> vertexPayload = tuple._1();
- final SparkMessagePayload<M> messagePayload = tuple._2().orNull();
- if (null != messagePayload) {
- vertexPayload.getOutgoingMessages().clear(); // there should be no outgoing messages at this point (just to be safe)
- vertexPayload.getMessages().clear(); // there should be no incoming messages at this point (just to be safe)
- vertexPayload.getMessages().addAll(messagePayload.getMessages());
+ final JavaPairRDD<Object, List<M>> incomingMessages = viewAndOutgoingMessagesRDD
+ .flatMapToPair(tuple -> () -> IteratorUtils.map(tuple._2()._2().iterator(), x -> {
+ final List<M> list = new ArrayList<>();
+ list.add(x._2());
+ return new Tuple2<>(x._1(), list);
+ })).reduceByKey((a, b) -> {
+ if (null == messageCombiner) {
+ a.addAll(b);
+ return a;
+ } else {
+ final M m = messageCombiner.combine(a.get(0),b.get(0));
+ a.clear();
+ b.clear();
+ a.add(m);
+ return a;
}
- return vertexPayload;
});
+ final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> newViewMessageRDD = viewAndOutgoingMessagesRDD
+ .mapValues(Tuple2::_1)
+ .fullOuterJoin(incomingMessages)
+ .mapValues(tuple -> new Tuple2<>(tuple._1().or(Collections.emptyList()), tuple._2().or(Collections.emptyList())));
- verticesWithIncomingMessages.foreachPartition(partitionIterator -> {
+ newViewMessageRDD.foreachPartition(partitionIterator -> {
}); // need to complete a task so its BSP.
- return verticesWithIncomingMessages;
+ return newViewMessageRDD;
}
- //////////////////////////////
- /////DEMO ALGORITHM /////////
- /////////////////////////////
- /////////////////////////////
-
- public static <M> JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> executeVertexProgramIteration2(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewMessageRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
- final JavaPairRDD<Object, Tuple2<VertexWritable, Optional<Tuple2<List<DetachedVertexProperty<Object>>, List<M>>>>> graphViewMessagesRDD = graphRDD.leftOuterJoin(viewMessageRDD);
+ /////////////////
+ // MAP REDUCE //
+ ////////////////
- final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Tuple2<Object, M>>>> viewAndMessagesRDD = graphViewMessagesRDD.mapPartitionsToPair(partitions -> {
- final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
- workerVertexProgram.workerIterationStart(memory);
- final SparkMessenger<M> messenger = new SparkMessenger<>();
- final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
- final String[] elementComputeKeysArray = elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
- return () -> IteratorUtils.map(partitions, graphViewMessages -> {
- final Vertex vertex = graphViewMessages._2()._1().get();
- final List<DetachedVertexProperty<Object>> view = graphViewMessages._2()._2().isPresent() ? graphViewMessages._2()._2().get()._1() : Collections.emptyList();
- final List<M> incomingMessages = graphViewMessages._2()._2().isPresent() ? graphViewMessages._2()._2().get()._2() : Collections.emptyList();
- view.forEach(property -> property.attach(vertex));
- messenger.setVertexAndMessages(vertex, incomingMessages);
- memory.setInTask(true);
- workerVertexProgram.execute(vertex, messenger, memory);
- memory.setInTask(false);
- final List<DetachedVertexProperty<Object>> properties = IteratorUtils.list(IteratorUtils.<VertexProperty<Object>, DetachedVertexProperty<Object>>map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
- vertex.properties(elementComputeKeysArray).forEachRemaining(VertexProperty::remove);
- if (!partitions.hasNext())
- workerVertexProgram.workerIterationEnd(memory);
- return new Tuple2<>(vertex.id(), new Tuple2<>(properties, messenger.getOutgoingMessages()));
- });
- });
-
- final JavaPairRDD<Object, List<DetachedVertexProperty<Object>>> newViewRDD = viewAndMessagesRDD.mapValues(Tuple2::_1);
-
- final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
- final JavaPairRDD<Object, List<M>> newMessagesRDD = viewAndMessagesRDD
- .flatMapToPair(viewAndMessages -> () -> viewAndMessages._2()._2().iterator())
- .mapValues(message -> {
- final List<M> list = new ArrayList<>(1);
- list.add(message);
- return list;
- }).reduceByKey((messageA, messageB) -> {
- if (null != messageCombiner) {
- final M message = Stream.concat(messageA.stream(), messageB.stream()).reduce(messageCombiner::combine).get();
- messageA.clear();
- messageA.add(message);
- return messageA;
- } else {
- messageA.addAll(messageB);
- return messageA;
- }
- });
-
- final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> newViewMessages = newViewRDD.join(newMessagesRDD);
-
- newViewMessages.foreachPartition(x -> {
- }); // execute the view
-
- return newViewMessages;
- }
-
- public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkVertexPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+ public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
workerMapReduce.workerStart(MapReduce.Stage.MAP);
final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
- workerMapReduce.map(keyValue._2().getVertex(), mapEmitter);
+ workerMapReduce.map(keyValue._2().get(), mapEmitter);
if (!partitionIterator.hasNext())
workerMapReduce.workerEnd(MapReduce.Stage.MAP);
return mapEmitter.getEmissions();
@@ -202,6 +172,10 @@ public final class SparkExecutor {
return reduceRDD;
}
+ ///////////////////
+ // Input/Output //
+ //////////////////
+
public static void deleteOutputLocation(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
if (null != outputLocation) {
@@ -221,11 +195,11 @@ public final class SparkExecutor {
}
}
- public static <M> void saveGraphRDD(final JavaPairRDD<Object, SparkVertexPayload<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+ public static <M> void saveGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
if (null != outputLocation) {
// map back to a <nullwritable,vertexwritable> stream for output
- graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2().getVertexWritable()))
+ graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
.saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
NullWritable.class,
VertexWritable.class,
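The reduceByKey step in executeVertexProgramIteration concatenates per-target message lists when no MessageCombiner is supplied and collapses them pairwise when one is. A standalone sketch of that combine function; the summing MessageCombiner<Long> lambda is illustrative, and only the list handling mirrors the commit:

import java.util.ArrayList;
import java.util.List;

import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;

public final class MessageCombineSketch {

    // mirrors the reduceByKey function: either concatenate or combine pairwise
    static <M> List<M> combine(final List<M> a, final List<M> b, final MessageCombiner<M> combiner) {
        if (null == combiner) {
            a.addAll(b);          // no combiner: keep every message bound for the target vertex
            return a;
        } else {
            final M m = combiner.combine(a.get(0), b.get(0)); // with a combiner, each list holds one message
            a.clear();
            a.add(m);
            return a;
        }
    }

    public static void main(final String[] args) {
        final MessageCombiner<Long> sum = (x, y) -> x + y; // illustrative summing combiner
        final List<Long> a = new ArrayList<>();
        a.add(3L);
        final List<Long> b = new ArrayList<>();
        b.add(4L);
        System.out.println(combine(a, b, sum));  // [7]
        final List<Long> c = new ArrayList<>();
        c.add(3L);
        final List<Long> d = new ArrayList<>();
        d.add(4L);
        System.out.println(combine(c, d, null)); // [3, 4]
    }
}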
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/63959c4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index e5388c9..94eb6a6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -44,13 +44,17 @@ import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.File;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -159,12 +163,14 @@ public final class SparkGraphComputer implements GraphComputer {
// add the project jars to the cluster
SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
// create a message-passing friendly rdd from the hadoop input format
- JavaPairRDD<Object, SparkVertexPayload<Object>> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
+ final JavaPairRDD<Object, VertexWritable> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
(Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
NullWritable.class,
VertexWritable.class)
- .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkVertexPayload<>(tuple._2().get())))
- .reduceByKey((a, b) -> a); // partition the graph across the cluster
+ .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
+ .reduceByKey((a, b) -> a) // TODO: test without doing this reduce
+ .cache(); // partition the graph across the cluster
+ JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Object>>> viewAndMessageRDD = null;
////////////////////////////////
// process the vertex program //
@@ -182,7 +188,7 @@ public final class SparkGraphComputer implements GraphComputer {
// execute the vertex program
while (true) {
memory.setInTask(true);
- graphRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, memory, vertexProgramConfiguration);
+ viewAndMessageRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewAndMessageRDD, memory, vertexProgramConfiguration);
memory.setInTask(false);
if (this.vertexProgram.terminate(memory))
break;
@@ -203,18 +209,28 @@ public final class SparkGraphComputer implements GraphComputer {
//////////////////////////////
if (!this.mapReducers.isEmpty()) {
// drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
- graphRDD = graphRDD.mapValues(vertex -> {
- vertex.getMessages().clear();
- vertex.getOutgoingMessages().clear();
- vertex.getVertex().edges(Direction.BOTH).forEachRemaining(Edge::remove);
- return vertex;
- }).cache();
+ final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = null == viewAndMessageRDD ?
+ graphRDD.mapValues(vertexWritable -> {
+ vertexWritable.get().edges(Direction.BOTH).forEachRemaining(Edge::remove);
+ return vertexWritable;
+ })
+ .cache() :
+ graphRDD.leftOuterJoin(viewAndMessageRDD)
+ .mapValues(tuple -> {
+ final Vertex vertex = tuple._1().get();
+ vertex.edges(Direction.BOTH).forEachRemaining(Edge::remove);
+ final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get()._1() : Collections.emptyList();
+ view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));
+ return tuple._1();
+ })
+ .cache();
+
for (final MapReduce mapReduce : this.mapReducers) {
// execute the map reduce job
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
mapReduce.storeState(newApacheConfiguration);
// map
- final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) graphRDD, mapReduce, newApacheConfiguration);
+ final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration);
// combine TODO? is this really needed
// reduce
final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
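For context, a hedged usage sketch of submitting a vertex program through SparkGraphComputer after this change; the properties file path is illustrative, and PageRankVertexProgram.build().create() is assumed to be the no-argument builder call available in this TinkerPop snapshot:

import java.util.concurrent.Future;

import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public final class SparkRunSketch {
    public static void main(final String[] args) throws Exception {
        // hadoop-gryo.properties stands in for whatever HadoopGraph configuration is in use
        final Graph graph = GraphFactory.open("conf/hadoop-gryo.properties");
        final Future<ComputerResult> job = graph.compute(SparkGraphComputer.class)
                .program(PageRankVertexProgram.build().create())
                .submit();
        final ComputerResult result = job.get();
        System.out.println("iterations: " + result.memory().getIteration());
    }
}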
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/63959c4a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index 4b174e1..49847de 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -39,20 +39,18 @@ import java.util.List;
public class SparkMessenger<M> implements Messenger<M> {
private Vertex vertex;
- private Iterable<M> incomingMessages = new ArrayList<>();
+ private Iterable<M> incomingMessages;
private final List<Tuple2<Object, M>> outgoingMessages = new ArrayList<>();
- public void setVertexAndMessages(final Vertex vertex, final Iterable<M> incomingMessages) {
+ public SparkMessenger(final Vertex vertex, final Iterable<M> incomingMessages) {
this.vertex = vertex;
this.incomingMessages = incomingMessages;
- this.outgoingMessages.clear();
}
public List<Tuple2<Object, M>> getOutgoingMessages() {
return this.outgoingMessages;
}
-
@Override
public Iterator<M> receiveMessages(final MessageScope messageScope) {
return this.incomingMessages.iterator();