You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/05 23:30:48 UTC
incubator-tinkerpop git commit: Figured out a neat trick.
IteratorUtils.noRemove(Iterator) so if a provider's Messenger doesn't support
Iterator.remove(),
they can simply wrap the iterator they provide in IteratorUtils.noRemove().
Chillin'.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1131 b54ddf283 -> 047ee2065
Figured out a neat trick. IteratorUtils.noRemove(Iterator) so if a provider's Messenger doesn't support Iterator.remove(), they can simply wrap the iterator they provide in IteratorUtils.noRemove(). Chillin'.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/047ee206
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/047ee206
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/047ee206
Branch: refs/heads/TINKERPOP-1131
Commit: 047ee2065023227381e1d2e594354b05331f06fd
Parents: b54ddf2
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Feb 5 15:30:53 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Feb 5 15:30:53 2016 -0700
----------------------------------------------------------------------
.../process/computer/GiraphComputation.java | 3 ++-
.../gremlin/util/iterator/IteratorUtils.java | 19 +++++++++++++++++++
.../gremlin/util/iterator/MultiIterator.java | 2 +-
.../process/computer/TinkerMessenger.java | 13 +++++++------
4 files changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/047ee206/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java
index 1d52566..f094f2d 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphComputation.java
@@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.io.IOException;
@@ -41,7 +42,7 @@ public final class GiraphComputation extends BasicComputation<ObjectWritable, Ve
public void compute(final Vertex<ObjectWritable, VertexWritable, NullWritable> vertex, final Iterable<ObjectWritable> messages) throws IOException {
final GiraphWorkerContext workerContext = this.getWorkerContext();
final VertexProgram<?> vertexProgram = workerContext.getVertexProgramPool().take();
- vertexProgram.execute(ComputerGraph.vertexProgram(vertex.getValue().get(), vertexProgram), workerContext.getMessenger((GiraphVertex) vertex, this, messages.iterator()), workerContext.getMemory());
+ vertexProgram.execute(ComputerGraph.vertexProgram(vertex.getValue().get(), vertexProgram), workerContext.getMessenger((GiraphVertex) vertex, this, IteratorUtils.noRemove(messages.iterator())), workerContext.getMemory());
workerContext.getVertexProgramPool().offer(vertexProgram);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/047ee206/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
index 808317f..78ffe4a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
@@ -392,4 +392,23 @@ public final class IteratorUtils {
public static <T> Stream<T> stream(final Iterable<T> iterable) {
return IteratorUtils.stream(iterable.iterator());
}
+
+ public static <T> Iterator<T> noRemove(final Iterator<T> iterator) {
+ return new Iterator<T>() {
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public void remove() {
+ // do nothing
+ }
+
+ @Override
+ public T next() {
+ return iterator.next();
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/047ee206/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
index e272a08..20c0946 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/MultiIterator.java
@@ -59,7 +59,7 @@ public final class MultiIterator<T> implements Iterator<T>, Serializable {
@Override
public void remove() {
- // this.iterators.get(this.current).remove();
+ this.iterators.get(this.current).remove();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/047ee206/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
index d0c302a..71d7030 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMessenger.java
@@ -29,6 +29,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.apache.tinkerpop.gremlin.util.iterator.MultiIterator;
import java.util.Iterator;
@@ -64,19 +65,19 @@ public final class TinkerMessenger<M> implements Messenger<M> {
final Traversal.Admin<Vertex, Edge> incidentTraversal = TinkerMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get().asAdmin(), this.vertex);
final Direction direction = TinkerMessenger.getDirection(incidentTraversal);
final Edge[] edge = new Edge[1]; // simulates storage side-effects available in Gremlin, but not Java8 streams
- multiIterator.addIterator(StreamSupport.stream(Spliterators.spliteratorUnknownSize(VertexProgramHelper.reverse(incidentTraversal.asAdmin()), Spliterator.IMMUTABLE | Spliterator.SIZED), false)
+ multiIterator.addIterator(IteratorUtils.noRemove(StreamSupport.stream(Spliterators.spliteratorUnknownSize(VertexProgramHelper.reverse(incidentTraversal.asAdmin()), Spliterator.IMMUTABLE | Spliterator.SIZED), false)
.map(e -> this.messageBoard.receiveMessages.get((edge[0] = e).vertices(direction).next()))
.filter(q -> null != q)
.flatMap(Queue::stream)
.map(message -> localMessageScope.getEdgeFunction().apply(message, edge[0]))
- .iterator());
+ .iterator()));
} else {
- multiIterator.addIterator(Stream.of(this.vertex)
+ multiIterator.addIterator(IteratorUtils.noRemove(Stream.of(this.vertex)
.map(this.messageBoard.receiveMessages::get)
.filter(q -> null != q)
.flatMap(Queue::stream)
- .iterator());
+ .iterator()));
}
}
return multiIterator;
@@ -93,8 +94,8 @@ public final class TinkerMessenger<M> implements Messenger<M> {
}
private void addMessage(final Vertex vertex, final M message) {
- this.messageBoard.sendMessages.compute(vertex, (v,queue) -> {
- if(null == queue) queue = new ConcurrentLinkedQueue<>();
+ this.messageBoard.sendMessages.compute(vertex, (v, queue) -> {
+ if (null == queue) queue = new ConcurrentLinkedQueue<>();
queue.add(null != this.combiner && !queue.isEmpty() ? this.combiner.combine(queue.remove(), message) : message);
return queue;
});