You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/27 21:03:08 UTC
incubator-tinkerpop git commit: add a new experimental algorithm for
SparkGraphComputer -- it does not work perfectly yet,
but it is kept in SparkExecutor for further development.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master d16fef7ae -> 3f073ae1b
Add a new experimental algorithm for SparkGraphComputer -- it does not work perfectly yet, but it is kept in SparkExecutor for further development.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3f073ae1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3f073ae1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3f073ae1
Branch: refs/heads/master
Commit: 3f073ae1b65e981505a05d5851eeb52070d5b1d2
Parents: d16fef7
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Mar 27 14:03:04 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Mar 27 14:03:04 2015 -0600
----------------------------------------------------------------------
.../process/computer/spark/SparkExecutor.java | 77 +++++++++++++++++++-
.../process/computer/spark/SparkMapEmitter.java | 9 ++-
.../process/computer/spark/SparkMessenger.java | 65 +++++++++++++++++
.../computer/spark/SparkReduceEmitter.java | 9 ++-
4 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3f073ae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 1fa98f1..ec1dd6f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -18,6 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
+import com.google.common.base.Optional;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -35,12 +36,20 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
+import java.util.stream.Stream;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -93,16 +102,76 @@ public final class SparkExecutor {
return verticesHoldingIncomingMessages;
}
+    //////////////////////////////
+    /////DEMO ALGORITHM /////////
+    /////////////////////////////
+
+    /**
+     * Experimental ("demo") form of a single vertex-program iteration.
+     * <p>
+     * Rather than rewriting the whole graph RDD each iteration, this carries, per vertex id, a "view" (the
+     * detached element-compute-key properties written by the vertex program) together with the messages
+     * addressed to that vertex. The graph RDD is left-outer-joined with the previous view/messages, the vertex
+     * program is executed on every vertex, and a new view/messages RDD is built for the next iteration.
+     *
+     * @param graphRDD            the input graph; assumed to be keyed by vertex id, since it is joined with
+     *                            {@code viewMessageRDD}, whose keys are vertex ids
+     * @param viewMessageRDD      per-vertex view and incoming messages produced by the previous iteration
+     * @param memory              memory passed to the vertex program on every worker
+     * @param apacheConfiguration configuration used to re-create the vertex program on each worker
+     * @param <M>                 the message type
+     * @return per-vertex view and incoming messages for the next iteration; every vertex processed in this
+     * iteration is present, and vertices that received no messages map to an empty message list
+     */
+    public static <M> JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> executeVertexProgramIteration2(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewMessageRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
+        // left outer join: on the first iteration (or for vertices without view/messages) the Optional is absent
+        final JavaPairRDD<Object, Tuple2<VertexWritable, Optional<Tuple2<List<DetachedVertexProperty<Object>>, List<M>>>>> graphViewMessagesRDD = graphRDD.leftOuterJoin(viewMessageRDD);
+
+        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Tuple2<Object, M>>>> viewAndMessagesRDD = graphViewMessagesRDD.mapPartitionsToPair(partitions -> {
+            // one vertex program and one messenger per partition
+            final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
+            workerVertexProgram.workerIterationStart(memory);
+            final SparkMessenger<M> messenger = new SparkMessenger<>();
+            final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
+            final String[] elementComputeKeysArray = elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
+            return () -> IteratorUtils.map(partitions, graphViewMessages -> {
+                final Vertex vertex = graphViewMessages._2()._1().get();
+                final List<DetachedVertexProperty<Object>> view = graphViewMessages._2()._2().isPresent() ? graphViewMessages._2()._2().get()._1() : Collections.emptyList();
+                final List<M> incomingMessages = graphViewMessages._2()._2().isPresent() ? graphViewMessages._2()._2().get()._2() : Collections.emptyList();
+                // NOTE(review): intended to restore the previous iteration's compute values onto the vertex;
+                // confirm that attach() actually writes them back rather than only looking them up
+                view.forEach(property -> property.attach(vertex));
+                messenger.setVertexAndMessages(vertex, incomingMessages);
+                memory.setInTask(true);
+                workerVertexProgram.execute(vertex, messenger, memory);
+                memory.setInTask(false);
+                // the new view: detached copies of the compute-key properties, which are then stripped from the vertex
+                final List<DetachedVertexProperty<Object>> properties = IteratorUtils.list(IteratorUtils.<VertexProperty<Object>, DetachedVertexProperty<Object>>map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
+                vertex.properties(elementComputeKeysArray).forEachRemaining(VertexProperty::remove);
+                if (!partitions.hasNext())
+                    workerVertexProgram.workerIterationEnd(memory);
+                // copy: the messenger is reused for the next vertex of this partition, so its buffer must not be
+                // shared with the tuple handed to Spark
+                final List<Tuple2<Object, M>> outgoingMessages = new ArrayList<>(messenger.getOutgoingMessages());
+                return new Tuple2<>(vertex.id(), new Tuple2<>(properties, outgoingMessages));
+            });
+        });
+
+        final JavaPairRDD<Object, List<DetachedVertexProperty<Object>>> newViewRDD = viewAndMessagesRDD.mapValues(Tuple2::_1);
+
+        // group outgoing messages by target vertex id, combining them when the program provides a combiner
+        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
+        final JavaPairRDD<Object, List<M>> newMessagesRDD = viewAndMessagesRDD
+                .flatMapToPair(viewAndMessages -> () -> viewAndMessages._2()._2().iterator())
+                .mapValues(message -> {
+                    final List<M> list = new ArrayList<>(1);
+                    list.add(message);
+                    return list;
+                }).reduceByKey((messageA, messageB) -> {
+                    if (null != messageCombiner) {
+                        final M message = Stream.concat(messageA.stream(), messageB.stream()).reduce(messageCombiner::combine).get();
+                        messageA.clear();
+                        messageA.add(message);
+                        return messageA;
+                    } else {
+                        messageA.addAll(messageB);
+                        return messageA;
+                    }
+                });
+
+        // leftOuterJoin (not join): a vertex that received no messages must still carry its view into the
+        // next iteration; an inner join would silently drop it
+        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> newViewMessages = newViewRDD
+                .leftOuterJoin(newMessagesRDD)
+                .mapValues(viewAndMessages -> new Tuple2<List<DetachedVertexProperty<Object>>, List<M>>(
+                        viewAndMessages._1(),
+                        viewAndMessages._2().isPresent() ? viewAndMessages._2().get() : Collections.<M>emptyList()));
+
+        // force evaluation of this iteration
+        // NOTE(review): newViewMessages is not persisted, so a later action on the returned RDD recomputes the
+        // lineage and re-executes the vertex program (including its memory updates); consider persisting first
+        newViewMessages.foreachPartition(x -> {
+        });
+
+        return newViewMessages;
+    }
+
public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkVertexPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
workerMapReduce.workerStart(MapReduce.Stage.MAP);
+ final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
- final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
workerMapReduce.map(keyValue._2().getVertex(), mapEmitter);
if (!partitionIterator.hasNext())
workerMapReduce.workerEnd(MapReduce.Stage.MAP);
- return mapEmitter.getEmissions().iterator();
+ return mapEmitter.getEmissions();
});
});
if (mapReduce.getMapKeySort().isPresent())
@@ -116,12 +185,12 @@ public final class SparkExecutor {
JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(apacheConfiguration);
workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
+ final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
- final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter);
if (!partitionIterator.hasNext())
workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
- return reduceEmitter.getEmissions().iterator();
+ return reduceEmitter.getEmissions();
});
});
if (mapReduce.getReduceKeySort().isPresent())
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3f073ae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
index 6cd8885..7141259 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import scala.Tuple2;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
/**
@@ -29,14 +30,16 @@ import java.util.List;
*/
public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
- private final List<Tuple2<K, V>> emissions = new ArrayList<>();
+ // not final: getEmissions() replaces this buffer with a fresh list on every call
+ private List<Tuple2<K, V>> emissions = new ArrayList<>();
@Override
public void emit(final K key, final V value) {
this.emissions.add(new Tuple2<>(key, value));
}
- public Iterable<Tuple2<K, V>> getEmissions() {
- return this.emissions;
+ /**
+ * Returns the emissions collected since the previous call and starts a new, empty buffer, so the same
+ * emitter can be reused for successive map() calls without the returned iterator seeing later emissions.
+ *
+ * @return an iterator over the (key, value) pairs emitted since the last call
+ */
+ public Iterator<Tuple2<K, V>> getEmissions() {
+ final Iterator<Tuple2<K,V>> iterator = this.emissions.iterator();
+ this.emissions = new ArrayList<>();
+ return iterator;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3f073ae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
new file mode 100644
index 0000000..bccbc35
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -0,0 +1,65 @@
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
+
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link Messenger} used by the experimental Spark vertex-program iteration. For the vertex currently being
+ * processed it exposes that vertex's incoming messages to the vertex program, and it buffers every message the
+ * program sends as a (target vertex id, message) pair.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkMessenger<M> implements Messenger<M> {
+
+    private Vertex vertex;
+    private Iterable<M> incomingMessages = new ArrayList<>();
+    // not final: replaced (rather than cleared) for each vertex, so a list already returned by
+    // getOutgoingMessages() is never mutated afterwards
+    private List<Tuple2<Object, M>> outgoingMessages = new ArrayList<>();
+
+    /**
+     * Points this messenger at a new vertex and starts a new, empty outgoing-message buffer.
+     *
+     * @param vertex           the vertex the vertex program is about to execute on
+     * @param incomingMessages the messages addressed to that vertex
+     */
+    public void setVertexAndMessages(final Vertex vertex, final Iterable<M> incomingMessages) {
+        this.vertex = vertex;
+        this.incomingMessages = incomingMessages;
+        this.outgoingMessages = new ArrayList<>();
+    }
+
+    /**
+     * @return the messages sent since the last {@link #setVertexAndMessages} call, as
+     * (target vertex id, message) pairs
+     */
+    public List<Tuple2<Object, M>> getOutgoingMessages() {
+        return this.outgoingMessages;
+    }
+
+    /**
+     * Returns all incoming messages of the current vertex; the message scope is not used to filter them.
+     */
+    @Override
+    public Iterable<M> receiveMessages(final MessageScope messageScope) {
+        return this.incomingMessages;
+    }
+
+    /**
+     * Buffers {@code message} for each target of {@code messageScope}. For a local scope, the targets are
+     * the vertices at the other end of the edges reached by the scope's incident traversal started at the
+     * current vertex. For a global scope, the targets are the scope's listed vertices.
+     */
+    @Override
+    public void sendMessage(final MessageScope messageScope, final M message) {
+        if (messageScope instanceof MessageScope.Local) {
+            final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
+            final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);
+            final Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal);
+            incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), message)));
+        } else {
+            ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoingMessages.add(new Tuple2<>(v.id(), message)));
+        }
+    }
+
+    ///////////
+
+    // prepends a StartStep so the incident traversal begins at the given vertex (mutates the traversal in place)
+    private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
+        incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(), vertex));
+        return (T) incidentTraversal;
+    }
+
+    // the direction of the message target on each edge: the opposite of the traversal's last VertexStep direction
+    private static Direction getOppositeDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
+        final VertexStep step = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
+        return step.getDirection().opposite();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3f073ae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
index 77e7072..a5d0175 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import scala.Tuple2;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
/**
@@ -29,14 +30,16 @@ import java.util.List;
*/
public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
- private final List<Tuple2<OK, OV>> emissions = new ArrayList<>();
+ // not final: getEmissions() replaces this buffer with a fresh list on every call
+ private List<Tuple2<OK, OV>> emissions = new ArrayList<>();
@Override
public void emit(final OK key, final OV value) {
this.emissions.add(new Tuple2<>(key, value));
}
- public List<Tuple2<OK, OV>> getEmissions() {
- return this.emissions;
+ /**
+ * Returns the emissions collected since the previous call and starts a new, empty buffer, so the same
+ * emitter can be reused for successive reduce() calls without the returned iterator seeing later emissions.
+ *
+ * @return an iterator over the (key, value) pairs emitted since the last call
+ */
+ public Iterator<Tuple2<OK, OV>> getEmissions() {
+ final Iterator<Tuple2<OK, OV>> iterator = this.emissions.iterator();
+ this.emissions = new ArrayList<>();
+ return iterator;
}
}