You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/26 13:38:18 UTC

incubator-tinkerpop git commit: Moved MasterExecutor.detach() to HaltedTraverserStrategy.halt() as this is now generally useful for OLTP and not just OLAP. Lots more knick-knack cleanups and comments.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1310 f78d1cb63 -> 56c43ea7b


Moved MasterExecutor.detach() to HaltedTraverserStrategy.halt() as this is now generally useful for OLTP and not just OLAP. Lots more knick-knack cleanups and comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/56c43ea7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/56c43ea7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/56c43ea7

Branch: refs/heads/TINKERPOP-1310
Commit: 56c43ea7bd89e0ef72b958400d7bd83a10ad6a15
Parents: f78d1cb
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu May 26 07:38:12 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu May 26 07:38:12 2016 -0600

----------------------------------------------------------------------
 .../computer/traversal/MasterExecutor.java      | 21 ++-------
 .../traversal/TraversalVertexProgram.java       | 21 +++++----
 .../computer/traversal/WorkerExecutor.java      | 49 ++++++++++++--------
 .../decoration/HaltedTraverserStrategy.java     | 17 ++++++-
 4 files changed, 61 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/56c43ea7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
index 1c1e9d2..0462950 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MasterExecutor.java
@@ -20,6 +20,7 @@
 package org.apache.tinkerpop.gremlin.process.computer.traversal;
 
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.HaltedTraverserStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.Path;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -57,18 +58,6 @@ final class MasterExecutor {
 
     }
 
-    // the halted traversers can either be reference or detached elements -- this is good for determining how much data around the element the user wants to get
-    // see HaltedTraverserFactoryStrategy for how this is all connected
-    protected static <R> Traverser.Admin<R> detach(final Traverser.Admin<R> traverser, final Class haltedTraverserFactory) {
-        if (haltedTraverserFactory.equals(DetachedFactory.class))
-            traverser.set(DetachedFactory.detach(traverser.get(), true));
-        else if (haltedTraverserFactory.equals(ReferenceFactory.class))
-            traverser.set(ReferenceFactory.detach(traverser.get()));
-        else
-            throw new IllegalArgumentException("The following detaching factory is unknown: " + haltedTraverserFactory);
-        return traverser;
-    }
-
     protected static void processMemory(final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final TraverserSet<Object> toProcessTraversers, final Set<String> completedBarriers) {
         // handle traversers and data that were sent from the workers to the master traversal via memory
         if (memory.exists(TraversalVertexProgram.MUTATED_MEMORY_KEYS)) {
@@ -94,7 +83,7 @@ final class MasterExecutor {
                                             TraverserSet<Object> toProcessTraversers,
                                             final TraverserSet<Object> remoteActiveTraversers,
                                             final TraverserSet<Object> haltedTraversers,
-                                            final Class haltedTraverserFactory) {
+                                            final HaltedTraverserStrategy haltedTraverserStrategy) {
 
         while (!toProcessTraversers.isEmpty()) {
             final TraverserSet<Object> localActiveTraversers = new TraverserSet<>();
@@ -109,7 +98,7 @@ final class MasterExecutor {
                 traverser.set(DetachedFactory.detach(traverser.get(), true)); // why?
                 traverser.setSideEffects(traversal.get().getSideEffects());
                 if (traverser.isHalted())
-                    haltedTraversers.add(MasterExecutor.detach(traverser, haltedTraverserFactory));
+                    haltedTraversers.add(haltedTraverserStrategy.halt(traverser));
                 else if (isRemoteTraverser(traverser, traversalMatrix))  // this is so that patterns like order().name work as expected. try and stay local as long as possible
                     remoteActiveTraversers.add(traverser.detach());
                 else {
@@ -118,7 +107,7 @@ final class MasterExecutor {
                         while (previousStep.hasNext()) {
                             final Traverser.Admin<Object> result = previousStep.next();
                             if (result.isHalted())
-                                haltedTraversers.add(MasterExecutor.detach(result, haltedTraverserFactory));
+                                haltedTraversers.add(haltedTraverserStrategy.halt(traverser));
                             else if (isRemoteTraverser(result, traversalMatrix))
                                 remoteActiveTraversers.add(result.detach());
                             else
@@ -133,7 +122,7 @@ final class MasterExecutor {
                 while (currentStep.hasNext()) {
                     final Traverser.Admin<Object> traverser = currentStep.next();
                     if (traverser.isHalted())
-                        haltedTraversers.add(MasterExecutor.detach(traverser, haltedTraverserFactory));
+                        haltedTraversers.add(haltedTraverserStrategy.halt(traverser));
                     else if (isRemoteTraverser(traverser, traversalMatrix))
                         remoteActiveTraversers.add(traverser.detach());
                     else

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/56c43ea7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 07ab98a..c9dc053 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -109,7 +109,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     private final Set<MapReduce> mapReducers = new HashSet<>();
     private TraverserSet<Object> haltedTraversers;
     private boolean returnHaltedTraversers = false;
-    private Class haltedTraverserDetachFactory;
+    private HaltedTraverserStrategy haltedTraverserStrategy;
 
     private TraversalVertexProgram() {
     }
@@ -167,11 +167,11 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                         (this.traversal.get().getParent().asStep().getNextStep() instanceof ProfileStep && // same as above, but needed for profiling
                                 this.traversal.get().getParent().asStep().getNextStep().getNextStep() instanceof ComputerResultStep));
         // determine how to store halted traversers
-        this.haltedTraverserDetachFactory = ((HaltedTraverserStrategy) this.traversal.get().getStrategies().toList()
+        this.haltedTraverserStrategy = ((HaltedTraverserStrategy) this.traversal.get().getStrategies().toList()
                 .stream()
                 .filter(strategy -> strategy instanceof HaltedTraverserStrategy)
                 .findAny()
-                .orElse(HaltedTraverserStrategy.reference())).getHaltedTraverserFactory();
+                .orElse(HaltedTraverserStrategy.reference()));
         // register traversal side-effects in memory
         this.memoryComputeKeys.addAll(MemoryTraversalSideEffects.getMemoryComputeKeys(this.traversal.get()));
         // register MapReducer memory compute keys
@@ -217,9 +217,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                 traverser.setStepId(this.traversal.get().getStartStep().getId());
                 toProcessTraversers.add(traverser);
             });
-            assert haltedTraversers.isEmpty();
+            assert this.haltedTraversers.isEmpty();
             final TraverserSet<Object> remoteActiveTraversers = new TraverserSet<>();
-            MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, this.haltedTraversers, this.haltedTraverserDetachFactory);
+            MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, this.haltedTraversers, this.haltedTraverserStrategy);
             memory.set(HALTED_TRAVERSERS, this.haltedTraversers);
             memory.set(ACTIVE_TRAVERSERS, remoteActiveTraversers);
         } else {
@@ -238,6 +238,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     @Override
     public void execute(final Vertex vertex, final Messenger<TraverserSet<Object>> messenger, final Memory memory) {
         // if any global halted traversers, simply don't use them as they were handled by master setup()
+        // these halted traversers are typically from a previous OLAP job that yielded traversers at the master traversal
         if (null != this.haltedTraversers)
             this.haltedTraversers = null;
         // memory is distributed
@@ -274,16 +275,16 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                 graphStep.forEachRemaining(traverser -> {
                     if (traverser.isHalted()) {
                         if (this.returnHaltedTraversers)
-                            memory.add(HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser, this.haltedTraverserDetachFactory)));
+                            memory.add(HALTED_TRAVERSERS, new TraverserSet<>(this.haltedTraverserStrategy.halt(traverser)));
                         else
                             haltedTraversers.add((Traverser.Admin) traverser.detach());
                     } else
                         activeTraversers.add((Traverser.Admin) traverser);
                 });
             }
-            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || WorkerExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers, this.haltedTraverserDetachFactory));
+            memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || WorkerExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers, this.haltedTraverserStrategy));
         } else   // ITERATION 1+
-            memory.add(VOTE_TO_HALT, WorkerExecutor.execute(vertex, messenger, this.traversalMatrix, memory, this.returnHaltedTraversers, this.haltedTraverserDetachFactory));
+            memory.add(VOTE_TO_HALT, WorkerExecutor.execute(vertex, messenger, this.traversalMatrix, memory, this.returnHaltedTraversers, this.haltedTraverserStrategy));
         // save space by not having an empty halted traversers property
         if (this.returnHaltedTraversers || haltedTraversers.isEmpty())
             vertex.<TraverserSet>property(HALTED_TRAVERSERS).remove();
@@ -307,7 +308,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
             final Set<String> completedBarriers = new HashSet<>();
             MasterExecutor.processMemory(this.traversalMatrix, memory, toProcessTraversers, completedBarriers);
             // process all results from barriers locally and when elements are touched, put them in remoteActiveTraversers
-            MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, haltedTraversers, this.haltedTraverserDetachFactory);
+            MasterExecutor.processTraversers(this.traversal, this.traversalMatrix, toProcessTraversers, remoteActiveTraversers, haltedTraversers, this.haltedTraverserStrategy);
             // tell parallel barriers that might not have been active in the last round that they are no longer active
             memory.set(COMPLETED_BARRIERS, completedBarriers);
             if (!remoteActiveTraversers.isEmpty() ||
@@ -319,7 +320,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                 // finalize locally with any last traversers dangling in the local traversal
                 final Step<?, Object> endStep = (Step<?, Object>) this.traversal.get().getEndStep();
                 while (endStep.hasNext()) {
-                    haltedTraversers.add(MasterExecutor.detach(endStep.next(), this.haltedTraverserDetachFactory));
+                    haltedTraversers.add(this.haltedTraverserStrategy.halt(endStep.next()));
                 }
                 // the result of a TraversalVertexProgram are the halted traversers
                 memory.set(HALTED_TRAVERSERS, haltedTraversers);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/56c43ea7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
index 5798af0..7181992 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal;
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.HaltedTraverserStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -57,7 +58,7 @@ final class WorkerExecutor {
                                      final TraversalMatrix<?, ?> traversalMatrix,
                                      final Memory memory,
                                      final boolean returnHaltedTraversers,
-                                     final Class haltedTraverserFactory) {
+                                     final HaltedTraverserStrategy haltedTraverserStrategy) {
         final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
         final AtomicBoolean voteToHalt = new AtomicBoolean(true);
         final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
@@ -68,6 +69,7 @@ final class WorkerExecutor {
         // GENERATE LOCAL TRAVERSERS //
         ///////////////////////////////
 
+        // MASTER ACTIVE
         // these are traversers that are going from OLTP (master) to OLAP (workers)
         // these traversers were broadcasted from the master traversal to the workers for attachment
         final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
@@ -75,17 +77,21 @@ final class WorkerExecutor {
         // its better to reduce the memory footprint and shorten the active traverser list so synchronization is worth it.
         // most distributed OLAP systems have the memory partitioned and thus, this synchronization does nothing.
         synchronized (maybeActiveTraversers) {
-            final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator();
-            while (iterator.hasNext()) {
-                final Traverser.Admin<Object> traverser = iterator.next();
-                if (vertex.equals(WorkerExecutor.getHostingVertex(traverser.get()))) {
-                    iterator.remove();
-                    traverser.attach(Attachable.Method.get(vertex));
-                    traverser.setSideEffects(traversalSideEffects);
-                    toProcessTraversers.add(traverser);
+            if (!maybeActiveTraversers.isEmpty()) {
+                final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator();
+                while (iterator.hasNext()) {
+                    final Traverser.Admin<Object> traverser = iterator.next();
+                    if (vertex.equals(WorkerExecutor.getHostingVertex(traverser.get()))) {
+                        iterator.remove();
+                        traverser.attach(Attachable.Method.get(vertex));
+                        traverser.setSideEffects(traversalSideEffects);
+                        toProcessTraversers.add(traverser);
+                    }
                 }
             }
         }
+
+        // WORKER ACTIVE
         // these are traversers that exist from from a local barrier
         // these traversers will simply saved at the local vertex while the master traversal synchronized the barrier
         vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).ifPresent(previousActiveTraversers -> {
@@ -98,17 +104,20 @@ final class WorkerExecutor {
             // remove the property to save space
             vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove();
         });
+
+        // TRAVERSER MESSAGES (WORKER -> WORKER)
         // these are traversers that have been messaged to the vertex from another vertex
         final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages();
         while (messages.hasNext()) {
             IteratorUtils.removeOnNext(messages.next().iterator()).forEachRemaining(traverser -> {
                 if (traverser.isHalted()) {
                     if (returnHaltedTraversers)
-                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory)));
+                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
                     else
-                        haltedTraversers.add(traverser.detach());
+                        haltedTraversers.add(traverser); // the traverser has already been detached so no need to detach it again
                 } else {
                     // traverser is not halted and thus, should be processed locally
+                    // attach it and process
                     traverser.attach(Attachable.Method.get(vertex));
                     traverser.setSideEffects(traversalSideEffects);
                     toProcessTraversers.add(traverser);
@@ -120,7 +129,7 @@ final class WorkerExecutor {
         // PROCESS LOCAL TRAVERSERS //
         //////////////////////////////
 
-        // while there are still local traversers, process them until they leave the vertex or halt (i.e. isHalted()).
+        // while there are still local traversers, process them until they leave the vertex (message pass) or halt (store).
         while (!toProcessTraversers.isEmpty()) {
             Step<Object, Object> previousStep = EmptyStep.instance();
             Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
@@ -130,11 +139,11 @@ final class WorkerExecutor {
                 final Step<Object, Object> currentStep = traversalMatrix.getStepById(traverser.getStepId());
                 // try and fill up the current step as much as possible with traversers to get a bulking optimization
                 if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep))
-                    WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserFactory);
+                    WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserStrategy);
                 currentStep.addStart(traverser);
                 previousStep = currentStep;
             }
-            WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserFactory);
+            WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserStrategy);
             // all processed traversers should be either halted or active
             assert toProcessTraversers.isEmpty();
             // process all the local objects and send messages or store locally again
@@ -151,7 +160,7 @@ final class WorkerExecutor {
                             voteToHalt.set(false); // if message is passed, then don't vote to halt
                             messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser.detach()));
                         } else {
-                            traverser.attach(Attachable.Method.get(vertex));
+                            traverser.attach(Attachable.Method.get(vertex)); // necessary for select() steps that reference the current object
                             toProcessTraversers.add(traverser);
                         }
                     } else                                                                              // STANDARD OBJECT
@@ -169,7 +178,7 @@ final class WorkerExecutor {
                                   final TraverserSet<Object> haltedTraversers,
                                   final Memory memory,
                                   final boolean returnHaltedTraversers,
-                                  final Class haltedTraverserFactory) {
+                                  final HaltedTraverserStrategy haltedTraverserStrategy) {
         if (step instanceof Barrier) {
             if (step instanceof Bypassing)
                 ((Bypassing) step).setBypass(true);
@@ -187,7 +196,7 @@ final class WorkerExecutor {
                                         (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
                                         getHostingVertex(traverser.get()).equals(vertex))) {
                             if (returnHaltedTraversers)
-                                memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory)));
+                                memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
                             else
                                 haltedTraversers.add(traverser.detach());
                         } else
@@ -205,12 +214,12 @@ final class WorkerExecutor {
         } else { // LOCAL PROCESSING
             step.forEachRemaining(traverser -> {
                 if (traverser.isHalted() &&
-                        // if its a ReferenceFactory (one less iteration)
-                        ((returnHaltedTraversers || ReferenceFactory.class == haltedTraverserFactory) &&
+                        // if its a ReferenceFactory (one less iteration required)
+                        ((returnHaltedTraversers || ReferenceFactory.class == haltedTraverserStrategy.getHaltedTraverserFactory()) &&
                                 (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
                                 getHostingVertex(traverser.get()).equals(vertex))) {
                     if (returnHaltedTraversers)
-                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(MasterExecutor.detach(traverser, haltedTraverserFactory)));
+                        memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
                     else
                         haltedTraversers.add(traverser.detach());
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/56c43ea7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserStrategy.java
index fd07e23..968e167 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/HaltedTraverserStrategy.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decorat
 
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
 import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
@@ -33,7 +34,10 @@ public final class HaltedTraverserStrategy extends AbstractTraversalStrategy<Tra
     private final Class haltedTraverserFactory;
 
     private HaltedTraverserStrategy(final Class haltedTraverserFactory) {
-        this.haltedTraverserFactory = haltedTraverserFactory;
+        if (haltedTraverserFactory.equals(DetachedFactory.class) || haltedTraverserFactory.equals(ReferenceFactory.class))
+            this.haltedTraverserFactory = haltedTraverserFactory;
+        else
+            throw new IllegalArgumentException("The provided traverser detachment factory is unknown: " + haltedTraverserFactory);
     }
 
     public void apply(final Traversal.Admin<?, ?> traversal) {
@@ -44,6 +48,16 @@ public final class HaltedTraverserStrategy extends AbstractTraversalStrategy<Tra
         return this.haltedTraverserFactory;
     }
 
+    public <R> Traverser.Admin<R> halt(final Traverser.Admin<R> traverser) {
+        if (ReferenceFactory.class.equals(this.haltedTraverserFactory))
+            traverser.set(ReferenceFactory.detach(traverser.get()));
+        else
+            traverser.set(DetachedFactory.detach(traverser.get(), true));
+        return traverser;
+    }
+
+    ////////////
+
     public static HaltedTraverserStrategy detached() {
         return new HaltedTraverserStrategy(DetachedFactory.class);
     }
@@ -51,4 +65,5 @@ public final class HaltedTraverserStrategy extends AbstractTraversalStrategy<Tra
     public static HaltedTraverserStrategy reference() {
         return new HaltedTraverserStrategy(ReferenceFactory.class);
     }
+
 }