You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/09 14:54:31 UTC

[01/18] tinkerpop git commit: Made is so MemoryComputeKey implements Cloneable. This is actually really important we have NOT been cloning the BiOperators of OrderGlobalStep and GroupStep. We have just been 'getting lucky' in that Spark and Giraph use Se

Repository: tinkerpop
Updated Branches:
  refs/heads/master ebc0391dd -> b2cde4eba


Made is so MemoryComputeKey implements Cloneable. This is actually really important we have NOT been cloning the BiOperators of OrderGlobalStep and GroupStep. We have just been 'getting lucky' in that Spark and Giraph use Serialization and thus we get a clone for free. However, for parallelization within a JVM, we woulld have issues except we never realized because we had a single global Memory for TinkerGraph. Now we don't and clone()ing bi operators works.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8deca706
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8deca706
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8deca706

Branch: refs/heads/master
Commit: 8deca70680e8c89b31fea1cc99300740f45eec56
Parents: 1ac003d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 04:55:12 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 16:59:45 2017 -0700

----------------------------------------------------------------------
 .../process/computer/MemoryComputeKey.java      | 24 ++++++++++++++++++--
 .../process/traversal/step/map/GroupStep.java   | 18 +++++++++++++--
 .../traversal/step/map/OrderGlobalStep.java     | 13 ++++++++++-
 .../util/function/ChainedComparator.java        | 18 +++++++++++++--
 .../process/computer/TinkerWorkerMemory.java    |  9 +-------
 5 files changed, 67 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
index 94ca675..70adf3d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
@@ -22,6 +22,8 @@ package org.apache.tinkerpop.gremlin.process.computer;
 import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
 
 import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.function.BinaryOperator;
 
 /**
@@ -32,10 +34,10 @@ import java.util.function.BinaryOperator;
  *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MemoryComputeKey<A> implements Serializable {
+public final class MemoryComputeKey<A> implements Serializable, Cloneable {
 
     private final String key;
-    private final BinaryOperator<A> reducer;
+    private BinaryOperator<A> reducer;
     private final boolean isTransient;
     private final boolean isBroadcast;
 
@@ -73,7 +75,25 @@ public final class MemoryComputeKey<A> implements Serializable {
         return object instanceof MemoryComputeKey && ((MemoryComputeKey) object).key.equals(this.key);
     }
 
+    @Override
+    public MemoryComputeKey<A> clone() {
+        try {
+            final MemoryComputeKey<A> clone = (MemoryComputeKey<A>) super.clone();
+            for (final Method method : this.reducer.getClass().getMethods()) {
+                if (method.getName().equals("clone") && 0 == method.getParameterCount()) {
+                    clone.reducer = (BinaryOperator<A>) method.invoke(this.reducer);
+                    break;
+                }
+            }
+            return clone;
+        } catch (final IllegalAccessException | InvocationTargetException | CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
     public static <A> MemoryComputeKey<A> of(final String key, final BinaryOperator<A> reducer, final boolean isBroadcast, final boolean isTransient) {
         return new MemoryComputeKey<>(key, reducer, isBroadcast, isTransient);
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index de4e223..d6ce421 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -95,7 +95,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
             final TraverserSet traverserSet = new TraverserSet<>();
             this.preTraversal.reset();
             this.preTraversal.addStart(traverser);
-            while(this.preTraversal.hasNext()) {
+            while (this.preTraversal.hasNext()) {
                 traverserSet.add(this.preTraversal.nextTraverser());
             }
             map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet);
@@ -158,7 +158,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
 
     ///////////////////////
 
-    public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable {
+    public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable, Cloneable {
 
         // size limit before Barrier.processAllStarts() to lazy reduce
         private static final int SIZE_LIMIT = 1000;
@@ -182,6 +182,20 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
         }
 
         @Override
+        public GroupBiOperator<K, V> clone() {
+            try {
+                final GroupBiOperator<K, V> clone = (GroupBiOperator<K, V>) super.clone();
+                if (null != this.valueTraversal) {
+                    clone.valueTraversal = this.valueTraversal.clone();
+                    clone.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, clone.valueTraversal).orElse(null);
+                }
+                return clone;
+            } catch (final CloneNotSupportedException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        @Override
         public Map<K, V> apply(final Map<K, V> mapA, final Map<K, V> mapB) {
             for (final K key : mapB.keySet()) {
                 Object objectA = mapA.get(key);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
index 60be2d6..a7d21b2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
@@ -144,7 +144,7 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa
 
     ////////////////
 
-    public static final class OrderBiOperator<S> implements BinaryOperator<TraverserSet<S>>, Serializable {
+    public static final class OrderBiOperator<S> implements BinaryOperator<TraverserSet<S>>, Serializable, Cloneable {
 
         private ChainedComparator chainedComparator;
         private long limit;
@@ -159,6 +159,17 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa
         }
 
         @Override
+        public OrderBiOperator<S> clone() {
+            try {
+                final OrderBiOperator<S> clone = (OrderBiOperator<S>) super.clone();
+                clone.chainedComparator = this.chainedComparator.clone();
+                return clone;
+            } catch (final CloneNotSupportedException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        @Override
         public TraverserSet<S> apply(final TraverserSet<S> setA, final TraverserSet<S> setB) {
             setA.addAll(setB);
             if (Long.MAX_VALUE != this.limit && setA.bulkSize() > this.limit) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
index 44a994b..bdb2e6d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
@@ -34,9 +34,9 @@ import java.util.stream.Collectors;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class ChainedComparator<S, C extends Comparable> implements Comparator<S>, Serializable {
+public final class ChainedComparator<S, C extends Comparable> implements Comparator<S>, Serializable, Cloneable {
 
-    private final List<Pair<Traversal.Admin<S, C>, Comparator<C>>> comparators = new ArrayList<>();
+    private List<Pair<Traversal.Admin<S, C>, Comparator<C>>> comparators = new ArrayList<>();
     private final boolean isShuffle;
     private final boolean traversers;
 
@@ -66,4 +66,18 @@ public final class ChainedComparator<S, C extends Comparable> implements Compara
         }
         return 0;
     }
+
+    @Override
+    public ChainedComparator<S, C> clone() {
+        try {
+            final ChainedComparator<S, C> clone = (ChainedComparator<S, C>) super.clone();
+            clone.comparators = new ArrayList<>();
+            for (final Pair<Traversal.Admin<S, C>, Comparator<C>> comparator : this.comparators) {
+                clone.comparators.add(new Pair<>(comparator.getValue0().clone(), comparator.getValue1()));
+            }
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
index 081e4fa..1afa27e 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
@@ -21,9 +21,7 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
 
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
-import org.apache.tinkerpop.gremlin.util.Serializer;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -41,12 +39,7 @@ public final class TinkerWorkerMemory implements Memory.Admin {
     public TinkerWorkerMemory(final TinkerMemory mainMemory) {
         this.mainMemory = mainMemory;
         for (final MemoryComputeKey key : this.mainMemory.memoryKeys.values()) {
-            try {
-                final MemoryComputeKey clone = Serializer.cloneObject(key);
-                this.reducers.put(clone.getKey(), clone.getReducer());
-            } catch (final IOException | ClassNotFoundException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
+            this.reducers.put(key.getKey(), key.clone().getReducer());
         }
     }
 


[02/18] tinkerpop git commit: Added Serializer.cloneObject() which clones via serialization (helper method). TinkerGraphComputer now how a sound distributed Memory system where each worker/thread aggregates without concurrency locally and then, at the en

Posted by ok...@apache.org.
Added Serializer.cloneObject() which clones via serialization (helper method). TinkerGraphComputer now how a sound distributed Memory system where each worker/thread aggregates without concurrency locally and then, at the end of the iteration, the thread-distributed memories are aggregated into the main TinkerMemory.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/1ac003d0
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/1ac003d0
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/1ac003d0

Branch: refs/heads/master
Commit: 1ac003d00807c1351594d04a4bbdb55e93b00134
Parents: 056d7ae
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 04:06:30 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 16:59:45 2017 -0700

----------------------------------------------------------------------
 .../apache/tinkerpop/gremlin/util/Serializer.java    |  4 ++++
 .../process/computer/TinkerGraphComputer.java        |  8 +++-----
 .../process/computer/TinkerWorkerMemory.java         |  6 +++---
 .../process/computer/TinkerWorkerPool.java           | 15 ++++++++++++---
 4 files changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java
index 28bab16..988fce3 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java
@@ -47,4 +47,8 @@ public final class Serializer {
         in.close();
         return object;
     }
+
+    public static <V> V cloneObject(final V object) throws IOException, ClassNotFoundException {
+        return (V) Serializer.deserializeObject(Serializer.serializeObject(object));
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index fef2e1a..7523d63 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -157,7 +157,7 @@ public final class TinkerGraphComputer implements GraphComputer {
         return computerService.submit(() -> {
             final long time = System.currentTimeMillis();
             final TinkerGraphComputerView view;
-            final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers);
+            final TinkerWorkerPool workers = new TinkerWorkerPool(this.memory, this.workers);
             try {
                 if (null != this.vertexProgram) {
                     view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys());
@@ -168,8 +168,7 @@ public final class TinkerGraphComputer implements GraphComputer {
                         this.memory.completeSubRound();
                         workers.setVertexProgram(this.vertexProgram);
                         final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
-                        workers.executeVertexProgram(vertexProgram -> {
-                            final TinkerWorkerMemory workerMemory = new TinkerWorkerMemory(this.memory);
+                        workers.executeVertexProgram((vertexProgram, workerMemory) -> {
                             vertexProgram.workerIterationStart(workerMemory.asImmutable());
                             while (true) {
                                 final Vertex vertex = vertices.next();
@@ -178,8 +177,7 @@ public final class TinkerGraphComputer implements GraphComputer {
                                 vertexProgram.execute(
                                         ComputerGraph.vertexProgram(vertex, vertexProgram),
                                         new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
-                                        workerMemory
-                                );
+                                        workerMemory);
                             }
                             vertexProgram.workerIterationEnd(workerMemory.asImmutable());
                             workerMemory.complete();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
index 28b99e3..081e4fa 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
 
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.util.Serializer;
 
 import java.io.IOException;
@@ -43,10 +42,10 @@ public final class TinkerWorkerMemory implements Memory.Admin {
         this.mainMemory = mainMemory;
         for (final MemoryComputeKey key : this.mainMemory.memoryKeys.values()) {
             try {
-                final MemoryComputeKey clone = (MemoryComputeKey) Serializer.deserializeObject(Serializer.serializeObject(key));
+                final MemoryComputeKey clone = Serializer.cloneObject(key);
                 this.reducers.put(clone.getKey(), clone.getReducer());
             } catch (final IOException | ClassNotFoundException e) {
-                this.reducers.put(key.getKey(), key.getReducer()); // super ghetto
+                throw new IllegalStateException(e.getMessage(), e);
             }
         }
     }
@@ -112,5 +111,6 @@ public final class TinkerWorkerMemory implements Memory.Admin {
         for (final Map.Entry<String, Object> entry : this.workerMemory.entrySet()) {
             this.mainMemory.add(entry.getKey(), entry.getValue());
         }
+        this.workerMemory.clear();
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 3d851bf..140d347 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -24,10 +24,13 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool;
 import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
 
+import java.util.Queue;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 /**
@@ -44,11 +47,15 @@ public final class TinkerWorkerPool implements AutoCloseable {
 
     private VertexProgramPool vertexProgramPool;
     private MapReducePool mapReducePool;
+    private final Queue<TinkerWorkerMemory> workerMemoryPool = new ConcurrentLinkedQueue<>();
 
-    public TinkerWorkerPool(final int numberOfWorkers) {
+    public TinkerWorkerPool(final TinkerMemory memory, final int numberOfWorkers) {
         this.numberOfWorkers = numberOfWorkers;
         this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER);
         this.completionService = new ExecutorCompletionService<>(this.workerPool);
+        for (int i = 0; i < this.numberOfWorkers; i++) {
+            this.workerMemoryPool.add(new TinkerWorkerMemory(memory));
+        }
     }
 
     public void setVertexProgram(final VertexProgram vertexProgram) {
@@ -59,12 +66,14 @@ public final class TinkerWorkerPool implements AutoCloseable {
         this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
     }
 
-    public void executeVertexProgram(final Consumer<VertexProgram> worker) throws InterruptedException {
+    public void executeVertexProgram(final BiConsumer<VertexProgram, TinkerWorkerMemory> worker) throws InterruptedException {
         for (int i = 0; i < this.numberOfWorkers; i++) {
             this.completionService.submit(() -> {
                 final VertexProgram vp = this.vertexProgramPool.take();
-                worker.accept(vp);
+                final TinkerWorkerMemory workerMemory = this.workerMemoryPool.poll();
+                worker.accept(vp, workerMemory);
                 this.vertexProgramPool.offer(vp);
+                this.workerMemoryPool.offer(workerMemory);
                 return null;
             });
         }


[17/18] tinkerpop git commit: Merge branch 'TINKERPOP-1585' into tp32

Posted by ok...@apache.org.
Merge branch 'TINKERPOP-1585' into tp32


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/2d824cf2
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/2d824cf2
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/2d824cf2

Branch: refs/heads/master
Commit: 2d824cf29f7d914405e262f4111aa2f5a7c272dc
Parents: c3e6ed9 3fd74fc
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Jan 9 07:54:03 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Jan 9 07:54:03 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   5 +
 .../process/computer/MemoryComputeKey.java      |  24 +++-
 .../traversal/TraversalVertexProgram.java       |  20 +++-
 .../computer/traversal/WorkerExecutor.java      |   2 +-
 .../traversal/step/branch/RepeatStep.java       |  17 ++-
 .../traversal/step/filter/DedupGlobalStep.java  |  42 ++++---
 .../process/traversal/step/map/GroupStep.java   |  21 +++-
 .../traversal/step/map/OrderGlobalStep.java     |  13 ++-
 .../optimization/RepeatUnrollStrategy.java      |   6 +-
 .../tinkerpop/gremlin/util/Serializer.java      |   4 +
 .../util/function/ChainedComparator.java        |  18 ++-
 .../step/filter/GroovyDedupTest.groovy          |   5 +
 .../traversal/step/filter/DedupTest.java        |  20 +++-
 .../SparkInterceptorStrategyTest.java           |   2 +-
 .../process/computer/TinkerGraphComputer.java   |  22 ++--
 .../computer/TinkerGraphComputerView.java       |  32 +++---
 .../process/computer/TinkerMemory.java          |   2 +-
 .../process/computer/TinkerWorkerMemory.java    | 109 +++++++++++++++++++
 .../process/computer/TinkerWorkerPool.java      |  43 +++++++-
 19 files changed, 340 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2d824cf2/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --cc CHANGELOG.asciidoc
index c090940,d7f4256..805db7b
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@@ -26,10 -26,13 +26,15 @@@ image::https://raw.githubusercontent.co
  TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  
 +* Fixed a bug in `LazyBarrierStrategy` where `profile()` was deactivating it accidentally.
+ * Fixed a bug in `RepeatUnrollStrategy` where stateful `DedupGlobalStep` was cloned and thus, maintained two deduplication sets.
  * Added documentation around "terminal steps" in Gremlin: `hasNext()`, `next()`, `toList()`, etc.
 +* Added `CloseableIterator` to allow `Graph` providers who open expensive resources a way to let users release them.
  * Fixed minor bug in `gremlin-driver` where closing a session-based `Client` without initializing it could generate an error.
+ * Relieved synchronization pressure in various areas of `TinkerGraphComputer`.
+ * Fixed an optimization bug in OLAP-based `DedupGlobalStep` where deduping occurred twice.
+ * `MemoryComputeKey` now implements `Cloneable` which is useful for `BiOperator` reducers that maintain thread-unsafe state.
+ * `TinkerGraphComputer` now supports distributed `Memory` with lock-free partition aggregation.
  * `TinkerGraph` Gryo and GraphSON deserialization is now configured to use multi-properties.
  * Changed behavior of `ElementHelper.areEqual(Property, Property)` to not throw exceptions with `null` arguments.
  * Added `GryoVersion` for future flexibility when introducing a new verison of Gryo and moved serializer registrations to it.


[07/18] tinkerpop git commit: discovering various synchronization bottlenecks in TinkerGraphComputer. Also, realized some dumb things I was doing in TraversalVertexProgram. Its crazy, for this benchmark that @dkuppitz and i have, if I don't touch vertex.

Posted by ok...@apache.org.
discovering various synchronization bottlenecks in TinkerGraphComputer. Also, realized some dumb things I was doing in TraversalVertexProgram. Its crazy, for this benchmark that @dkuppitz and i have, if I don't touch vertex.properties that are compute keys: millisecond return times. If I do, seconds return times.... Need to figure out how to partition TinkerGraphView... Perhaps thread local..dah.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8d961285
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8d961285
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8d961285

Branch: refs/heads/master
Commit: 8d961285f687107ffc5c1d8a3bca7c78ea833adf
Parents: cef1979
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 10:32:21 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 17:00:05 2017 -0700

----------------------------------------------------------------------
 .../traversal/TraversalVertexProgram.java       | 16 +++++++---
 .../computer/traversal/WorkerExecutor.java      |  2 +-
 .../computer/TinkerGraphComputerView.java       | 32 +++++++++-----------
 .../groovy/TinkerGraphGroovyPlayTest.groovy     |  2 +-
 4 files changed, 27 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8d961285/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 1a54721..b82e265 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -32,7 +32,6 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.HaltedTraverserStrategy;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
 import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
 import org.apache.tinkerpop.gremlin.process.computer.util.SingleMessenger;
@@ -51,6 +50,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.ProfileSideEffectStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.HaltedTraverserStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ComputerVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversal;
@@ -251,8 +251,14 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                 ((Barrier) this.traversalMatrix.getStepById(stepId)).done();
         }
         // define halted traversers
-        final TraverserSet<Object> haltedTraversers = vertex.<TraverserSet<Object>>property(HALTED_TRAVERSERS).orElse(new TraverserSet<>());
-        vertex.property(VertexProperty.Cardinality.single, HALTED_TRAVERSERS, haltedTraversers);
+        final VertexProperty<TraverserSet<Object>> property = vertex.property(HALTED_TRAVERSERS);
+        final TraverserSet<Object> haltedTraversers;
+        if (property.isPresent()) {
+            haltedTraversers = property.value();
+        } else {
+            haltedTraversers = new TraverserSet<>();
+            vertex.property(VertexProperty.Cardinality.single, HALTED_TRAVERSERS, haltedTraversers);
+        }
         //////////////////
         if (memory.isInitialIteration()) {    // ITERATION 1
             final TraverserSet<Object> activeTraversers = new TraverserSet<>();
@@ -282,9 +288,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                         activeTraversers.add((Traverser.Admin) traverser);
                 });
             }
-            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || WorkerExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers, this.haltedTraverserStrategy));
+            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || WorkerExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers, haltedTraversers, this.haltedTraverserStrategy));
         } else   // ITERATION 1+
-            memory.add(VOTE_TO_HALT, WorkerExecutor.execute(vertex, messenger, this.traversalMatrix, memory, this.returnHaltedTraversers, this.haltedTraverserStrategy));
+            memory.add(VOTE_TO_HALT, WorkerExecutor.execute(vertex, messenger, this.traversalMatrix, memory, this.returnHaltedTraversers, haltedTraversers, this.haltedTraverserStrategy));
         // save space by not having an empty halted traversers property
         if (this.returnHaltedTraversers || haltedTraversers.isEmpty())
             vertex.<TraverserSet>property(HALTED_TRAVERSERS).remove();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8d961285/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
index 2571e7b..e6e73d0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
@@ -59,10 +59,10 @@ final class WorkerExecutor {
                                      final TraversalMatrix<?, ?> traversalMatrix,
                                      final Memory memory,
                                      final boolean returnHaltedTraversers,
+                                     final TraverserSet<Object> haltedTraversers,
                                      final HaltedTraverserStrategy haltedTraverserStrategy) {
         final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
         final AtomicBoolean voteToHalt = new AtomicBoolean(true);
-        final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
         final TraverserSet<Object> activeTraversers = new TraverserSet<>();
         final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8d961285/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java
index 43090fe..43998fb 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputerView.java
@@ -43,8 +43,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -104,11 +102,14 @@ public final class TinkerGraphComputerView {
     }
 
     public List<Property> getProperties(final TinkerVertex vertex) {
-        final Stream<Property> a = TinkerHelper.getProperties(vertex).values().stream().flatMap(list -> list.stream());
-        final Stream<Property> b = this.computeProperties.containsKey(vertex) ?
-                this.computeProperties.get(vertex).values().stream().flatMap(list -> list.stream()) :
-                Stream.empty();
-        return Stream.concat(a, b).collect(Collectors.toList());
+        final List<Property> list = new ArrayList<>();
+        for (final List<VertexProperty> properties : TinkerHelper.getProperties(vertex).values()) {
+            list.addAll(properties);
+        }
+        for (final List<VertexProperty<?>> properties : this.computeProperties.getOrDefault(vertex, Collections.emptyMap()).values()) {
+            list.addAll(properties);
+        }
+        return list;
     }
 
     public void removeProperty(final TinkerVertex vertex, final String key, final VertexProperty property) {
@@ -131,8 +132,9 @@ public final class TinkerGraphComputerView {
         // remove all transient properties from the vertices
         for (final VertexComputeKey computeKey : this.computeKeys.values()) {
             if (computeKey.isTransient()) {
-                final List<VertexProperty<?>> toRemove = this.computeProperties.values().stream().flatMap(map -> map.getOrDefault(computeKey.getKey(), Collections.emptyList()).stream()).collect(Collectors.toList());
-                toRemove.forEach(VertexProperty::remove);
+                for (final Map<String, List<VertexProperty<?>>> properties : this.computeProperties.values()) {
+                    properties.remove(computeKey.getKey());
+                }
             }
         }
     }
@@ -211,22 +213,16 @@ public final class TinkerGraphComputerView {
     }
 
     private void addValue(final Vertex vertex, final String key, final VertexProperty property) {
-        final Map<String, List<VertexProperty<?>>> elementProperties = this.computeProperties.computeIfAbsent(vertex, k -> new ConcurrentHashMap<>());
+        final Map<String, List<VertexProperty<?>>> elementProperties = this.computeProperties.computeIfAbsent(vertex, k -> new HashMap<>());
         elementProperties.compute(key, (k, v) -> {
-            if (null == v) v = Collections.synchronizedList(new ArrayList<>());
+            if (null == v) v = new ArrayList<>();
             v.add(property);
             return v;
         });
     }
 
     private void removeValue(final Vertex vertex, final String key, final VertexProperty property) {
-        this.computeProperties.computeIfPresent(vertex, (k, v) -> {
-            v.computeIfPresent(key, (k1, v1) -> {
-                v1.remove(property);
-                return v1;
-            });
-            return v;
-        });
+        this.computeProperties.<List<Map<String, VertexProperty<?>>>>getOrDefault(vertex, Collections.emptyMap()).get(key).remove(property);
     }
 
     private List<VertexProperty<?>> getValue(final Vertex vertex, final String key) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8d961285/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy
index d277977..a736f3d 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy
@@ -40,7 +40,7 @@ class TinkerGraphGroovyPlayTest {
         def a = graph.traversal().withComputer(Computer.compute())
         def r = new Random(123)
 
-        (1..1725403).each {
+        (1..1000000).each {
             def vid = ["a", "b", "c", "d"].collectEntries { [it, r.nextInt() % 400000] }
             graph.addVertex(T.id, vid)
         }; []


[15/18] tinkerpop git commit: removed the toy file and updated CHANGELOG.

Posted by ok...@apache.org.
removed the toy file and updated CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/dc38b435
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/dc38b435
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/dc38b435

Branch: refs/heads/master
Commit: dc38b435341d587c4d3c89bbc4b5261148077b0a
Parents: e925808
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 5 16:59:14 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 17:00:17 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  2 +-
 .../groovy/TinkerGraphGroovyPlayTest.groovy     | 53 --------------------
 2 files changed, 1 insertion(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/dc38b435/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 1fc5355..f8c9743 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -29,7 +29,7 @@ TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
 * Added documentation around "terminal steps" in Gremlin: `hasNext()`, `next()`, `toList()`, etc.
 * Fixed minor bug in `gremlin-driver` where closing a session-based `Client` without initializing it could generate an error.
 * Relieved synchronization pressure in various areas of `TinkerGraphComputer`.
-* Fixed an optimization bug in `DedupGlobalStep` in OLAP where deduping occurred twice.
+* Fixed an optimization bug in OLAP-based `DedupGlobalStep` where deduping occurred twice.
 * `MemoryComputeKey` now implements `Cloneable` which is useful for `BiOperator` reducers that maintain thread-unsafe state.
 * `TinkerGraphComputer` now supports distributed `Memory` with lock-free partition aggregation.
 * `TinkerGraph` Gryo and GraphSON deserialization is now configured to use multi-properties.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/dc38b435/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy
deleted file mode 100644
index a736f3d..0000000
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.tinkergraph.process.groovy
-
-import org.apache.tinkerpop.gremlin.process.computer.Computer
-import org.apache.tinkerpop.gremlin.structure.T
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph
-import org.apache.tinkerpop.gremlin.util.TimeUtil
-import org.junit.Ignore
-import org.junit.Test
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-class TinkerGraphGroovyPlayTest {
-
-    @Test
-    @Ignore
-    public void testStuff() {
-
-        def graph = TinkerGraph.open()
-        def g = graph.traversal()
-        def a = graph.traversal().withComputer(Computer.compute())
-        def r = new Random(123)
-
-        (1..1000000).each {
-            def vid = ["a", "b", "c", "d"].collectEntries { [it, r.nextInt() % 400000] }
-            graph.addVertex(T.id, vid)
-        }; []
-
-        println TimeUtil.clockWithResult(1) { g.V().id().select("c").count().next() }
-        println TimeUtil.clockWithResult(1) { g.V().id().select("c").dedup().count().next() }
-        println TimeUtil.clockWithResult(1) { a.V().id().select("c").count().next() }
-        println TimeUtil.clockWithResult(1) { a.V().id().select("c").dedup().count().next() }
-    }
-}


[18/18] tinkerpop git commit: Merge branch 'tp32'

Posted by ok...@apache.org.
Merge branch 'tp32'


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b2cde4eb
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b2cde4eb
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b2cde4eb

Branch: refs/heads/master
Commit: b2cde4eba853172248951e6f31bd9bb00f63587d
Parents: ebc0391 2d824cf
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Jan 9 07:54:22 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Jan 9 07:54:22 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |   5 +
 .../process/computer/MemoryComputeKey.java      |  24 +++-
 .../traversal/TraversalVertexProgram.java       |  20 +++-
 .../computer/traversal/WorkerExecutor.java      |   2 +-
 .../traversal/step/branch/RepeatStep.java       |  17 ++-
 .../traversal/step/filter/DedupGlobalStep.java  |  42 ++++---
 .../process/traversal/step/map/GroupStep.java   |  21 +++-
 .../traversal/step/map/OrderGlobalStep.java     |  13 ++-
 .../optimization/RepeatUnrollStrategy.java      |   6 +-
 .../tinkerpop/gremlin/util/Serializer.java      |   4 +
 .../util/function/ChainedComparator.java        |  18 ++-
 .../step/filter/GroovyDedupTest.groovy          |   5 +
 .../traversal/step/filter/DedupTest.java        |  20 +++-
 .../SparkInterceptorStrategyTest.java           |   2 +-
 .../process/computer/TinkerGraphComputer.java   |  22 ++--
 .../computer/TinkerGraphComputerView.java       |  32 +++---
 .../process/computer/TinkerMemory.java          |   2 +-
 .../process/computer/TinkerWorkerMemory.java    | 109 +++++++++++++++++++
 .../process/computer/TinkerWorkerPool.java      |  43 +++++++-
 19 files changed, 340 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2cde4eb/CHANGELOG.asciidoc
----------------------------------------------------------------------


[08/18] tinkerpop git commit: updated CHANGELOG. Going to move on. Need example Gryo dataset from @dkuppitz to verify performance improvement of DedupGlobalStep on SparkGraphComputer.

Posted by ok...@apache.org.
updated CHANGELOG. Going to move on. Need example Gryo dataset from @dkuppitz to verify performance improvement of DedupGlobalStep on SparkGraphComputer.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/cef19796
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/cef19796
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/cef19796

Branch: refs/heads/master
Commit: cef19796ffd2ad27074174ad7f90b9e1f24bc07e
Parents: 8deca70
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 05:05:09 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 17:00:05 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cef19796/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 35b1320..81dd94e 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -28,6 +28,9 @@ TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
 
 * Added documentation around "terminal steps" in Gremlin: `hasNext()`, `next()`, `toList()`, etc.
 * Fixed minor bug in `gremlin-driver` where closing a session-based `Client` without initializing it could generate an error.
+* Fixed an optimization bug in `DedupGlobalStep` in OLAP where deduping occurred twice.
+* `MemoryComputeKey` now implements `Cloneable` which is useful for `BiOperator` reducers that maintain thread-unsafe state.
+* `TinkerGraphComputer` now supports distributed `Memory` with lock-free partition aggregation.
 * `TinkerGraph` Gryo and GraphSON deserialization is now configured to use multi-properties.
 * Changed behavior of `ElementHelper.areEqual(Property, Property)` to not throw exceptions with `null` arguments.
 * Added `GryoVersion` for future flexibility when introducing a new verison of Gryo and moved serializer registrations to it.


[16/18] tinkerpop git commit: found a fixed a bug in RepeatUnrollStragegy. Global stateful steps like DedupGlobalStep can not be unrolled else you have split the global state amongst times(x) local steps. Added a test case to DedupGlobalStep g.V().repea

Posted by ok...@apache.org.
found a fixed a bug in RepeatUnrollStragegy. Global stateful steps like DedupGlobalStep can not be unrolled else you have split the global state amongst times(x) local steps.  Added a test case to DedupGlobalStep g.V().repeat(dedup()).times(2).


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3fd74fc2
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3fd74fc2
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3fd74fc2

Branch: refs/heads/master
Commit: 3fd74fc23cb2f1e25d555be01abd4d979967f239
Parents: dc38b43
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Jan 6 13:02:31 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 6 13:02:31 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc                              |  1 +
 .../traversal/step/branch/RepeatStep.java       | 17 ++++++++++++++++-
 .../traversal/step/filter/DedupGlobalStep.java  |  3 +--
 .../optimization/RepeatUnrollStrategy.java      |  6 +++++-
 .../step/filter/GroovyDedupTest.groovy          |  5 +++++
 .../traversal/step/filter/DedupTest.java        | 20 +++++++++++++++++++-
 6 files changed, 47 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3fd74fc2/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index f8c9743..d7f4256 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
 TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
+* Fixed a bug in `RepeatUnrollStrategy` where stateful `DedupGlobalStep` was cloned and thus, maintained two deduplication sets.
 * Added documentation around "terminal steps" in Gremlin: `hasNext()`, `next()`, `toList()`, etc.
 * Fixed minor bug in `gremlin-driver` where closing a session-based `Client` without initializing it could generate an error.
 * Relieved synchronization pressure in various areas of `TinkerGraphComputer`.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3fd74fc2/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java
index ebdb657..0fc2cdd 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/branch/RepeatStep.java
@@ -90,12 +90,16 @@ public final class RepeatStep<S> extends ComputerAwareStep<S, S> implements Trav
         return this.emitTraversal;
     }
 
+    public Traversal.Admin<S, S> getRepeatTraversal() {
+        return this.repeatTraversal;
+    }
+
     public List<Traversal.Admin<S, S>> getGlobalChildren() {
         return null == this.repeatTraversal ? Collections.emptyList() : Collections.singletonList(this.repeatTraversal);
     }
 
     public List<Traversal.Admin<S, ?>> getLocalChildren() {
-        final List<Traversal.Admin<S, ?>> list = new ArrayList<>();
+        final List<Traversal.Admin<S, ?>> list = new ArrayList<>(2);
         if (null != this.untilTraversal)
             list.add(this.untilTraversal);
         if (null != this.emitTraversal)
@@ -123,6 +127,17 @@ public final class RepeatStep<S> extends ComputerAwareStep<S, S> implements Trav
             return StringFactory.stepString(this, this.repeatTraversal, untilString(), emitString());
     }
 
+    @Override
+    public void reset() {
+        super.reset();
+        if (null != this.emitTraversal)
+            this.emitTraversal.reset();
+        if (null != this.untilTraversal)
+            this.untilTraversal.reset();
+        if (null != this.repeatTraversal)
+            this.repeatTraversal.reset();
+    }
+
     private final String untilString() {
         return null == this.untilTraversal ? "until(false)" : "until(" + this.untilTraversal + ')';
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3fd74fc2/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
index d024456..96bd0be 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
@@ -77,7 +77,6 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
             this.dedupLabels.forEach(label -> objects.add(TraversalUtil.applyNullable((S) this.getScopeValue(Pop.last, label, traverser), this.dedupTraversal)));
             return this.duplicateSet.add(objects);
         }
-
     }
 
     @Override
@@ -128,7 +127,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
     @Override
     public void setTraversal(final Traversal.Admin<?, ?> parentTraversal) {
         super.setTraversal(parentTraversal);
-        integrateChild(this.dedupTraversal);
+        this.integrateChild(this.dedupTraversal);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3fd74fc2/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/RepeatUnrollStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/RepeatUnrollStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/RepeatUnrollStrategy.java
index 31eb0d2..0a7cd4e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/RepeatUnrollStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/RepeatUnrollStrategy.java
@@ -19,11 +19,13 @@
 
 package org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization;
 
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.lambda.LoopTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.branch.RepeatStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.DedupGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.NoOpBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -48,7 +50,9 @@ public final class RepeatUnrollStrategy extends AbstractTraversalStrategy<Traver
         for (int i = 0; i < traversal.getSteps().size(); i++) {
             if (traversal.getSteps().get(i) instanceof RepeatStep) {
                 final RepeatStep<?> repeatStep = (RepeatStep) traversal.getSteps().get(i);
-                if (null == repeatStep.getEmitTraversal() && repeatStep.getUntilTraversal() instanceof LoopTraversal && ((LoopTraversal) repeatStep.getUntilTraversal()).getMaxLoops() > 0) {
+                if (null == repeatStep.getEmitTraversal() &&
+                        repeatStep.getUntilTraversal() instanceof LoopTraversal && ((LoopTraversal) repeatStep.getUntilTraversal()).getMaxLoops() > 0 &&
+                        !TraversalHelper.hasStepOfAssignableClassRecursively(Scope.global, DedupGlobalStep.class, repeatStep.getRepeatTraversal())) {
                     final Traversal.Admin<?, ?> repeatTraversal = repeatStep.getGlobalChildren().get(0);
                     repeatTraversal.removeStep(repeatTraversal.getSteps().size() - 1); // removes the RepeatEndStep
                     TraversalHelper.applySingleLevelStrategies(traversal, repeatTraversal, RepeatUnrollStrategy.class);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3fd74fc2/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/filter/GroovyDedupTest.groovy
----------------------------------------------------------------------
diff --git a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/filter/GroovyDedupTest.groovy b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/filter/GroovyDedupTest.groovy
index 83428c2..8f5e928 100644
--- a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/filter/GroovyDedupTest.groovy
+++ b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/filter/GroovyDedupTest.groovy
@@ -104,5 +104,10 @@ public abstract class GroovyDedupTest {
         public Traversal<Vertex, Collection<Vertex>> get_g_V_asXaX_repeatXbothX_timesX3X_emit_asXbX_group_byXselectXaXX_byXselectXbX_dedup_order_byXidX_foldX_selectXvaluesX_unfold_dedup() {
             new ScriptTraversal<>(g, "gremlin-groovy", "g.V.as('a').repeat(both()).times(3).emit.as('b').group.by(select('a')).by(select('b').dedup.order.by(id).fold).select(values).unfold.dedup")
         }
+
+        @Override
+        public Traversal<Vertex, Long> get_g_V_repeatXdedupX_timesX2X_count() {
+            new ScriptTraversal<>(g, "gremlin-groovy", "g.V.repeat(dedup()).times(2).count")
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3fd74fc2/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupTest.java
index 19e685a..183a3a9 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupTest.java
@@ -42,6 +42,7 @@ import java.util.Set;
 import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
 import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.both;
 import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.bothE;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.dedup;
 import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.select;
 import static org.apache.tinkerpop.gremlin.structure.Column.values;
 import static org.junit.Assert.assertEquals;
@@ -87,6 +88,8 @@ public abstract class DedupTest extends AbstractGremlinProcessTest {
 
     public abstract Traversal<Vertex, Collection<Vertex>> get_g_V_asXaX_repeatXbothX_timesX3X_emit_asXbX_group_byXselectXaXX_byXselectXbX_dedup_order_byXidX_foldX_selectXvaluesX_unfold_dedup();
 
+    public abstract Traversal<Vertex, Long> get_g_V_repeatXdedupX_timesX2X_count();
+
     @Test
     @LoadGraphWith(MODERN)
     public void g_V_out_in_valuesXnameX_fold_dedupXlocalX_unfold() {
@@ -284,7 +287,7 @@ public abstract class DedupTest extends AbstractGremlinProcessTest {
     }
 
     @Test
-    @LoadGraphWith
+    @LoadGraphWith(MODERN)
     public void g_V_asXaX_repeatXbothX_timesX3X_emit_asXbX_group_byXselectXaXX_byXselectXbX_dedup_order_byXidX_foldX_selectXvaluesX_unfold_dedup() {
         final Traversal<Vertex, Collection<Vertex>> traversal = get_g_V_asXaX_repeatXbothX_timesX3X_emit_asXbX_group_byXselectXaXX_byXselectXbX_dedup_order_byXidX_foldX_selectXvaluesX_unfold_dedup();
         printTraversalForm(traversal);
@@ -299,6 +302,16 @@ public abstract class DedupTest extends AbstractGremlinProcessTest {
         assertTrue(vertices.contains(convertToVertex(graph, "ripple")));
     }
 
+    @Test
+    @LoadGraphWith(MODERN)
+    public void g_V_repeatXdedupX_timesX2X_count() {
+        final Traversal<Vertex, Long> traversal = get_g_V_repeatXdedupX_timesX2X_count();
+        printTraversalForm(traversal);
+        assertEquals(0L, traversal.next().longValue());
+        assertFalse(traversal.hasNext());
+    }
+
+
     public static class Traversals extends DedupTest {
         @Override
         public Traversal<Vertex, String> get_g_V_out_in_valuesXnameX_fold_dedupXlocalX_unfold() {
@@ -374,5 +387,10 @@ public abstract class DedupTest extends AbstractGremlinProcessTest {
         public Traversal<Vertex, Collection<Vertex>> get_g_V_asXaX_repeatXbothX_timesX3X_emit_asXbX_group_byXselectXaXX_byXselectXbX_dedup_order_byXidX_foldX_selectXvaluesX_unfold_dedup() {
             return g.V().as("a").repeat(both()).times(3).emit().as("b").group().by(select("a")).by(select("b").dedup().order().by(T.id).fold()).select(values).<Collection<Vertex>>unfold().dedup();
         }
+
+        @Override
+        public Traversal<Vertex, Long> get_g_V_repeatXdedupX_timesX2X_count() {
+            return g.V().repeat(dedup()).times(2).count();
+        }
     }
 }


[06/18] tinkerpop git commit: realized that DedupGlobalStep was doing a 'double dedup'. the barrier which is already dedup'd is added to the master traveral's dedup-step and then dedup'd again. I simple boolean flag added to determine if the master trave

Posted by ok...@apache.org.
realized that DedupGlobalStep was doing a 'double dedup'. the barrier which is already dedup'd is added to the master traveral's dedup-step and then dedup'd again. I simple boolean flag added to determine if the master traversal's dedup is getting data from workers or not and thus, if so, don't dedup again.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/2ddc6324
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/2ddc6324
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/2ddc6324

Branch: refs/heads/master
Commit: 2ddc632494c96163f2a95e32ce73f03f38101262
Parents: 9c44f0d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 3 11:31:33 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 16:59:45 2017 -0700

----------------------------------------------------------------------
 .../traversal/step/filter/DedupGlobalStep.java  |  4 +-
 .../groovy/TinkerGraphGroovyPlayTest.groovy     | 53 ++++++++++++++++++++
 2 files changed, 56 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2ddc6324/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
index 220805f..b0afdc9 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
@@ -58,6 +58,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
     private final Set<String> dedupLabels;
     private Set<String> keepLabels;
     private boolean executingAtMaster = false;
+    private boolean barrierAdded = false;
 
     public DedupGlobalStep(final Traversal.Admin traversal, final String... dedupLabels) {
         super(traversal);
@@ -66,7 +67,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
 
     @Override
     protected boolean filter(final Traverser.Admin<S> traverser) {
-        if (this.onGraphComputer && !this.executingAtMaster) return true;
+        if (this.onGraphComputer && (!this.executingAtMaster || this.barrierAdded)) return true;
         traverser.setBulk(1);
         if (null == this.dedupLabels) {
             return this.duplicateSet.add(TraversalUtil.applyNullable(traverser, this.dedupTraversal));
@@ -194,6 +195,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
 
     @Override
     public void addBarrier(final Map<Object, Traverser.Admin<S>> barrier) {
+        this.barrierAdded = true;
         IteratorUtils.removeOnNext(barrier.entrySet().iterator()).forEachRemaining(entry -> {
             final Traverser.Admin<S> traverser = entry.getValue();
             traverser.setSideEffects(this.getTraversal().getSideEffects());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2ddc6324/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy
new file mode 100644
index 0000000..d277977
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/groovy/TinkerGraphGroovyPlayTest.groovy
@@ -0,0 +1,53 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.groovy
+
+import org.apache.tinkerpop.gremlin.process.computer.Computer
+import org.apache.tinkerpop.gremlin.structure.T
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph
+import org.apache.tinkerpop.gremlin.util.TimeUtil
+import org.junit.Ignore
+import org.junit.Test
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+class TinkerGraphGroovyPlayTest {
+
+    @Test
+    @Ignore
+    public void testStuff() {
+
+        def graph = TinkerGraph.open()
+        def g = graph.traversal()
+        def a = graph.traversal().withComputer(Computer.compute())
+        def r = new Random(123)
+
+        (1..1725403).each {
+            def vid = ["a", "b", "c", "d"].collectEntries { [it, r.nextInt() % 400000] }
+            graph.addVertex(T.id, vid)
+        }; []
+
+        println TimeUtil.clockWithResult(1) { g.V().id().select("c").count().next() }
+        println TimeUtil.clockWithResult(1) { g.V().id().select("c").dedup().count().next() }
+        println TimeUtil.clockWithResult(1) { a.V().id().select("c").count().next() }
+        println TimeUtil.clockWithResult(1) { a.V().id().select("c").dedup().count().next() }
+    }
+}


[14/18] tinkerpop git commit: now that MemoryComputeKeys are cloneable, they are cloned accordignly in TraversalVertexProgram.clone().

Posted by ok...@apache.org.
now that MemoryComputeKeys are cloneable, they are cloned accordignly in TraversalVertexProgram.clone().


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/253248e5
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/253248e5
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/253248e5

Branch: refs/heads/master
Commit: 253248e52b334d3990f6364d45174abca00476cd
Parents: 3dd1f6e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 12:50:17 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 17:00:17 2017 -0700

----------------------------------------------------------------------
 .../process/computer/traversal/TraversalVertexProgram.java       | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/253248e5/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index b82e265..30739b4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -369,6 +369,10 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
             if (!clone.traversal.get().isLocked())
                 clone.traversal.get().applyStrategies();
             clone.traversalMatrix = new TraversalMatrix<>(clone.traversal.get());
+            clone.memoryComputeKeys = new HashSet<>();
+            for (final MemoryComputeKey memoryComputeKey : this.memoryComputeKeys) {
+                clone.memoryComputeKeys.add(memoryComputeKey.clone());
+            }
             return clone;
         } catch (final CloneNotSupportedException e) {
             throw new IllegalStateException(e.getMessage(), e);


[05/18] tinkerpop git commit: minor nothing.

Posted by ok...@apache.org.
minor nothing.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f352b018
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f352b018
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f352b018

Branch: refs/heads/master
Commit: f352b018f2c7d389abbf759e17fa1b094c14c63c
Parents: 2ddc632
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 3 11:51:38 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 16:59:45 2017 -0700

----------------------------------------------------------------------
 .../gremlin/process/traversal/step/filter/DedupGlobalStep.java      | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f352b018/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
index b0afdc9..dfe7958 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
@@ -132,6 +132,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
     public void reset() {
         super.reset();
         this.duplicateSet.clear();
+        this.barrierAdded = false;
     }
 
     @Override


[04/18] tinkerpop git commit: Added TinkerWorkerMemory which will aggregate local to the current thread before propagated Memory to TinkerMemory. This reduces synchronization issues due all threads contending to mutate the master memory.

Posted by ok...@apache.org.
Added TinkerWorkerMemory which will aggregate local to the current thread before propagated Memory to TinkerMemory. This reduces synchronization issues due all threads contending to mutate the master memory.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/0db0991c
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/0db0991c
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/0db0991c

Branch: refs/heads/master
Commit: 0db0991cc092e33091068f59f81c27feb9712379
Parents: f352b01
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 3 17:59:39 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 16:59:45 2017 -0700

----------------------------------------------------------------------
 .../gremlin/process/traversal/step/map/GroupStep.java        | 3 ++-
 .../tinkergraph/process/computer/TinkerGraphComputer.java    | 8 +++++---
 .../gremlin/tinkergraph/process/computer/TinkerMemory.java   | 2 +-
 3 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0db0991c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index 7d80d69..de4e223 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -131,6 +131,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
             clone.keyTraversal = this.keyTraversal.clone();
         clone.valueTraversal = this.valueTraversal.clone();
         clone.preTraversal = (Traversal.Admin<S, ?>) GroupStep.generatePreTraversal(clone.valueTraversal);
+        clone.setReducingBiOperator(new GroupBiOperator<>(clone.valueTraversal));
         return clone;
     }
 
@@ -171,7 +172,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
                 this.valueTraversal = null;
                 this.barrierStep = null;
             } else {
-                this.valueTraversal = valueTraversal;
+                this.valueTraversal = valueTraversal.clone();
                 this.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.valueTraversal).orElse(null);
             }
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0db0991c/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index c333130..fef2e1a 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -169,7 +169,8 @@ public final class TinkerGraphComputer implements GraphComputer {
                         workers.setVertexProgram(this.vertexProgram);
                         final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
                         workers.executeVertexProgram(vertexProgram -> {
-                            vertexProgram.workerIterationStart(this.memory.asImmutable());
+                            final TinkerWorkerMemory workerMemory = new TinkerWorkerMemory(this.memory);
+                            vertexProgram.workerIterationStart(workerMemory.asImmutable());
                             while (true) {
                                 final Vertex vertex = vertices.next();
                                 if (Thread.interrupted()) throw new TraversalInterruptedException();
@@ -177,10 +178,11 @@ public final class TinkerGraphComputer implements GraphComputer {
                                 vertexProgram.execute(
                                         ComputerGraph.vertexProgram(vertex, vertexProgram),
                                         new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
-                                        this.memory
+                                        workerMemory
                                 );
                             }
-                            vertexProgram.workerIterationEnd(this.memory.asImmutable());
+                            vertexProgram.workerIterationEnd(workerMemory.asImmutable());
+                            workerMemory.complete();
                         });
                         this.messageBoard.completeIteration();
                         this.memory.completeSubRound();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0db0991c/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
index 34144e3..1502d84 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerMemory.java
@@ -138,7 +138,7 @@ public final class TinkerMemory implements Memory.Admin {
         return StringFactory.memoryString(this);
     }
 
-    private void checkKeyValue(final String key, final Object value) {
+    protected void checkKeyValue(final String key, final Object value) {
         if (!this.memoryKeys.containsKey(key))
             throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
         MemoryHelper.validateValue(value);


[09/18] tinkerpop git commit: no more SynchronizedIterator for TinkerGraphComputer. Each worker/thread has their own Iterator partition. In TinkerPop 3.3.0 (with Partitioner) this will be replaced (easily).

Posted by ok...@apache.org.
no more SynchronizedIterator for TinkerGraphComputer. Each worker/thread has their own Iterator<Vertex> partition. In TinkerPop 3.3.0 (with Partitioner) this will be replaced (easily).


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3dd1f6e5
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3dd1f6e5
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3dd1f6e5

Branch: refs/heads/master
Commit: 3dd1f6e5e141d715e678c66acd0516806e625047
Parents: 27a4b27
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 12:39:35 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 17:00:17 2017 -0700

----------------------------------------------------------------------
 .../process/computer/TinkerGraphComputer.java   | 14 +++-----
 .../process/computer/TinkerWorkerPool.java      | 36 +++++++++++++++++---
 2 files changed, 36 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3dd1f6e5/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 7523d63..2abce9a 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -156,24 +156,21 @@ public final class TinkerGraphComputer implements GraphComputer {
         this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
         return computerService.submit(() -> {
             final long time = System.currentTimeMillis();
-            final TinkerGraphComputerView view;
-            final TinkerWorkerPool workers = new TinkerWorkerPool(this.memory, this.workers);
+            final TinkerGraphComputerView view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, null != this.vertexProgram ? this.vertexProgram.getVertexComputeKeys() : Collections.emptySet());
+            final TinkerWorkerPool workers = new TinkerWorkerPool(this.graph, this.memory, this.workers);
             try {
                 if (null != this.vertexProgram) {
-                    view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys());
                     // execute the vertex program
                     this.vertexProgram.setup(this.memory);
                     while (true) {
                         if (Thread.interrupted()) throw new TraversalInterruptedException();
                         this.memory.completeSubRound();
                         workers.setVertexProgram(this.vertexProgram);
-                        final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
-                        workers.executeVertexProgram((vertexProgram, workerMemory) -> {
+                        workers.executeVertexProgram((vertices, vertexProgram, workerMemory) -> {
                             vertexProgram.workerIterationStart(workerMemory.asImmutable());
-                            while (true) {
+                            while (vertices.hasNext()) {
                                 final Vertex vertex = vertices.next();
                                 if (Thread.interrupted()) throw new TraversalInterruptedException();
-                                if (null == vertex) break;
                                 vertexProgram.execute(
                                         ComputerGraph.vertexProgram(vertex, vertexProgram),
                                         new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()),
@@ -192,9 +189,6 @@ public final class TinkerGraphComputer implements GraphComputer {
                         }
                     }
                     view.complete(); // drop all transient vertex compute keys
-                } else {
-                    // MapReduce only
-                    view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, Collections.emptySet());
                 }
 
                 // execute mapreduce jobs

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3dd1f6e5/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 140d347..637b416 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -23,14 +23,20 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool;
 import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerHelper;
+import org.apache.tinkerpop.gremlin.util.function.TriConsumer;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 /**
@@ -48,13 +54,33 @@ public final class TinkerWorkerPool implements AutoCloseable {
     private VertexProgramPool vertexProgramPool;
     private MapReducePool mapReducePool;
     private final Queue<TinkerWorkerMemory> workerMemoryPool = new ConcurrentLinkedQueue<>();
+    private final List<List<Vertex>> workerVertices = new ArrayList<>();
 
-    public TinkerWorkerPool(final TinkerMemory memory, final int numberOfWorkers) {
+    public TinkerWorkerPool(final TinkerGraph graph, final TinkerMemory memory, final int numberOfWorkers) {
         this.numberOfWorkers = numberOfWorkers;
         this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER);
         this.completionService = new ExecutorCompletionService<>(this.workerPool);
         for (int i = 0; i < this.numberOfWorkers; i++) {
             this.workerMemoryPool.add(new TinkerWorkerMemory(memory));
+            this.workerVertices.add(new ArrayList<>());
+        }
+        int batchSize = TinkerHelper.getVertices(graph).size() / this.numberOfWorkers;
+        if (0 == batchSize)
+            batchSize = 1;
+        int counter = 0;
+        int index = 0;
+
+        List<Vertex> currentWorkerVertices = this.workerVertices.get(index);
+        final Iterator<Vertex> iterator = graph.vertices();
+        while (iterator.hasNext()) {
+            final Vertex vertex = iterator.next();
+            if (counter++ < batchSize || index == this.workerVertices.size() - 1) {
+                currentWorkerVertices.add(vertex);
+            } else {
+                currentWorkerVertices = this.workerVertices.get(++index);
+                currentWorkerVertices.add(vertex);
+                counter = 1;
+            }
         }
     }
 
@@ -66,12 +92,14 @@ public final class TinkerWorkerPool implements AutoCloseable {
         this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
     }
 
-    public void executeVertexProgram(final BiConsumer<VertexProgram, TinkerWorkerMemory> worker) throws InterruptedException {
+    public void executeVertexProgram(final TriConsumer<Iterator<Vertex>, VertexProgram, TinkerWorkerMemory> worker) throws InterruptedException {
         for (int i = 0; i < this.numberOfWorkers; i++) {
+            final int index = i;
             this.completionService.submit(() -> {
                 final VertexProgram vp = this.vertexProgramPool.take();
                 final TinkerWorkerMemory workerMemory = this.workerMemoryPool.poll();
-                worker.accept(vp, workerMemory);
+                final List<Vertex> vertices = this.workerVertices.get(index);
+                worker.accept(vertices.iterator(), vp, workerMemory);
                 this.vertexProgramPool.offer(vp);
                 this.workerMemoryPool.offer(workerMemory);
                 return null;


[13/18] tinkerpop git commit: CHAGELOG updated. Lunch time.

Posted by ok...@apache.org.
CHAGELOG updated. Lunch time.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/27a4b271
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/27a4b271
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/27a4b271

Branch: refs/heads/master
Commit: 27a4b271c54f0ab06fc8c0ad0578e4433d6db536
Parents: 8d96128
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 10:49:02 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 17:00:17 2017 -0700

----------------------------------------------------------------------
 CHANGELOG.asciidoc | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/27a4b271/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 81dd94e..1fc5355 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -28,6 +28,7 @@ TinkerPop 3.2.4 (Release Date: NOT OFFICIALLY RELEASED YET)
 
 * Added documentation around "terminal steps" in Gremlin: `hasNext()`, `next()`, `toList()`, etc.
 * Fixed minor bug in `gremlin-driver` where closing a session-based `Client` without initializing it could generate an error.
+* Relieved synchronization pressure in various areas of `TinkerGraphComputer`.
 * Fixed an optimization bug in `DedupGlobalStep` in OLAP where deduping occurred twice.
 * `MemoryComputeKey` now implements `Cloneable` which is useful for `BiOperator` reducers that maintain thread-unsafe state.
 * `TinkerGraphComputer` now supports distributed `Memory` with lock-free partition aggregation.


[03/18] tinkerpop git commit: forgot to add TinkerWorkerMemory file.

Posted by ok...@apache.org.
forgot to add TinkerWorkerMemory file.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/056d7aed
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/056d7aed
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/056d7aed

Branch: refs/heads/master
Commit: 056d7aedffa83f8d06462617670273857e2bea19
Parents: 0db0991
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 3 18:00:18 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 16:59:45 2017 -0700

----------------------------------------------------------------------
 .../process/computer/TinkerWorkerMemory.java    | 116 +++++++++++++++++++
 1 file changed, 116 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/056d7aed/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
new file mode 100644
index 0000000..28b99e3
--- /dev/null
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
@@ -0,0 +1,116 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.util.Serializer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BinaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TinkerWorkerMemory implements Memory.Admin {
+
+    private final TinkerMemory mainMemory;
+    private final Map<String, Object> workerMemory = new HashMap<>();
+    private final Map<String, BinaryOperator<Object>> reducers = new HashMap<>();
+
+    public TinkerWorkerMemory(final TinkerMemory mainMemory) {
+        this.mainMemory = mainMemory;
+        for (final MemoryComputeKey key : this.mainMemory.memoryKeys.values()) {
+            try {
+                final MemoryComputeKey clone = (MemoryComputeKey) Serializer.deserializeObject(Serializer.serializeObject(key));
+                this.reducers.put(clone.getKey(), clone.getReducer());
+            } catch (final IOException | ClassNotFoundException e) {
+                this.reducers.put(key.getKey(), key.getReducer()); // super ghetto
+            }
+        }
+    }
+
+    @Override
+    public Set<String> keys() {
+        return this.mainMemory.keys();
+    }
+
+    @Override
+    public void incrIteration() {
+        this.mainMemory.incrIteration();
+    }
+
+    @Override
+    public void setIteration(final int iteration) {
+        this.mainMemory.setIteration(iteration);
+    }
+
+    @Override
+    public int getIteration() {
+        return this.mainMemory.getIteration();
+    }
+
+    @Override
+    public void setRuntime(final long runTime) {
+        this.mainMemory.setRuntime(runTime);
+    }
+
+    @Override
+    public long getRuntime() {
+        return this.mainMemory.getRuntime();
+    }
+
+    @Override
+    public boolean isInitialIteration() {
+        return this.mainMemory.isInitialIteration();
+    }
+
+    @Override
+    public <R> R get(final String key) throws IllegalArgumentException {
+        return this.mainMemory.get(key);
+    }
+
+    @Override
+    public void set(final String key, final Object value) {
+        this.mainMemory.set(key, value);
+    }
+
+    @Override
+    public void add(final String key, final Object value) {
+        this.mainMemory.checkKeyValue(key, value);
+        final Object v = this.workerMemory.get(key);
+        this.workerMemory.put(key, null == v ? value : this.reducers.get(key).apply(v, value));
+    }
+
+    @Override
+    public String toString() {
+        return this.mainMemory.toString();
+    }
+
+    protected void complete() {
+        for (final Map.Entry<String, Object> entry : this.workerMemory.entrySet()) {
+            this.mainMemory.add(entry.getKey(), entry.getValue());
+        }
+    }
+}


[12/18] tinkerpop git commit: have DedupGlobalStep as optimized as I can get it. I think BloomFilters are the next thing. Also, detachment factory stuff will help reduce barrier sizes. Found a bug in SparkInterceptorStrategyTest. Fixed.

Posted by ok...@apache.org.
have DedupGlobalStep as optimized as I can get it. I think BloomFilters are the next thing. Also, detachment factory stuff will help reduce barrier sizes. Found a bug in SparkInterceptorStrategyTest. Fixed.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/e9258086
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/e9258086
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/e9258086

Branch: refs/heads/master
Commit: e9258086441d013e9961d81473b275054d41d8cc
Parents: ba39074
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 5 13:29:32 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 17:00:17 2017 -0700

----------------------------------------------------------------------
 .../traversal/step/filter/DedupGlobalStep.java  | 70 ++++++++++----------
 .../SparkInterceptorStrategyTest.java           |  2 +-
 2 files changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e9258086/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
index a4c1b6a..d024456 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
@@ -40,6 +40,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -57,8 +58,8 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
     private final Set<String> dedupLabels;
     private Set<String> keepLabels;
     private boolean executingAtMaster = false;
-    private boolean barrierAdded = false;
     private Map<Object, Traverser.Admin<S>> barrier;
+    private Iterator<Map.Entry<Object, Traverser.Admin<S>>> barrierIterator;
 
     public DedupGlobalStep(final Traversal.Admin traversal, final String... dedupLabels) {
         super(traversal);
@@ -67,8 +68,8 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
 
     @Override
     protected boolean filter(final Traverser.Admin<S> traverser) {
-        if (this.onGraphComputer && (!this.executingAtMaster || this.barrierAdded)) return true;
-        traverser.setBulk(1);
+        if (this.onGraphComputer && !this.executingAtMaster) return true;
+        traverser.setBulk(1L);
         if (null == this.dedupLabels) {
             return this.duplicateSet.add(TraversalUtil.applyNullable(traverser, this.dedupTraversal));
         } else {
@@ -92,13 +93,16 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
     @Override
     protected Traverser.Admin<S> processNextStart() {
         if (null != this.barrier) {
-            for (final Map.Entry<Object, Traverser.Admin<S>> entry : this.barrier.entrySet()) {
-                if (this.duplicateSet.add(entry.getKey()))
-                    this.starts.add(entry.getValue());
-            }
-            this.barrierAdded = true;
+            this.barrierIterator = this.barrier.entrySet().iterator();
             this.barrier = null;
         }
+        while (this.barrierIterator != null && this.barrierIterator.hasNext()) {
+            if (null == this.barrierIterator)
+                this.barrierIterator = this.barrier.entrySet().iterator();
+            final Map.Entry<Object, Traverser.Admin<S>> entry = this.barrierIterator.next();
+            if (this.duplicateSet.add(entry.getKey()))
+                return PathProcessor.processTraverserPathLabels(entry.getValue(), this.keepLabels);
+        }
         return PathProcessor.processTraverserPathLabels(super.processNextStart(), this.keepLabels);
     }
 
@@ -141,8 +145,8 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
     public void reset() {
         super.reset();
         this.duplicateSet.clear();
-        this.barrierAdded = false;
         this.barrier = null;
+        this.barrierIterator = null;
     }
 
     @Override
@@ -179,35 +183,31 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
 
     @Override
     public Map<Object, Traverser.Admin<S>> nextBarrier() throws NoSuchElementException {
-        if (null != this.barrier) {
-            final Map<Object, Traverser.Admin<S>> tempBarrier = this.barrier;
-            this.barrier = null;
-            this.barrierAdded = false;
-            return tempBarrier;
-        } else {
-            final Map<Object, Traverser.Admin<S>> map = new HashMap<>();
-            while (this.starts.hasNext()) {
-                final Traverser.Admin<S> traverser = this.starts.next();
-                final Object object;
-                if (null != this.dedupLabels) {
-                    object = new ArrayList<>(this.dedupLabels.size());
-                    for (final String label : this.dedupLabels) {
-                        ((List) object).add(TraversalUtil.applyNullable((S) this.getScopeValue(Pop.last, label, traverser), this.dedupTraversal));
-                    }
-                } else {
-                    object = TraversalUtil.applyNullable(traverser, this.dedupTraversal);
-                }
-                if (!map.containsKey(object)) {
-                    traverser.setBulk(1L);
-                    traverser.set(DetachedFactory.detach(traverser.get(), true));
-                    map.put(object, traverser);
+        final Map<Object, Traverser.Admin<S>> map = null != this.barrier ? this.barrier : new HashMap<>();
+        while (this.starts.hasNext()) {
+            final Traverser.Admin<S> traverser = this.starts.next();
+            final Object object;
+            if (null != this.dedupLabels) {
+                object = new ArrayList<>(this.dedupLabels.size());
+                for (final String label : this.dedupLabels) {
+                    ((List) object).add(TraversalUtil.applyNullable((S) this.getScopeValue(Pop.last, label, traverser), this.dedupTraversal));
                 }
+            } else {
+                object = TraversalUtil.applyNullable(traverser, this.dedupTraversal);
+            }
+            if (!map.containsKey(object)) {
+                traverser.setBulk(1L);
+                // traverser.detach();
+                traverser.set(DetachedFactory.detach(traverser.get(), true)); // TODO: detect required detachment accordingly
+                map.put(object, traverser);
             }
-            if (map.isEmpty())
-                throw FastNoSuchElementException.instance();
-            else
-                return map;
         }
+        this.barrier = null;
+        this.barrierIterator = null;
+        if (map.isEmpty())
+            throw FastNoSuchElementException.instance();
+        else
+            return map;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/e9258086/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
index 24e3663..a53b3bd 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/SparkInterceptorStrategyTest.java
@@ -142,7 +142,7 @@ public class SparkInterceptorStrategyTest extends AbstractSparkTest {
         test(6l, g.V().out().values("name").count());
         test(2l, g.V().out("knows").values("name").count());
         test(3l, g.V().in().has("name", "marko").count());
-        test(6l, g.V().repeat(__.dedup()).times(2).count());
+        test(0l, g.V().repeat(__.dedup()).times(2).count());
         test(6l, g.V().dedup().count());
         test(4l, g.V().hasLabel("person").order().by("age").count());
         test(1l, g.V().count().count());


[10/18] tinkerpop git commit: more minor optimizations to DedupGlobalStep.

Posted by ok...@apache.org.
more minor optimizations to DedupGlobalStep.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/ba390748
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/ba390748
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/ba390748

Branch: refs/heads/master
Commit: ba390748443c603009b5a59b6dc64882a4819770
Parents: 64c8065
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 5 10:34:23 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 17:00:17 2017 -0700

----------------------------------------------------------------------
 .../traversal/step/filter/DedupGlobalStep.java  | 71 +++++++++++---------
 1 file changed, 39 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ba390748/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
index e3d36b1..a4c1b6a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
@@ -58,7 +58,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
     private Set<String> keepLabels;
     private boolean executingAtMaster = false;
     private boolean barrierAdded = false;
-    private Map<Object, Traverser.Admin<S>> masterBarrier;
+    private Map<Object, Traverser.Admin<S>> barrier;
 
     public DedupGlobalStep(final Traversal.Admin traversal, final String... dedupLabels) {
         super(traversal);
@@ -91,11 +91,14 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
 
     @Override
     protected Traverser.Admin<S> processNextStart() {
-        if (null != this.masterBarrier) {
-            this.starts.add(this.masterBarrier.values().iterator());
+        if (null != this.barrier) {
+            for (final Map.Entry<Object, Traverser.Admin<S>> entry : this.barrier.entrySet()) {
+                if (this.duplicateSet.add(entry.getKey()))
+                    this.starts.add(entry.getValue());
+            }
             this.barrierAdded = true;
+            this.barrier = null;
         }
-        this.masterBarrier = null;
         return PathProcessor.processTraverserPathLabels(super.processNextStart(), this.keepLabels);
     }
 
@@ -139,6 +142,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
         super.reset();
         this.duplicateSet.clear();
         this.barrierAdded = false;
+        this.barrier = null;
     }
 
     @Override
@@ -170,45 +174,48 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
 
     @Override
     public boolean hasNextBarrier() {
-        return this.starts.hasNext();
+        return null != this.barrier || this.starts.hasNext();
     }
 
     @Override
     public Map<Object, Traverser.Admin<S>> nextBarrier() throws NoSuchElementException {
-        final Map<Object, Traverser.Admin<S>> map = new HashMap<>();
-        while (this.starts.hasNext()) {
-            final Traverser.Admin<S> traverser = this.starts.next();
-            final Object object;
-            if (null != this.dedupLabels) {
-                object = new ArrayList<>(this.dedupLabels.size());
-                for (final String label : this.dedupLabels) {
-                    ((List) object).add(TraversalUtil.applyNullable((S) this.getScopeValue(Pop.last, label, traverser), this.dedupTraversal));
+        if (null != this.barrier) {
+            final Map<Object, Traverser.Admin<S>> tempBarrier = this.barrier;
+            this.barrier = null;
+            this.barrierAdded = false;
+            return tempBarrier;
+        } else {
+            final Map<Object, Traverser.Admin<S>> map = new HashMap<>();
+            while (this.starts.hasNext()) {
+                final Traverser.Admin<S> traverser = this.starts.next();
+                final Object object;
+                if (null != this.dedupLabels) {
+                    object = new ArrayList<>(this.dedupLabels.size());
+                    for (final String label : this.dedupLabels) {
+                        ((List) object).add(TraversalUtil.applyNullable((S) this.getScopeValue(Pop.last, label, traverser), this.dedupTraversal));
+                    }
+                } else {
+                    object = TraversalUtil.applyNullable(traverser, this.dedupTraversal);
+                }
+                if (!map.containsKey(object)) {
+                    traverser.setBulk(1L);
+                    traverser.set(DetachedFactory.detach(traverser.get(), true));
+                    map.put(object, traverser);
                 }
-            } else {
-                object = TraversalUtil.applyNullable(traverser, this.dedupTraversal);
-            }
-            if (!map.containsKey(object)) {
-                traverser.setBulk(1l);
-                traverser.set(DetachedFactory.detach(traverser.get(), true));
-                map.put(object, traverser);
             }
+            if (map.isEmpty())
+                throw FastNoSuchElementException.instance();
+            else
+                return map;
         }
-        if (map.isEmpty())
-            throw FastNoSuchElementException.instance();
-        else
-            return map;
-
     }
 
     @Override
     public void addBarrier(final Map<Object, Traverser.Admin<S>> barrier) {
-        if (null == this.masterBarrier)
-            this.masterBarrier = new HashMap<>(barrier);
-        else {
-            for (Map.Entry<Object, Traverser.Admin<S>> entry : barrier.entrySet()) {
-                this.masterBarrier.putIfAbsent(entry.getKey(), entry.getValue());
-            }
-        }
+        if (null == this.barrier)
+            this.barrier = new HashMap<>(barrier);
+        else
+            this.barrier.putAll(barrier);
     }
 
     @Override


[11/18] tinkerpop git commit: added the concept of a master barrier to DedupGlobalStep to speed up the addition of worker barriers into the master traversal. A slight performance increase.

Posted by ok...@apache.org.
added the concept of a master barrier to DedupGlobalStep to speed up the addition of worker barriers into the master traversal. A slight performance increase.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/64c80657
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/64c80657
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/64c80657

Branch: refs/heads/master
Commit: 64c80657bf9199d5b9f28f56b65f1ce327192bd1
Parents: 253248e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 13:13:58 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 5 17:00:17 2017 -0700

----------------------------------------------------------------------
 .../traversal/step/filter/DedupGlobalStep.java  | 21 +++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/64c80657/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
index dfe7958..e3d36b1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
@@ -34,7 +34,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementExce
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -59,6 +58,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
     private Set<String> keepLabels;
     private boolean executingAtMaster = false;
     private boolean barrierAdded = false;
+    private Map<Object, Traverser.Admin<S>> masterBarrier;
 
     public DedupGlobalStep(final Traversal.Admin traversal, final String... dedupLabels) {
         super(traversal);
@@ -76,6 +76,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
             this.dedupLabels.forEach(label -> objects.add(TraversalUtil.applyNullable((S) this.getScopeValue(Pop.last, label, traverser), this.dedupTraversal)));
             return this.duplicateSet.add(objects);
         }
+
     }
 
     @Override
@@ -90,6 +91,11 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
 
     @Override
     protected Traverser.Admin<S> processNextStart() {
+        if (null != this.masterBarrier) {
+            this.starts.add(this.masterBarrier.values().iterator());
+            this.barrierAdded = true;
+        }
+        this.masterBarrier = null;
         return PathProcessor.processTraverserPathLabels(super.processNextStart(), this.keepLabels);
     }
 
@@ -196,12 +202,13 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
 
     @Override
     public void addBarrier(final Map<Object, Traverser.Admin<S>> barrier) {
-        this.barrierAdded = true;
-        IteratorUtils.removeOnNext(barrier.entrySet().iterator()).forEachRemaining(entry -> {
-            final Traverser.Admin<S> traverser = entry.getValue();
-            traverser.setSideEffects(this.getTraversal().getSideEffects());
-            this.addStart(traverser);
-        });
+        if (null == this.masterBarrier)
+            this.masterBarrier = new HashMap<>(barrier);
+        else {
+            for (Map.Entry<Object, Traverser.Admin<S>> entry : barrier.entrySet()) {
+                this.masterBarrier.putIfAbsent(entry.getKey(), entry.getValue());
+            }
+        }
     }
 
     @Override