You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/24 23:13:16 UTC

incubator-tinkerpop git commit: StoreStep no longer uses MapReduce. All that is left is AggregateStep and ProfileStep.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1166 aa79390f7 -> 980525cd2


StoreStep no longer uses MapReduce. All that is left is AggregateStep and ProfileStep.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/980525cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/980525cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/980525cd

Branch: refs/heads/TINKERPOP-1166
Commit: 980525cd2d7ee656858f68703635972c740486fa
Parents: aa79390
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Feb 24 15:13:09 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Feb 24 15:13:09 2016 -0700

----------------------------------------------------------------------
 .../traversal/TraversalVertexProgram.java       |  5 +-
 .../traversal/step/sideEffect/StoreStep.java    | 79 +++++---------------
 .../gremlin/structure/io/gryo/GryoMapper.java   |  4 +-
 3 files changed, 23 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/980525cd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index a6fa0fd..9d7be24 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -125,7 +125,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
         }
         for (final GraphComputing graphComputing : TraversalHelper.getStepsOfAssignableClassRecursively(GraphComputing.class, this.traversal.get())) {
             graphComputing.getMemoryComputeKey().ifPresent(this.memoryComputeKeys::add);
-            graphComputing.getMemoryComputeKey().ifPresent(x -> this.sideEffectKeys.add(x.getKey()));
+            graphComputing.getMemoryComputeKey().ifPresent(x -> this.sideEffectKeys.add(x.getKey())); // TODO: when no more MapReducers, you can remove this
         }
     }
 
@@ -182,8 +182,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
             memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix, memory));
         }
         this.traversal.get().getSideEffects().forEach((key, value) -> {
-            if (this.sideEffectKeys.contains(key) &&
-                    vertex.<Map<String, Object>>property(VertexTraversalSideEffects.SIDE_EFFECTS).value().containsKey(key)) {
+            if (this.sideEffectKeys.contains(key) && vertex.<Map<String, Object>>property(VertexTraversalSideEffects.SIDE_EFFECTS).value().containsKey(key)) {
                 memory.add(key, value);
                 vertex.<Map<String, Object>>property(VertexTraversalSideEffects.SIDE_EFFECTS).value().remove(key);
             }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/980525cd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java
index 9b61097..ebaecab 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java
@@ -18,38 +18,31 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
 
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.function.BulkSetSupplier;
 
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
-import java.util.function.Supplier;
+import java.util.function.BinaryOperator;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class StoreStep<S> extends SideEffectStep<S> implements SideEffectCapable, TraversalParent, ByModulating, MapReducer<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+public final class StoreStep<S> extends SideEffectStep<S> implements SideEffectCapable, TraversalParent, ByModulating, GraphComputing {
 
     private Traversal.Admin<S, Object> storeTraversal = null;
     private String sideEffectKey;
@@ -78,10 +71,6 @@ public final class StoreStep<S> extends SideEffectStep<S> implements SideEffectC
         return StringFactory.stepString(this, this.sideEffectKey, this.storeTraversal);
     }
 
-    @Override
-    public MapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> getMapReduce() {
-        return new StoreMapReduce(this);
-    }
 
     @Override
     public List<Traversal.Admin<S, Object>> getLocalChildren() {
@@ -114,56 +103,24 @@ public final class StoreStep<S> extends SideEffectStep<S> implements SideEffectC
         return result;
     }
 
-    ///////////////
-
-    public static final class StoreMapReduce extends StaticMapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
-
-        public static final String STORE_STEP_SIDE_EFFECT_KEY = "gremlin.storeStep.sideEffectKey";
-
-        private String sideEffectKey;
-        private Supplier<Collection> collectionSupplier;
-
-        private StoreMapReduce() {
-
-        }
-
-        public StoreMapReduce(final StoreStep step) {
-            this.sideEffectKey = step.getSideEffectKey();
-            this.collectionSupplier = step.getTraversal().asAdmin().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
-        }
-
-        @Override
-        public void storeState(final Configuration configuration) {
-            super.storeState(configuration);
-            configuration.setProperty(STORE_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
-        }
+    @Override
+    public void onGraphComputer() {
 
-        @Override
-        public void loadState(final Graph graph, final Configuration configuration) {
-            this.sideEffectKey = configuration.getString(STORE_STEP_SIDE_EFFECT_KEY);
-            this.collectionSupplier = TraversalVertexProgram.getTraversal(graph, configuration).getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
-        }
+    }
 
-        @Override
-        public boolean doStage(final Stage stage) {
-            return stage.equals(Stage.MAP);
-        }
+    @Override
+    public Optional<MemoryComputeKey> getMemoryComputeKey() {
+        return Optional.of(MemoryComputeKey.of(this.sideEffectKey, StoreBiOperator.INSTANCE, false, false));
+    }
 
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<NullObject, Object> emitter) {
-            VertexTraversalSideEffects.of(vertex).<Collection<?>>get(this.sideEffectKey).ifPresent(list -> list.forEach(emitter::emit));
-        }
+    public static class StoreBiOperator<A> implements BinaryOperator<Collection<A>>, Serializable {
 
-        @Override
-        public Collection generateFinalResult(final Iterator<KeyValue<NullObject, Object>> keyValues) {
-            final Collection collection = this.collectionSupplier.get();
-            keyValues.forEachRemaining(pair -> collection.add(pair.getValue()));
-            return collection;
-        }
+        private final static StoreBiOperator INSTANCE = new StoreBiOperator();
 
         @Override
-        public String getMemoryKey() {
-            return this.sideEffectKey;
+        public Collection<A> apply(final Collection<A> mutatingSeed, final Collection<A> collection) {
+            mutatingSeed.addAll(collection);
+            return mutatingSeed;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/980525cd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
index 640898e..2bf387a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
@@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.map.TreeStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupCountStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupStepV3d0;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StoreStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.Tree;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_LP_O_P_S_SE_SL_Traverser;
@@ -342,7 +343,8 @@ public final class GryoMapper implements Mapper<Kryo> {
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(SumGlobalStep.SumGlobalBiOperator.class, null, 121));
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(TreeStep.TreeBiOperator.class, null, 122));
             add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(GroupStepV3d0.GroupBiOperatorV3d0.class, null, 123));
-            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Operator.class, null, 124)); // ***LAST ID**
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(StoreStep.StoreBiOperator.class, null, 124));
+            add(Triplet.<Class, Function<Kryo, Serializer>, Integer>with(Operator.class, null, 125)); // ***LAST ID**
         }};
 
         private final List<IoRegistry> registries = new ArrayList<>();