You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/27 15:55:22 UTC

[07/10] incubator-tinkerpop git commit: Moved MasterExecutor.detach() to HaltedTraverserStrategy.halt() as this is now generally useful for OLTP and not just OLAP. Lots more nick nack cleanups and comments.

Moved MasterExecutor.detach() to HaltedTraverserStrategy.halt() as this is now generally useful for OLTP and not just OLAP. Lots more nick nack cleanups and comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/56c43ea7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/56c43ea7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/56c43ea7

Branch: refs/heads/master
Commit: 56c43ea7bd89e0ef72b958400d7bd83a10ad6a15
Parents: f78d1cb
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu May 26 07:38:12 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu May 26 07:38:12 2016 -0600

----------------------------------------------------------------------
 .../computer/traversal/MasterExecutor.java      | 21 ++-------
 .../traversal/TraversalVertexProgram.java       | 21 +++++----
 .../computer/traversal/WorkerExecutor.java      | 49 ++++++++++++--------
 .../decoration/HaltedTraverserStrategy.java     | 17 ++++++-
 4 files changed, 61 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/56c43ea7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
index 1c1e9d2..0462950 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
@@ -20,6 +20,7 @@
 package org.apache.tinkerpop.gremlin.process.computer.traversal;
 
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.HaltedTraverserStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.Path;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -57,18 +58,6 @@ final class MasterExecutor {
 
     }
 
-    // the halted traversers can either be reference or detached elements -- this is good for determining how much data around the element the user wants to get
-    // see HaltedTraverserFactoryStrategy for how this is all connected
-    protected static <R> Traverser.Admin<R> detach(final Traverser.Admin<R> traverser, final Class haltedTraverserFactory) {
-        if (haltedTraverserFactory.equals(DetachedFactory.class))
-            traverser.set(DetachedFactory.detach(traverser.get(), true));
-        else if (haltedTraverserFactory.equals(ReferenceFactory.class))
-            traverser.set(ReferenceFactory.detach(traverser.get()));
-        else
-            throw new IllegalArgumentException("The following detaching factory is unknown: " + haltedTraverserFactory);
-        return traverser;
-    }
-
     protected static void processMemory(final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final TraverserSet<Object> toProcessTraversers, final Set<String> completedBarriers) {
         // handle traversers and data that were sent from the workers to the master traversal via memory
         if (memory.exists(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) {
@@ -94,7 +83,7 @@ final class MasterExecutor {
                                             TraverserSet<Object> toProcessTraversers,
                                             final TraverserSet<Object> remoteActiveTraversers,
                                             final TraverserSet<Object> haltedTraversers,
-                                            final Class haltedTraverserFactory) {
+                                            final HaltedTraverserStrategy haltedTraverserStrategy) {
 
         while (!toProcessTraversers.isEmpty()) {
             final TraverserSet<Object> localActiveTraversers = new TraverserSet<>();
@@ -109,7 +98,7 @@ final class MasterExecutor {
                 traverser.set(DetachedFactory.detach(traverser.get(), true)); // why?
                 traverser.setSideEffects(traversal.get().getSideEffects());
                 if (traverser.isHalted())
-                    haltedTraversers.add(MasterExecutor.detach(traverser, haltedTraverserFactory));
+                    haltedTraversers.add(haltedTraverserStrategy.halt(traverser));
                 else if (isRemoteTraverser(traverser, traversalMatrix))  // this is so that patterns like order().name work as expected. try and stay local as long as possible
                     remoteActiveTraversers.add(traverser.detach());
                 else {
@@ -118,7 +107,7 @@ final class MasterExecutor {
                         while (previousStep.hasNext()) {
                             final Traverser.Admin<Object> result = previousStep.next();
                             if (result.isHalted())
-                                haltedTraversers.add(MasterExecutor.detach(result, haltedTraverserFactory));
+                                haltedTraversers.add(haltedTraverserStrategy.halt(traverser));
                             else if (isRemoteTraverser(result, traversalMatrix))
                                 remoteActiveTraversers.add(result.detach());
                             else
@@ -133,7 +122,7 @@ final class MasterExecutor {
                 while (currentStep.hasNext()) {
                     final Traverser.Admin<Object> traverser = currentStep.next();
                     if (traverser.isHalted())
-                        haltedTraversers.add(MasterExecutor.detach(traverser, haltedTraverserFactory));
+                        haltedTraversers.add(haltedTraverserStrategy.halt(traverser));
                     else if (isRemoteTraverser(traverser, traversalMatrix))
                         remoteActiveTraversers.add(traverser.detach());
                     else

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/56c43ea7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 07ab98a..c9dc053 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -109,7 +109,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     private final Set<MapReduce> mapReducers = new HashSet<>();
     private TraverserSet<Object> haltedTraversers;
     private boolean returnHaltedTraversers = false;
-    private Class haltedTraverserDetachFactory;
+    private HaltedTraverserStrategy haltedTraverserStrategy;
 
     private TraversalVertexProgram() {
     }
@@ -167,11 +167,11 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                         (this.traversal.get().getParent().asStep().getNextStep() instanceof ProfileStep && // same as above, but needed for profiling
                                 this.traversal.get().getParent().asStep().getNextStep().getNextStep() instanceof ComputerResultStep));
         // determine how to store halted traversers
-        this.haltedTraverserDetachFactory = ((HaltedTraverserStrategy) this.traversal.get().getStrategies().toList()
+        this.haltedTraverserStrategy = ((HaltedTraverserStrategy) this.traversal.get().getStrategies().toList()
                 .stream()
                 .filter(strategy -> strategy instanceof HaltedTraverserStrategy)
                 .findAny()
-                .orElse(HaltedTraverserStrategy.reference())).getHaltedTraverserFactory();
+                .orElse(HaltedTraverserStrategy.reference()));
         // register traversal side-effects in memory
         this.memoryComputeKeys.addAll(MemoryTraversalSideEffects.getMemoryComputeKeys(this.traversal.get()));
         // register MapReducer memory compute keys
@@ -217,9 +217,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                 traverser.setStepId(this.traversal.get().getStartStep().getId());
                 toProcessTraversers.add(traverser);
             });
-            assert haltedTraversers.isEmpty();
+            assert this.haltedTraversers.isEmpty();
             final TraverserSet<Object> remoteActiveTraversers = new TraverserSet<>();
-            MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, this.haltedTraversers, this.haltedTraverserDetachFactory);
+            MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, this.haltedTraversers, this.haltedTraverserStrategy);
             memory.set(HALTED_TRAVERSERS, this.haltedTraversers);
             memory.set(ACTIVE_TRAVERSERS, remoteActiveTraversers);
         } else {
@@ -238,6 +238,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     @Override
     public void execute(final Vertex vertex, final Messenger<TraverserSet<Object>> messenger, final Memory memory) {
         // if any global halted traversers, simply don't use them as they were handled by master setup()
+        // these halted traversers are typically from a previous OLAP job that yielded traversers at the master traversal
         if (null != this.haltedTraversers)
             this.haltedTraversers = null;
         // memory is distributed
@@ -274,16 +275,16 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                 graphStep.forEachRemaining(traverser -> {
                     if (traverser.isHalted()) {
                         if (this.returnHaltedTraversers)
-                            memory.add(HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser, this.haltedTraverserDetachFactory)));
+                            memory.add(HALTED_TRAVERSERS, new TraverserSet<>(this.haltedTraverserStrategy.halt(traverser)));
                         else
                             haltedTraversers.add((Traverser.Admin) traverser.detach());
                     } else
                         activeTraversers.add((Traverser.Admin) traverser);
                 });
             }
-            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || WorkerExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers, this.haltedTraverserDetachFactory));
+            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || WorkerExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers, this.haltedTraverserStrategy));
         } else   // ITERATION 1+
-            memory.add(VOTE_TO_HALT, WorkerExecutor.execute(vertex, messenger, this.traversalMatrix, memory, this.returnHaltedTraversers, this.haltedTraverserDetachFactory));
+            memory.add(VOTE_TO_HALT, WorkerExecutor.execute(vertex, messenger, this.traversalMatrix, memory, this.returnHaltedTraversers, this.haltedTraverserStrategy));
         // save space by not having an empty halted traversers property
         if (this.returnHaltedTraversers || haltedTraversers.isEmpty())
             vertex.<TraverserSet>property(HALTED_TRAVERSERS).remove();
@@ -307,7 +308,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
             final Set<String> completedBarriers = new HashSet<>();
             MasterExecutor.processMemory(this.traversalMatrix, memory, toProcessTraversers, completedBarriers);
             // process all results from barriers locally and when elements are touched, put them in remoteActiveTraversers
-            MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, haltedTraversers, this.haltedTraverserDetachFactory);
+            MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, haltedTraversers, this.haltedTraverserStrategy);
             // tell parallel barriers that might not have been active in the last round that they are no longer active
             memory.set(COMPLETED_BARRIERS, completedBarriers);
             if (!remoteActiveTraversers.isEmpty() ||
@@ -319,7 +320,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                 // finalize locally with any last traversers dangling in the local traversal
                 final Step<?, Object> endStep = (Step<?, Object>) this.traversal.get().getEndStep();
                 while (endStep.hasNext()) {
-                    haltedTraversers.add(MasterExecutor.detach(endStep.next(), this.haltedTraverserDetachFactory));
+                    haltedTraversers.add(this.haltedTraverserStrategy.halt(endStep.next()));
                 }
                 // the result of a TraversalVertexProgram are the halted traversers
                 memory.set(HALTED_TRAVERSERS, haltedTraversers);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/56c43ea7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
index 5798af0..7181992 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.HaltedTraverserStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -57,7 +58,7 @@ final class WorkerExecutor {
                                      final TraversalMatrix<?, ?> traversalMatrix,
                                      final Memory memory,
                                      final boolean returnHaltedTraversers,
-                                     final Class haltedTraverserFactory) {
+                                     final HaltedTraverserStrategy haltedTraverserStrategy) {
         final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
         final AtomicBoolean voteToHalt = new AtomicBoolean(true);
         final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
@@ -68,6 +69,7 @@ final class WorkerExecutor {
         // GENERATE LOCAL TRAVERSERS //
         ///////////////////////////////
 
+        // MASTER ACTIVE
         // these are traversers that are going from OLTP (master) to OLAP (workers)
         // these traversers were broadcasted from the master traversal to the workers for attachment
         final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
@@ -75,17 +77,21 @@ final class WorkerExecutor {
         // its better to reduce the memory footprint and shorten the active traverser list so synchronization is worth it.
         // most distributed OLAP systems have the memory partitioned and thus, this synchronization does nothing.
         synchronized (maybeActiveTraversers) {
-            final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator();
-            while (iterator.hasNext()) {
-                final Traverser.Admin<Object> traverser = iterator.next();
-                if (vertex.equals(WorkerExecutor.getHostingVertex(traverser.get()))) {
-                    iterator.remove();
-                    traverser.attach(Attachable.Method.get(vertex));
-                    traverser.setSideEffects(traversalSideEffects);
-                    toProcessTraversers.add(traverser);
+            if (!maybeActiveTraversers.isEmpty()) {
+                final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator();
+                while (iterator.hasNext()) {
+                    final Traverser.Admin<Object> traverser = iterator.next();
+                    if (vertex.equals(WorkerExecutor.getHostingVertex(traverser.get()))) {
+                        iterator.remove();
+                        traverser.attach(Attachable.Method.get(vertex));
+                        traverser.setSideEffects(traversalSideEffects);
+                        toProcessTraversers.add(traverser);
+                    }
                 }
             }
         }
+
+        // WORKER ACTIVE
         // these are traversers that exist from from a local barrier
         // these traversers will simply saved at the local vertex while the master traversal synchronized the barrier
         vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).ifPresent(previousActiveTraversers -> {
@@ -98,17 +104,20 @@ final class WorkerExecutor {
             // remove the property to save space
             vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove();
         });
+
+        // TRAVERSER MESSAGES (WORKER -> WORKER)
         // these are traversers that have been messaged to the vertex from another vertex
         final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages();
         while (messages.hasNext()) {
             IteratorUtils.removeOnNext(messages.next().iterator()).forEachRemaining(traverser -> {
                 if (traverser.isHalted()) {
                     if (returnHaltedTraversers)
-                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory)));
+                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
                     else
-                        haltedTraversers.add(traverser.detach());
+                        haltedTraversers.add(traverser); // the traverser has already been detached so no need to detach it again
                 } else {
                     // traverser is not halted and thus, should be processed locally
+                    // attach it and process
                     traverser.attach(Attachable.Method.get(vertex));
                     traverser.setSideEffects(traversalSideEffects);
                     toProcessTraversers.add(traverser);
@@ -120,7 +129,7 @@ final class WorkerExecutor {
         // PROCESS LOCAL TRAVERSERS //
         //////////////////////////////
 
-        // while there are still local traversers, process them until they leave the vertex or halt (i.e. isHalted()).
+        // while there are still local traversers, process them until they leave the vertex (message pass) or halt (store).
         while (!toProcessTraversers.isEmpty()) {
             Step<Object, Object> previousStep = EmptyStep.instance();
             Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
@@ -130,11 +139,11 @@ final class WorkerExecutor {
                 final Step<Object, Object> currentStep = traversalMatrix.getStepById(traverser.getStepId());
                 // try and fill up the current step as much as possible with traversers to get a bulking optimization
                 if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep))
-                    WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserFactory);
+                    WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserStrategy);
                 currentStep.addStart(traverser);
                 previousStep = currentStep;
             }
-            WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserFactory);
+            WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserStrategy);
             // all processed traversers should be either halted or active
             assert toProcessTraversers.isEmpty();
             // process all the local objects and send messages or store locally again
@@ -151,7 +160,7 @@ final class WorkerExecutor {
                             voteToHalt.set(false); // if message is passed, then don't vote to halt
                             messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser.detach()));
                         } else {
-                            traverser.attach(Attachable.Method.get(vertex));
+                            traverser.attach(Attachable.Method.get(vertex)); // necessary for select() steps that reference the current object
                             toProcessTraversers.add(traverser);
                         }
                     } else                                                                              // STANDARD OBJECT
@@ -169,7 +178,7 @@ final class WorkerExecutor {
                                   final TraverserSet<Object> haltedTraversers,
                                   final Memory memory,
                                   final boolean returnHaltedTraversers,
-                                  final Class haltedTraverserFactory) {
+                                  final HaltedTraverserStrategy haltedTraverserStrategy) {
         if (step instanceof Barrier) {
             if (step instanceof Bypassing)
                 ((Bypassing) step).setBypass(true);
@@ -187,7 +196,7 @@ final class WorkerExecutor {
                                         (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
                                         getHostingVertex(traverser.get()).equals(vertex))) {
                             if (returnHaltedTraversers)
-                                memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory)));
+                                memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
                             else
                                 haltedTraversers.add(traverser.detach());
                         } else
@@ -205,12 +214,12 @@ final class WorkerExecutor {
         } else { // LOCAL PROCESSING
             step.forEachRemaining(traverser -> {
                 if (traverser.isHalted() &&
-                        // if its a ReferenceFactory (one less iteration)
-                        ((returnHaltedTraversers || ReferenceFactory.class == haltedTraverserFactory) &&
+                        // if its a ReferenceFactory (one less iteration required)
+                        ((returnHaltedTraversers || ReferenceFactory.class == haltedTraverserStrategy.getHaltedTraverserFactory()) &&
                                 (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
                                 getHostingVertex(traverser.get()).equals(vertex))) {
                     if (returnHaltedTraversers)
-                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory)));
+                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
                     else
                         haltedTraversers.add(traverser.detach());
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/56c43ea7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserStrategy.java
index fd07e23..968e167 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserStrategy.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decorat
 
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
@@ -33,7 +34,10 @@ public final class HaltedTraverserStrategy extends AbstractTraversalStrategy<Tra
     private final Class haltedTraverserFactory;
 
     private HaltedTraverserStrategy(final Class haltedTraverserFactory) {
-        this.haltedTraverserFactory = haltedTraverserFactory;
+        if (haltedTraverserFactory.equals(DetachedFactory.class) || haltedTraverserFactory.equals(ReferenceFactory.class))
+            this.haltedTraverserFactory = haltedTraverserFactory;
+        else
+            throw new IllegalArgumentException("The provided traverser detachment factory is unknown: " + haltedTraverserFactory);
     }
 
     public void apply(final Traversal.Admin<?, ?> traversal) {
@@ -44,6 +48,16 @@ public final class HaltedTraverserStrategy extends AbstractTraversalStrategy<Tra
         return this.haltedTraverserFactory;
     }
 
+    public <R> Traverser.Admin<R> halt(final Traverser.Admin<R> traverser) {
+        if (ReferenceFactory.class.equals(this.haltedTraverserFactory))
+            traverser.set(ReferenceFactory.detach(traverser.get()));
+        else
+            traverser.set(DetachedFactory.detach(traverser.get(), true));
+        return traverser;
+    }
+
+    ////////////
+
     public static HaltedTraverserStrategy detached() {
         return new HaltedTraverserStrategy(DetachedFactory.class);
     }
@@ -51,4 +65,5 @@ public final class HaltedTraverserStrategy extends AbstractTraversalStrategy<Tra
     public static HaltedTraverserStrategy reference() {
         return new HaltedTraverserStrategy(ReferenceFactory.class);
     }
+
 }