You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/10 20:36:10 UTC

[4/6] incubator-tinkerpop git commit: It is no longer up to the VertexProgram to remove() messages from the Messenger. If the graph provider can support iterator remove(), it does it automatically. I added a IteratorUtils.removeOnNext() wrapper to easily

It is no longer up to the VertexProgram to remove() messages from the Messenger. If the graph provider can support iterator remove(), it does it automatically. I added a IteratorUtils.removeOnNext() wrapper to easily aid the provider. If the providers iterator doesn't support remove(), thats that. This is less error prone as VertexPrograms don't have to be so aware of memory management at the underlying engine level. CTR.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b2cfc38c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b2cfc38c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b2cfc38c

Branch: refs/heads/TINKERPOP-971
Commit: b2cfc38c749765eed9bdfd34a84c666ca042d8b4
Parents: 51140a4
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Feb 10 12:34:56 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Feb 10 12:34:56 2016 -0700

----------------------------------------------------------------------
 .../process/computer/GiraphComputation.java     |  3 +--
 .../computer/traversal/TraverserExecutor.java   |  1 -
 .../gremlin/util/iterator/IteratorUtils.java    | 21 ++++++++++++++++++++
 .../spark/process/computer/SparkMessenger.java  |  3 ++-
 .../process/computer/TinkerMessenger.java       |  8 ++++----
 5 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b2cfc38c/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java
index f094f2d..1d52566 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java
@@ -26,7 +26,6 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.IOException;
 
@@ -42,7 +41,7 @@ public final class GiraphComputation extends BasicComputation<ObjectWritable, Ve
     public void compute(final Vertex<ObjectWritable, VertexWritable, NullWritable> vertex, final Iterable<ObjectWritable> messages) throws IOException {
         final GiraphWorkerContext workerContext = this.getWorkerContext();
         final VertexProgram<?> vertexProgram = workerContext.getVertexProgramPool().take();
-        vertexProgram.execute(ComputerGraph.vertexProgram(vertex.getValue().get(), vertexProgram), workerContext.getMessenger((GiraphVertex) vertex, this, IteratorUtils.noRemove(messages.iterator())), workerContext.getMemory());
+        vertexProgram.execute(ComputerGraph.vertexProgram(vertex.getValue().get(), vertexProgram), workerContext.getMessenger((GiraphVertex) vertex, this, messages.iterator()), workerContext.getMemory());
         workerContext.getVertexProgramPool().offer(vertexProgram);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b2cfc38c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
index 04f33f2..078e880 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
@@ -51,7 +51,6 @@ public final class TraverserExecutor {
         final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
         while (messages.hasNext()) {
             final Iterator<Traverser.Admin<Object>> traversers = (Iterator) messages.next().iterator();
-            messages.remove();
             while (traversers.hasNext()) {
                 final Traverser.Admin<Object> traverser = traversers.next();
                 traversers.remove();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b2cfc38c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
index 78ffe4a..866cf9f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
@@ -411,4 +411,25 @@ public final class IteratorUtils {
             }
         };
     }
+
+    public static <T> Iterator<T> removeOnNext(final Iterator<T> iterator) {
+        return new Iterator<T>() {
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public void remove() {
+                iterator.remove();
+            }
+
+            @Override
+            public T next() {
+                final T object = iterator.next();
+                iterator.remove();
+                return object;
+            }
+        };
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b2cfc38c/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
index f32c684..a3b11c8 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.structure.Direction;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import scala.Tuple2;
 
 import java.util.ArrayList;
@@ -54,7 +55,7 @@ public final class SparkMessenger<M> implements Messenger<M> {
 
     @Override
     public Iterator<M> receiveMessages() {
-        return this.incomingMessages.iterator();
+        return IteratorUtils.removeOnNext(this.incomingMessages.iterator());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b2cfc38c/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
index 71d7030..3298aff 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
@@ -65,19 +65,19 @@ public final class TinkerMessenger<M> implements Messenger<M> {
                 final Traversal.Admin<Vertex, Edge> incidentTraversal = TinkerMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get().asAdmin(), this.vertex);
                 final Direction direction = TinkerMessenger.getDirection(incidentTraversal);
                 final Edge[] edge = new Edge[1]; // simulates storage side-effects available in Gremlin, but not Java8 streams
-                multiIterator.addIterator(IteratorUtils.noRemove(StreamSupport.stream(Spliterators.spliteratorUnknownSize(VertexProgramHelper.reverse(incidentTraversal.asAdmin()), Spliterator.IMMUTABLE | Spliterator.SIZED), false)
+                multiIterator.addIterator(StreamSupport.stream(Spliterators.spliteratorUnknownSize(VertexProgramHelper.reverse(incidentTraversal.asAdmin()), Spliterator.IMMUTABLE | Spliterator.SIZED), false)
                         .map(e -> this.messageBoard.receiveMessages.get((edge[0] = e).vertices(direction).next()))
                         .filter(q -> null != q)
                         .flatMap(Queue::stream)
                         .map(message -> localMessageScope.getEdgeFunction().apply(message, edge[0]))
-                        .iterator()));
+                        .iterator());
 
             } else {
-                multiIterator.addIterator(IteratorUtils.noRemove(Stream.of(this.vertex)
+                multiIterator.addIterator(Stream.of(this.vertex)
                         .map(this.messageBoard.receiveMessages::get)
                         .filter(q -> null != q)
                         .flatMap(Queue::stream)
-                        .iterator()));
+                        .iterator());
             }
         }
         return multiIterator;