You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/04 11:55:09 UTC

tinkerpop git commit: Made is so MemoryComputeKey implements Cloneable. This is actually really important we have NOT been cloning the BiOperators of OrderGlobalStep and GroupStep. We have just been 'getting lucky' in that Spark and Giraph use Serializat

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1585 0d87d93ba -> 8721036b4


Made is so MemoryComputeKey implements Cloneable. This is actually really important we have NOT been cloning the BiOperators of OrderGlobalStep and GroupStep. We have just been 'getting lucky' in that Spark and Giraph use Serialization and thus we get a clone for free. However, for parallelization within a JVM, we woulld have issues except we never realized because we had a single global Memory for TinkerGraph. Now we don't and clone()ing bi operators works.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8721036b
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8721036b
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8721036b

Branch: refs/heads/TINKERPOP-1585
Commit: 8721036b45b23ab815c59ff22356224aa6b0b98c
Parents: 0d87d93
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 04:55:12 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 4 04:55:12 2017 -0700

----------------------------------------------------------------------
 .../process/computer/MemoryComputeKey.java      | 24 ++++++++++++++++++--
 .../process/traversal/step/map/GroupStep.java   | 18 +++++++++++++--
 .../traversal/step/map/OrderGlobalStep.java     | 13 ++++++++++-
 .../util/function/ChainedComparator.java        | 18 +++++++++++++--
 .../process/computer/TinkerWorkerMemory.java    |  9 +-------
 5 files changed, 67 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8721036b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
index 94ca675..70adf3d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
@@ -22,6 +22,8 @@ package org.apache.tinkerpop.gremlin.process.computer;
 import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
 
 import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.function.BinaryOperator;
 
 /**
@@ -32,10 +34,10 @@ import java.util.function.BinaryOperator;
  *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MemoryComputeKey<A> implements Serializable {
+public final class MemoryComputeKey<A> implements Serializable, Cloneable {
 
     private final String key;
-    private final BinaryOperator<A> reducer;
+    private BinaryOperator<A> reducer;
     private final boolean isTransient;
     private final boolean isBroadcast;
 
@@ -73,7 +75,25 @@ public final class MemoryComputeKey<A> implements Serializable {
         return object instanceof MemoryComputeKey && ((MemoryComputeKey) object).key.equals(this.key);
     }
 
+    @Override
+    public MemoryComputeKey<A> clone() {
+        try {
+            final MemoryComputeKey<A> clone = (MemoryComputeKey<A>) super.clone();
+            for (final Method method : this.reducer.getClass().getMethods()) {
+                if (method.getName().equals("clone") && 0 == method.getParameterCount()) {
+                    clone.reducer = (BinaryOperator<A>) method.invoke(this.reducer);
+                    break;
+                }
+            }
+            return clone;
+        } catch (final IllegalAccessException | InvocationTargetException | CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
     public static <A> MemoryComputeKey<A> of(final String key, final BinaryOperator<A> reducer, final boolean isBroadcast, final boolean isTransient) {
         return new MemoryComputeKey<>(key, reducer, isBroadcast, isTransient);
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8721036b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index de4e223..d6ce421 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -95,7 +95,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
             final TraverserSet traverserSet = new TraverserSet<>();
             this.preTraversal.reset();
             this.preTraversal.addStart(traverser);
-            while(this.preTraversal.hasNext()) {
+            while (this.preTraversal.hasNext()) {
                 traverserSet.add(this.preTraversal.nextTraverser());
             }
             map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet);
@@ -158,7 +158,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
 
     ///////////////////////
 
-    public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable {
+    public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable, Cloneable {
 
         // size limit before Barrier.processAllStarts() to lazy reduce
         private static final int SIZE_LIMIT = 1000;
@@ -182,6 +182,20 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
         }
 
         @Override
+        public GroupBiOperator<K, V> clone() {
+            try {
+                final GroupBiOperator<K, V> clone = (GroupBiOperator<K, V>) super.clone();
+                if (null != this.valueTraversal) {
+                    clone.valueTraversal = this.valueTraversal.clone();
+                    clone.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, clone.valueTraversal).orElse(null);
+                }
+                return clone;
+            } catch (final CloneNotSupportedException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        @Override
         public Map<K, V> apply(final Map<K, V> mapA, final Map<K, V> mapB) {
             for (final K key : mapB.keySet()) {
                 Object objectA = mapA.get(key);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8721036b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
index 60be2d6..a7d21b2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
@@ -144,7 +144,7 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa
 
     ////////////////
 
-    public static final class OrderBiOperator<S> implements BinaryOperator<TraverserSet<S>>, Serializable {
+    public static final class OrderBiOperator<S> implements BinaryOperator<TraverserSet<S>>, Serializable, Cloneable {
 
         private ChainedComparator chainedComparator;
         private long limit;
@@ -159,6 +159,17 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa
         }
 
         @Override
+        public OrderBiOperator<S> clone() {
+            try {
+                final OrderBiOperator<S> clone = (OrderBiOperator<S>) super.clone();
+                clone.chainedComparator = this.chainedComparator.clone();
+                return clone;
+            } catch (final CloneNotSupportedException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        @Override
         public TraverserSet<S> apply(final TraverserSet<S> setA, final TraverserSet<S> setB) {
             setA.addAll(setB);
             if (Long.MAX_VALUE != this.limit && setA.bulkSize() > this.limit) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8721036b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
index 44a994b..bdb2e6d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
@@ -34,9 +34,9 @@ import java.util.stream.Collectors;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class ChainedComparator<S, C extends Comparable> implements Comparator<S>, Serializable {
+public final class ChainedComparator<S, C extends Comparable> implements Comparator<S>, Serializable, Cloneable {
 
-    private final List<Pair<Traversal.Admin<S, C>, Comparator<C>>> comparators = new ArrayList<>();
+    private List<Pair<Traversal.Admin<S, C>, Comparator<C>>> comparators = new ArrayList<>();
     private final boolean isShuffle;
     private final boolean traversers;
 
@@ -66,4 +66,18 @@ public final class ChainedComparator<S, C extends Comparable> implements Compara
         }
         return 0;
     }
+
+    @Override
+    public ChainedComparator<S, C> clone() {
+        try {
+            final ChainedComparator<S, C> clone = (ChainedComparator<S, C>) super.clone();
+            clone.comparators = new ArrayList<>();
+            for (final Pair<Traversal.Admin<S, C>, Comparator<C>> comparator : this.comparators) {
+                clone.comparators.add(new Pair<>(comparator.getValue0().clone(), comparator.getValue1()));
+            }
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8721036b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
index 081e4fa..1afa27e 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
@@ -21,9 +21,7 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
 
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
-import org.apache.tinkerpop.gremlin.util.Serializer;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -41,12 +39,7 @@ public final class TinkerWorkerMemory implements Memory.Admin {
     public TinkerWorkerMemory(final TinkerMemory mainMemory) {
         this.mainMemory = mainMemory;
         for (final MemoryComputeKey key : this.mainMemory.memoryKeys.values()) {
-            try {
-                final MemoryComputeKey clone = Serializer.cloneObject(key);
-                this.reducers.put(clone.getKey(), clone.getReducer());
-            } catch (final IOException | ClassNotFoundException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
+            this.reducers.put(key.getKey(), key.clone().getReducer());
         }
     }