You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/25 21:15:39 UTC

tinkerpop git commit: removed the need for the Terminate token. Added a new test case to GraphActorsTest that verifies that message priority is respected. Other minor tweaks.

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 d013d4052 -> a406a8827


Removed the need for the Terminate token. Added a new test case to GraphActorsTest that verifies that message priority is respected. Other minor tweaks.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a406a882
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a406a882
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a406a882

Branch: refs/heads/TINKERPOP-1564
Commit: a406a882784291575019f991432f0d9ed5106b78
Parents: d013d40
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 25 14:15:35 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 25 14:15:35 2017 -0700

----------------------------------------------------------------------
 .../akka/process/actors/ActorMailbox.java       |   2 +-
 .../akka/process/actors/AkkaConfigFactory.java  |   2 +-
 .../process/actors/io/gryo/GryoSerializer.java  |  22 ++--
 .../akka/process/actors/AkkaActorsProvider.java |   4 +-
 .../tinkerpop/gremlin/process/actors/Actor.java |   8 ++
 .../actors/traversal/TraversalActorProgram.java |   8 +-
 .../traversal/TraversalMasterProgram.java       |  23 ++--
 .../traversal/TraversalWorkerProgram.java       |  10 +-
 .../actors/traversal/message/Terminate.java     |  28 -----
 .../gremlin/process/actors/GraphActorsTest.java |   9 ++
 .../TestMessagePrioritiesActorProgram.java      | 125 +++++++++++++++++++
 11 files changed, 172 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
index 8505871..2a99a39 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
@@ -146,7 +146,7 @@ public final class ActorMailbox implements MailboxType, ProducesMessageQueue<Act
         for (final List<String> message : messages) {
             this.messageTypes.put(
                     ClassUtil.getClassOrEnum(message.get(0)),
-                    Object.class.getCanonicalName().equals(message.get(1)) ?
+                    null == message.get(1) ?
                             null :
                             ClassUtil.getClassOrEnum(message.get(1)));
         }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
index 1422e07..e76562e 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
@@ -58,7 +58,7 @@ final class AkkaConfigFactory {
                                 stream().
                                 map(entry -> Arrays.asList(
                                         ClassUtil.getClassName(entry.getKey()),
-                                        ClassUtil.getClassName(null == entry.getValue() ? Object.class : entry.getValue().getClass()))).
+                                        null == entry.getValue() ? null : ClassUtil.getClassName(entry.getValue().getClass()))).
                                 collect(Collectors.toList())));
         final Iterator<String> keys = configuration.getKeys();
         while (keys.hasNext()) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java
index 8d567ba..968b0dc 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java
@@ -63,14 +63,6 @@ public final class GryoSerializer implements Serializer {
         }
         // remove Gryo 3.0 classes
         GryoVersion.V3_0.getRegistrations().forEach(type -> gryoClasses.remove(type.getTargetClass()));
-        // this sucks. how to do this automatically?
-        gryoClasses.remove(Short.class);
-        gryoClasses.remove(Integer.class);
-        gryoClasses.remove(Float.class);
-        gryoClasses.remove(Double.class);
-        gryoClasses.remove(Long.class);
-        gryoClasses.remove(String.class);
-        //
         final List<IoRegistry> registryList;
         if (config.hasPath(IoRegistry.IO_REGISTRY)) {
             final Configuration configuration = new BaseConfiguration();
@@ -82,7 +74,7 @@ public final class GryoSerializer implements Serializer {
         this.gryoPool = GryoPool.build().
                 poolSize(10).
                 initializeMapper(builder ->
-                        builder.referenceTracking(true). // config.getBoolean("gremlin.gryo.referenceTracking")).
+                        builder.referenceTracking(true).    // config.getBoolean("gremlin.gryo.referenceTracking")).
                                 registrationRequired(true). // config.getBoolean("gremlin.gryo.registrationRequired")).
                                 version(GryoVersion.V3_0).
                                 addCustom(gryoClasses.toArray(new Class[gryoClasses.size()])).
@@ -91,8 +83,18 @@ public final class GryoSerializer implements Serializer {
 
     public static Map<String, String> getSerializerBindings(final ActorProgram<?> actorProgram, final Configuration configuration) {
         final Set<Class> programMessageClasses = new HashSet<>(actorProgram.getMessageTypes().keySet());
-        programMessageClasses.add(DefaultActorsResult.class); // todo: may make this a Gryo3.0 class in the near future
         GryoVersion.V3_0.getRegistrations().forEach(type -> programMessageClasses.remove(type.getTargetClass()));
+        // this sucks. how to do this automatically?
+        programMessageClasses.add(DefaultActorsResult.class); // todo: may make this a Gryo3.0 class in the near future
+        programMessageClasses.remove(Short.class);
+        programMessageClasses.remove(Integer.class);
+        programMessageClasses.remove(Float.class);
+        programMessageClasses.remove(Double.class);
+        programMessageClasses.remove(Long.class);
+        programMessageClasses.remove(String.class);
+        programMessageClasses.remove(Boolean.class);
+        programMessageClasses.remove(Character.class);
+        //
         final Map<String, String> bindings = new HashMap<>();
         GryoMapper.build().
                 referenceTracking(configuration.getBoolean("gremlin.gryo.referenceTracking", true)).

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
index c76db50..09ebde9 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
@@ -70,8 +70,8 @@ public class AkkaActorsProvider extends AbstractTinkerGraphProvider {
         final Map<String, Object> configuration = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
         // Akka specific configuration
         configuration.put(Constants.AKKA_LOG_DEAD_LETTERS_DURING_SHUTDOWN, false);
-        configuration.put(Constants.AKKA_ACTOR_PROVIDER, "remote");
-        configuration.put(Constants.AKKA_ACTOR_SERIALIZE_MESSAGES, "on");
+        configuration.put(Constants.AKKA_ACTOR_PROVIDER, RANDOM.nextBoolean() ? "remote" : "local");
+        configuration.put(Constants.AKKA_ACTOR_SERIALIZE_MESSAGES, RANDOM.nextBoolean() ? "on" : "off");
         configuration.put(Constants.AKKA_ACTOR_SERIALIZERS_GRYO, GryoSerializer.class.getCanonicalName());
         configuration.put(Constants.AKKA_REMOTE_ENABLED_TRANSPORTS, Collections.singletonList("akka.remote.netty.tcp"));
         configuration.put(Constants.AKKA_REMOTE_NETTY_TCP_HOSTNAME, "127.0.0.1");

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
index c47e928..d9c1b1f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
@@ -64,6 +64,14 @@ public interface Actor {
      */
     public void close();
 
+    /**
+     * A helper method that will broadcast the provided message to all workers.
+     * The default implementation simply loops through {@link Actor#workers()}
+     * and sends the message.
+     *
+     * @param message the message to broadcast to all workers
+     * @param <M>     the message type
+     */
     public default <M> void broadcast(final M message) {
         for (final Address.Worker worker : this.workers()) {
             this.send(worker, message);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
index 3f23849..aea8b3c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
@@ -27,7 +27,6 @@ import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddM
 import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage;
 import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage;
 import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate;
 import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.decoration.ActorProgramStrategy;
 import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.verification.ActorVerificationStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
@@ -51,7 +50,6 @@ import org.javatuples.Pair;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.function.BinaryOperator;
 
 /**
@@ -66,12 +64,12 @@ public final class TraversalActorProgram<R> implements ActorProgram<Pair<Travers
     private static final Map<Class, BinaryOperator> MESSAGES = new LinkedHashMap<>();
 
     static {
-        MESSAGES.put(BarrierDoneMessage.class, null);
         MESSAGES.put(Traverser.class, null);
         MESSAGES.put(SideEffectAddMessage.class, null);
-        MESSAGES.put(BarrierAddMessage.class, null);
         MESSAGES.put(SideEffectSetMessage.class, null);
-        MESSAGES.put(Terminate.class, null);
+        MESSAGES.put(BarrierAddMessage.class, null);
+        MESSAGES.put(BarrierDoneMessage.class, null);
+        MESSAGES.put(Boolean.class, null);
     }
 
     private Traversal.Admin<?, R> traversal;

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index 25a29a5..8fc0ca9 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -26,7 +26,6 @@ import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddM
 import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage;
 import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage;
 import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -88,7 +87,7 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
         }
         // first pass of a two pass termination detection
         this.voteToHalt = false;
-        this.master.send(this.neighborAddress, Terminate.NO);
+        this.master.send(this.neighborAddress, Boolean.FALSE);
     }
 
     @Override
@@ -125,22 +124,22 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
             final SideEffectAddMessage sideEffectAddMessage = (SideEffectAddMessage) message;
             this.traversal.getSideEffects().add(sideEffectAddMessage.getKey(), TraversalActorProgram.attach(sideEffectAddMessage.getValue(), this.master.partitioner().getGraph()));
             this.sideEffects.add(sideEffectAddMessage.getKey());
-        } else if (message instanceof Terminate) {
-            if (message == Terminate.NO)
+        } else if (message instanceof Boolean) {
+            if (!(Boolean) message)
                 this.voteToHalt = false;
             if (this.voteToHalt && !this.sideEffects.isEmpty()) {
                 // process all side-effect updates
                 for (final String key : this.sideEffects) {
-                    this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
+                    this.master.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
                 }
                 this.sideEffects.clear();
                 this.voteToHalt = false;
-                this.master.send(this.neighborAddress, Terminate.NO);
+                this.master.send(this.neighborAddress, Boolean.FALSE);
             } else if (this.voteToHalt && !this.barriers.isEmpty()) {
                 if (!this.barriersDone) {
                     // tell all workers that the barrier is complete
                     for (final Barrier barrier : this.barriers.values()) {
-                        this.broadcast(new BarrierDoneMessage(barrier));
+                        this.master.broadcast(new BarrierDoneMessage(barrier));
                     }
                     this.barriersDone = true;
                 } else {
@@ -163,10 +162,10 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
                     this.barriersDone = false;
                 }
                 this.voteToHalt = false;
-                this.master.send(this.neighborAddress, Terminate.NO);
+                this.master.send(this.neighborAddress, Boolean.FALSE);
             } else if (!this.voteToHalt) {
                 this.voteToHalt = true;
-                this.master.send(this.neighborAddress, Terminate.YES);
+                this.master.send(this.neighborAddress, Boolean.TRUE);
             } else {
                 // get any dangling local results (e.g. workers have no data but a reducing barrier is waiting for data)
                 while (this.traversal.hasNext()) {
@@ -196,12 +195,6 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
 
     }
 
-    private void broadcast(final Object message) {
-        for (final Address.Worker worker : this.master.workers()) {
-            this.master.send(worker, message);
-        }
-    }
-
     private void sendTraverser(final Traverser.Admin traverser) {
         if (traverser.isHalted()) {
             TraversalActorProgram.attach(traverser, this.master.partitioner().getGraph());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
index f3edb28..8ea48c3 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -25,7 +25,6 @@ import org.apache.tinkerpop.gremlin.process.actors.Address;
 import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage;
 import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage;
 import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -128,9 +127,8 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
                 }
             } else
                 ((Barrier) step).done();       // the master drains the global barrier
-        } else if (message instanceof Terminate) {
+        } else if (message instanceof Boolean) {
             ////////// DETERMINE TERMINATION CONDITION //////////
-            final Terminate terminate = (Terminate) message;
             if (this.voteToHalt && !this.barriers.isEmpty()) {
                 for (final Barrier barrier : this.barriers.values()) {
                     if (barrier instanceof LocalBarrier) {
@@ -146,7 +144,7 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
                 this.voteToHalt = false;
             }
             // use termination token to determine termination condition
-            this.worker.send(this.neighborAddress, this.voteToHalt ? terminate : Terminate.NO);
+            this.worker.send(this.neighborAddress, this.voteToHalt ? message : Boolean.FALSE);
             this.voteToHalt = true;
         } else {
             throw new IllegalArgumentException("The following message is unknown: " + message);
@@ -168,9 +166,7 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
             this.worker.send(this.worker.master(), traverser);
         } else if (this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep) {
             // mid-traversal V()/E() traversers need to be broadcasted across all workers/partitions
-            for (final Address.Worker worker : this.worker.workers()) {
-                this.worker.send(worker, traverser);
-            }
+            this.worker.broadcast(traverser);
         } else if (traverser.get() instanceof Element && !this.worker.partition().contains((Element) traverser.get())) {
             // if the traverser references a non-local element, send the traverser to the appropriate worker/partition
             this.worker.send(this.partitionToWorkerMap.get(this.worker.partition().partitioner().find((Element) traverser.get())), traverser);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
deleted file mode 100644
index a4bde19..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public enum Terminate {
-
-    YES, NO
-}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
index 149872a..d5fe24e 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
@@ -88,10 +88,19 @@ public class GraphActorsTest extends AbstractGremlinProcessTest {
     }
 
     @Test
+    @Ignore
     public void shouldSupportMessageCombiners() throws Exception {
         for (int i = 1; i < 10; i++) {
             final GraphActors actors = graphProvider.getGraphActors(graph);
             actors.workers(i).program(new TestMessageCombinersActorProgram()).submit(graph).get();
         }
     }
+
+    @Test
+    public void shouldSupportMessagePriorities() throws Exception {
+        for (int i = 1; i < 10; i++) {
+            final GraphActors actors = graphProvider.getGraphActors(graph);
+            actors.workers(i).program(new TestMessagePrioritiesActorProgram()).submit(graph).get();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestMessagePrioritiesActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestMessagePrioritiesActorProgram.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestMessagePrioritiesActorProgram.java
new file mode 100644
index 0000000..fe4c452
--- /dev/null
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestMessagePrioritiesActorProgram.java
@@ -0,0 +1,125 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.BinaryOperator;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TestMessagePrioritiesActorProgram implements ActorProgram {
+
+    @Override
+    public Map<Class, BinaryOperator> getMessageTypes() {
+        final Map<Class, BinaryOperator> messages = new LinkedHashMap<>();
+        messages.put(Integer.class, null);
+        messages.put(Double.class, null);
+        messages.put(Long.class, null);
+        return messages;
+    }
+
+    @Override
+    public Worker createWorkerProgram(final Actor.Worker worker) {
+        return new Worker<Number>() {
+
+            private int longCount = 0;
+
+            @Override
+            public void setup() {
+                worker.send(worker.master(), 2.0d);
+                worker.send(worker.master(), 1);
+            }
+
+            @Override
+            public void execute(final Number message) {
+                assertEquals(5L, message);
+                assertEquals(0, this.longCount);
+                this.longCount++;
+                worker.send(worker.master(), message);
+            }
+
+            @Override
+            public void terminate() {
+                assertEquals(1, this.longCount);
+            }
+        };
+    }
+
+    @Override
+    public Master createMasterProgram(final Actor.Master master) {
+
+        return new Master<Number>() {
+
+            private int doubleCount = 0;
+            private int integerCount = 0;
+            private int longCount = 0;
+
+
+            @Override
+            public void setup() {
+                try {
+                    Thread.sleep(10);
+                } catch (Exception e) {
+
+                }
+            }
+
+            @Override
+            public void execute(final Number message) {
+                if (message instanceof Integer) {
+                    assertEquals(1, message);
+                    assertEquals(0, this.doubleCount);
+                    assertEquals(0, this.longCount);
+                    this.integerCount++;
+                } else if (message instanceof Double) {
+                    assertEquals(2.0d, message);
+                    assertEquals(master.workers().size(), this.integerCount);
+                    assertEquals(0, this.longCount);
+                    if (this.doubleCount == 0)
+                        master.broadcast(5L);
+                    this.doubleCount++;
+                } else if (message instanceof Long) {
+                    assertEquals(5L, message);
+                    assertEquals(master.workers().size(), this.integerCount);
+                    assertEquals(master.workers().size(), this.doubleCount);
+                    if (++this.longCount == master.workers().size())
+                        master.close();
+                }
+            }
+
+            @Override
+            public void terminate() {
+                assertEquals(master.workers().size(), this.integerCount);
+                assertEquals(master.workers().size(), this.doubleCount);
+                assertEquals(master.workers().size(), this.longCount);
+                master.setResult(null);
+            }
+        };
+    }
+
+    @Override
+    public ActorProgram clone() {
+        return this;
+    }
+}