You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/27 21:03:08 UTC

incubator-tinkerpop git commit: add a new experimental algorithm for SparkGraphComputer -- it doesn't work perfectly right now, but is saved in SparkExecutor for future growth.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master d16fef7ae -> 3f073ae1b


add a new experimental algorithm for SparkGraphComputer -- it doesn't work perfectly right now, but is saved in SparkExecutor for future growth.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3f073ae1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3f073ae1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3f073ae1

Branch: refs/heads/master
Commit: 3f073ae1b65e981505a05d5851eeb52070d5b1d2
Parents: d16fef7
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Mar 27 14:03:04 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Mar 27 14:03:04 2015 -0600

----------------------------------------------------------------------
 .../process/computer/spark/SparkExecutor.java   | 77 +++++++++++++++++++-
 .../process/computer/spark/SparkMapEmitter.java |  9 ++-
 .../process/computer/spark/SparkMessenger.java  | 65 +++++++++++++++++
 .../computer/spark/SparkReduceEmitter.java      |  9 ++-
 4 files changed, 150 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3f073ae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 1fa98f1..ec1dd6f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 
+import com.google.common.base.Optional;
 import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -35,12 +36,20 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import scala.Tuple2;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
+import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -93,16 +102,76 @@ public final class SparkExecutor {
         return verticesHoldingIncomingMessages;
     }
 
+    //////////////////////////////
+    /////DEMO ALGORITHM /////////
+    /////////////////////////////
+    /////////////////////////////
+
+    public static <M> JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> executeVertexProgramIteration2(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>,List<M>>> viewMessageRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
+        final JavaPairRDD<Object, Tuple2<VertexWritable, Optional<Tuple2<List<DetachedVertexProperty<Object>>,List<M>>>>> graphViewMessagesRDD = graphRDD.leftOuterJoin(viewMessageRDD);
+
+        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Tuple2<Object, M>>>> viewAndMessagesRDD = graphViewMessagesRDD.mapPartitionsToPair(partitions -> {
+            final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
+            workerVertexProgram.workerIterationStart(memory);
+            final SparkMessenger<M> messenger = new SparkMessenger<>();
+            final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
+            final String[] elementComputeKeysArray = elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
+            return () -> IteratorUtils.map(partitions, graphViewMessages -> {
+                final Vertex vertex = graphViewMessages._2()._1().get();
+                final List<DetachedVertexProperty<Object>> view = graphViewMessages._2()._2().isPresent() ? graphViewMessages._2()._2().get()._1() : Collections.emptyList();
+                final List<M> incomingMessages = graphViewMessages._2()._2().isPresent() ? graphViewMessages._2()._2().get()._2() : Collections.emptyList();
+                view.forEach(property -> property.attach(vertex));
+                messenger.setVertexAndMessages(vertex, incomingMessages);
+                memory.setInTask(true);
+                workerVertexProgram.execute(vertex, messenger, memory);
+                memory.setInTask(false);
+                final List<DetachedVertexProperty<Object>> properties = IteratorUtils.list(IteratorUtils.<VertexProperty<Object>, DetachedVertexProperty<Object>>map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
+                vertex.properties(elementComputeKeysArray).forEachRemaining(VertexProperty::remove);
+                if (!partitions.hasNext())
+                    workerVertexProgram.workerIterationEnd(memory);
+                return new Tuple2<>(vertex.id(), new Tuple2<>(properties, messenger.getOutgoingMessages()));
+            });
+        });
+
+        final JavaPairRDD<Object, List<DetachedVertexProperty<Object>>> newViewRDD = viewAndMessagesRDD.mapValues(Tuple2::_1);
+
+        final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
+        final JavaPairRDD<Object, List<M>> newMessagesRDD = viewAndMessagesRDD
+                .flatMapToPair(viewAndMessages -> () -> viewAndMessages._2()._2().iterator())
+                .mapValues(message -> {
+                    final List<M> list = new ArrayList<>(1);
+                    list.add(message);
+                    return list;
+                }).reduceByKey((messageA, messageB) -> {
+                    if (null != messageCombiner) {
+                        final M message = Stream.concat(messageA.stream(), messageB.stream()).reduce(messageCombiner::combine).get();
+                        messageA.clear();
+                        messageA.add(message);
+                        return messageA;
+                    } else {
+                        messageA.addAll(messageB);
+                        return messageA;
+                    }
+                });
+
+        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> newViewMessages = newViewRDD.join(newMessagesRDD);
+
+        newViewMessages.foreachPartition(x -> {
+        }); // execute the view
+
+        return newViewMessages;
+    }
+
     public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkVertexPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
             final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
             workerMapReduce.workerStart(MapReduce.Stage.MAP);
+            final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
             return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
-                final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
                 workerMapReduce.map(keyValue._2().getVertex(), mapEmitter);
                 if (!partitionIterator.hasNext())
                     workerMapReduce.workerEnd(MapReduce.Stage.MAP);
-                return mapEmitter.getEmissions().iterator();
+                return mapEmitter.getEmissions();
             });
         });
         if (mapReduce.getMapKeySort().isPresent())
@@ -116,12 +185,12 @@ public final class SparkExecutor {
         JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
             final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(apacheConfiguration);
             workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
+            final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
             return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
-                final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
                 workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter);
                 if (!partitionIterator.hasNext())
                     workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
-                return reduceEmitter.getEmissions().iterator();
+                return reduceEmitter.getEmissions();
             });
         });
         if (mapReduce.getReduceKeySort().isPresent())

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3f073ae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
index 6cd8885..7141259 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import scala.Tuple2;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -29,14 +30,16 @@ import java.util.List;
  */
 public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
 
-    private final List<Tuple2<K, V>> emissions = new ArrayList<>();
+    private List<Tuple2<K, V>> emissions = new ArrayList<>();
 
     @Override
     public void emit(final K key, final V value) {
         this.emissions.add(new Tuple2<>(key, value));
     }
 
-    public Iterable<Tuple2<K, V>> getEmissions() {
-        return this.emissions;
+    public Iterator<Tuple2<K, V>> getEmissions() {
+        final Iterator<Tuple2<K,V>> iterator = this.emissions.iterator();
+        this.emissions = new ArrayList<>();
+        return iterator;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3f073ae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
new file mode 100644
index 0000000..bccbc35
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -0,0 +1,65 @@
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
+
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkMessenger<M> implements Messenger<M> {
+
+    private Vertex vertex;
+    private  Iterable<M> incomingMessages = new ArrayList<>();
+    private final List<Tuple2<Object,M>> outgoingMessages = new ArrayList<>();
+
+    public void setVertexAndMessages(final Vertex vertex, final Iterable<M> incomingMessages) {
+        this.vertex = vertex;
+        this.incomingMessages = incomingMessages;
+        this.outgoingMessages.clear();
+    }
+
+    public List<Tuple2<Object,M>> getOutgoingMessages() {
+        return this.outgoingMessages;
+    }
+
+
+    @Override
+    public Iterable<M> receiveMessages(final MessageScope messageScope) {
+        return this.incomingMessages;
+    }
+
+    @Override
+    public void sendMessage(final MessageScope messageScope, final M message) {
+        if (messageScope instanceof MessageScope.Local) {
+            final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
+            final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);
+            final Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal);
+            incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), message)));
+        } else {
+            ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoingMessages.add(new Tuple2<>(v.id(), message)));
+        }
+    }
+
+    ///////////
+
+    private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
+        incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(), vertex));
+        return (T) incidentTraversal;
+    }
+
+    private static Direction getOppositeDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
+        final VertexStep step = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
+        return step.getDirection().opposite();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3f073ae1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
index 77e7072..a5d0175 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import scala.Tuple2;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -29,14 +30,16 @@ import java.util.List;
  */
 public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
 
-    private final List<Tuple2<OK, OV>> emissions = new ArrayList<>();
+    private List<Tuple2<OK, OV>> emissions = new ArrayList<>();
 
     @Override
     public void emit(final OK key, final OV value) {
         this.emissions.add(new Tuple2<>(key, value));
     }
 
-    public List<Tuple2<OK, OV>> getEmissions() {
-        return this.emissions;
+    public Iterator<Tuple2<OK, OV>> getEmissions() {
+        final Iterator<Tuple2<OK, OV>> iterator = this.emissions.iterator();
+        this.emissions = new ArrayList<>();
+        return iterator;
     }
 }