You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/25 21:15:39 UTC
tinkerpop git commit: Removed the need for the Terminate token. Added
a new test case to GraphActorsTest that verifies that message priority is
respected. Other minor tweaks.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 d013d4052 -> a406a8827
Removed the need for the Terminate token. Added a new test case to GraphActorsTest that verifies that message priority is respected. Other minor tweaks.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a406a882
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a406a882
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a406a882
Branch: refs/heads/TINKERPOP-1564
Commit: a406a882784291575019f991432f0d9ed5106b78
Parents: d013d40
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 25 14:15:35 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 25 14:15:35 2017 -0700
----------------------------------------------------------------------
.../akka/process/actors/ActorMailbox.java | 2 +-
.../akka/process/actors/AkkaConfigFactory.java | 2 +-
.../process/actors/io/gryo/GryoSerializer.java | 22 ++--
.../akka/process/actors/AkkaActorsProvider.java | 4 +-
.../tinkerpop/gremlin/process/actors/Actor.java | 8 ++
.../actors/traversal/TraversalActorProgram.java | 8 +-
.../traversal/TraversalMasterProgram.java | 23 ++--
.../traversal/TraversalWorkerProgram.java | 10 +-
.../actors/traversal/message/Terminate.java | 28 -----
.../gremlin/process/actors/GraphActorsTest.java | 9 ++
.../TestMessagePrioritiesActorProgram.java | 125 +++++++++++++++++++
11 files changed, 172 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
index 8505871..2a99a39 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
@@ -146,7 +146,7 @@ public final class ActorMailbox implements MailboxType, ProducesMessageQueue<Act
for (final List<String> message : messages) {
this.messageTypes.put(
ClassUtil.getClassOrEnum(message.get(0)),
- Object.class.getCanonicalName().equals(message.get(1)) ?
+ null == message.get(1) ?
null :
ClassUtil.getClassOrEnum(message.get(1)));
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
index 1422e07..e76562e 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
@@ -58,7 +58,7 @@ final class AkkaConfigFactory {
stream().
map(entry -> Arrays.asList(
ClassUtil.getClassName(entry.getKey()),
- ClassUtil.getClassName(null == entry.getValue() ? Object.class : entry.getValue().getClass()))).
+ null == entry.getValue() ? null : ClassUtil.getClassName(entry.getValue().getClass()))).
collect(Collectors.toList())));
final Iterator<String> keys = configuration.getKeys();
while (keys.hasNext()) {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java
index 8d567ba..968b0dc 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/gryo/GryoSerializer.java
@@ -63,14 +63,6 @@ public final class GryoSerializer implements Serializer {
}
// remove Gryo 3.0 classes
GryoVersion.V3_0.getRegistrations().forEach(type -> gryoClasses.remove(type.getTargetClass()));
- // this sucks. how to do this automatically?
- gryoClasses.remove(Short.class);
- gryoClasses.remove(Integer.class);
- gryoClasses.remove(Float.class);
- gryoClasses.remove(Double.class);
- gryoClasses.remove(Long.class);
- gryoClasses.remove(String.class);
- //
final List<IoRegistry> registryList;
if (config.hasPath(IoRegistry.IO_REGISTRY)) {
final Configuration configuration = new BaseConfiguration();
@@ -82,7 +74,7 @@ public final class GryoSerializer implements Serializer {
this.gryoPool = GryoPool.build().
poolSize(10).
initializeMapper(builder ->
- builder.referenceTracking(true). // config.getBoolean("gremlin.gryo.referenceTracking")).
+ builder.referenceTracking(true). // config.getBoolean("gremlin.gryo.referenceTracking")).
registrationRequired(true). // config.getBoolean("gremlin.gryo.registrationRequired")).
version(GryoVersion.V3_0).
addCustom(gryoClasses.toArray(new Class[gryoClasses.size()])).
@@ -91,8 +83,18 @@ public final class GryoSerializer implements Serializer {
public static Map<String, String> getSerializerBindings(final ActorProgram<?> actorProgram, final Configuration configuration) {
final Set<Class> programMessageClasses = new HashSet<>(actorProgram.getMessageTypes().keySet());
- programMessageClasses.add(DefaultActorsResult.class); // todo: may make this a Gryo3.0 class in the near future
GryoVersion.V3_0.getRegistrations().forEach(type -> programMessageClasses.remove(type.getTargetClass()));
+ // this sucks. how to do this automatically?
+ programMessageClasses.add(DefaultActorsResult.class); // todo: may make this a Gryo3.0 class in the near future
+ programMessageClasses.remove(Short.class);
+ programMessageClasses.remove(Integer.class);
+ programMessageClasses.remove(Float.class);
+ programMessageClasses.remove(Double.class);
+ programMessageClasses.remove(Long.class);
+ programMessageClasses.remove(String.class);
+ programMessageClasses.remove(Boolean.class);
+ programMessageClasses.remove(Character.class);
+ //
final Map<String, String> bindings = new HashMap<>();
GryoMapper.build().
referenceTracking(configuration.getBoolean("gremlin.gryo.referenceTracking", true)).
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
index c76db50..09ebde9 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
@@ -70,8 +70,8 @@ public class AkkaActorsProvider extends AbstractTinkerGraphProvider {
final Map<String, Object> configuration = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
// Akka specific configuration
configuration.put(Constants.AKKA_LOG_DEAD_LETTERS_DURING_SHUTDOWN, false);
- configuration.put(Constants.AKKA_ACTOR_PROVIDER, "remote");
- configuration.put(Constants.AKKA_ACTOR_SERIALIZE_MESSAGES, "on");
+ configuration.put(Constants.AKKA_ACTOR_PROVIDER, RANDOM.nextBoolean() ? "remote" : "local");
+ configuration.put(Constants.AKKA_ACTOR_SERIALIZE_MESSAGES, RANDOM.nextBoolean() ? "on" : "off");
configuration.put(Constants.AKKA_ACTOR_SERIALIZERS_GRYO, GryoSerializer.class.getCanonicalName());
configuration.put(Constants.AKKA_REMOTE_ENABLED_TRANSPORTS, Collections.singletonList("akka.remote.netty.tcp"));
configuration.put(Constants.AKKA_REMOTE_NETTY_TCP_HOSTNAME, "127.0.0.1");
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
index c47e928..d9c1b1f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
@@ -64,6 +64,14 @@ public interface Actor {
*/
public void close();
+ /**
+ * A helper method that will broadcast the provided message to all workers.
+ * The default implementation simply loops through {@link Actor#workers()}
+ * and sends the message.
+ *
+ * @param message the message to broadcast to all workers
+ * @param <M> the message type
+ */
public default <M> void broadcast(final M message) {
for (final Address.Worker worker : this.workers()) {
this.send(worker, message);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
index 3f23849..aea8b3c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
@@ -27,7 +27,6 @@ import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddM
import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage;
import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage;
import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate;
import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.decoration.ActorProgramStrategy;
import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.verification.ActorVerificationStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
@@ -51,7 +50,6 @@ import org.javatuples.Pair;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.function.BinaryOperator;
/**
@@ -66,12 +64,12 @@ public final class TraversalActorProgram<R> implements ActorProgram<Pair<Travers
private static final Map<Class, BinaryOperator> MESSAGES = new LinkedHashMap<>();
static {
- MESSAGES.put(BarrierDoneMessage.class, null);
MESSAGES.put(Traverser.class, null);
MESSAGES.put(SideEffectAddMessage.class, null);
- MESSAGES.put(BarrierAddMessage.class, null);
MESSAGES.put(SideEffectSetMessage.class, null);
- MESSAGES.put(Terminate.class, null);
+ MESSAGES.put(BarrierAddMessage.class, null);
+ MESSAGES.put(BarrierDoneMessage.class, null);
+ MESSAGES.put(Boolean.class, null);
}
private Traversal.Admin<?, R> traversal;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index 25a29a5..8fc0ca9 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -26,7 +26,6 @@ import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddM
import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage;
import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage;
import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -88,7 +87,7 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
}
// first pass of a two pass termination detection
this.voteToHalt = false;
- this.master.send(this.neighborAddress, Terminate.NO);
+ this.master.send(this.neighborAddress, Boolean.FALSE);
}
@Override
@@ -125,22 +124,22 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
final SideEffectAddMessage sideEffectAddMessage = (SideEffectAddMessage) message;
this.traversal.getSideEffects().add(sideEffectAddMessage.getKey(), TraversalActorProgram.attach(sideEffectAddMessage.getValue(), this.master.partitioner().getGraph()));
this.sideEffects.add(sideEffectAddMessage.getKey());
- } else if (message instanceof Terminate) {
- if (message == Terminate.NO)
+ } else if (message instanceof Boolean) {
+ if (!(Boolean) message)
this.voteToHalt = false;
if (this.voteToHalt && !this.sideEffects.isEmpty()) {
// process all side-effect updates
for (final String key : this.sideEffects) {
- this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
+ this.master.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
}
this.sideEffects.clear();
this.voteToHalt = false;
- this.master.send(this.neighborAddress, Terminate.NO);
+ this.master.send(this.neighborAddress, Boolean.FALSE);
} else if (this.voteToHalt && !this.barriers.isEmpty()) {
if (!this.barriersDone) {
// tell all workers that the barrier is complete
for (final Barrier barrier : this.barriers.values()) {
- this.broadcast(new BarrierDoneMessage(barrier));
+ this.master.broadcast(new BarrierDoneMessage(barrier));
}
this.barriersDone = true;
} else {
@@ -163,10 +162,10 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
this.barriersDone = false;
}
this.voteToHalt = false;
- this.master.send(this.neighborAddress, Terminate.NO);
+ this.master.send(this.neighborAddress, Boolean.FALSE);
} else if (!this.voteToHalt) {
this.voteToHalt = true;
- this.master.send(this.neighborAddress, Terminate.YES);
+ this.master.send(this.neighborAddress, Boolean.TRUE);
} else {
// get any dangling local results (e.g. workers have no data but a reducing barrier is waiting for data)
while (this.traversal.hasNext()) {
@@ -196,12 +195,6 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
}
- private void broadcast(final Object message) {
- for (final Address.Worker worker : this.master.workers()) {
- this.master.send(worker, message);
- }
- }
-
private void sendTraverser(final Traverser.Admin traverser) {
if (traverser.isHalted()) {
TraversalActorProgram.attach(traverser, this.master.partitioner().getGraph());
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
index f3edb28..8ea48c3 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -25,7 +25,6 @@ import org.apache.tinkerpop.gremlin.process.actors.Address;
import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage;
import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage;
import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
@@ -128,9 +127,8 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
}
} else
((Barrier) step).done(); // the master drains the global barrier
- } else if (message instanceof Terminate) {
+ } else if (message instanceof Boolean) {
////////// DETERMINE TERMINATION CONDITION //////////
- final Terminate terminate = (Terminate) message;
if (this.voteToHalt && !this.barriers.isEmpty()) {
for (final Barrier barrier : this.barriers.values()) {
if (barrier instanceof LocalBarrier) {
@@ -146,7 +144,7 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
this.voteToHalt = false;
}
// use termination token to determine termination condition
- this.worker.send(this.neighborAddress, this.voteToHalt ? terminate : Terminate.NO);
+ this.worker.send(this.neighborAddress, this.voteToHalt ? message : Boolean.FALSE);
this.voteToHalt = true;
} else {
throw new IllegalArgumentException("The following message is unknown: " + message);
@@ -168,9 +166,7 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
this.worker.send(this.worker.master(), traverser);
} else if (this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep) {
// mid-traversal V()/E() traversers need to be broadcasted across all workers/partitions
- for (final Address.Worker worker : this.worker.workers()) {
- this.worker.send(worker, traverser);
- }
+ this.worker.broadcast(traverser);
} else if (traverser.get() instanceof Element && !this.worker.partition().contains((Element) traverser.get())) {
// if the traverser references a non-local element, send the traverser to the appropriate worker/partition
this.worker.send(this.partitionToWorkerMap.get(this.worker.partition().partitioner().find((Element) traverser.get())), traverser);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
deleted file mode 100644
index a4bde19..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public enum Terminate {
-
- YES, NO
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
index 149872a..d5fe24e 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
@@ -88,10 +88,19 @@ public class GraphActorsTest extends AbstractGremlinProcessTest {
}
@Test
+ @Ignore
public void shouldSupportMessageCombiners() throws Exception {
for (int i = 1; i < 10; i++) {
final GraphActors actors = graphProvider.getGraphActors(graph);
actors.workers(i).program(new TestMessageCombinersActorProgram()).submit(graph).get();
}
}
+
+ @Test
+ public void shouldSupportMessagePriorities() throws Exception {
+ for (int i = 1; i < 10; i++) {
+ final GraphActors actors = graphProvider.getGraphActors(graph);
+ actors.workers(i).program(new TestMessagePrioritiesActorProgram()).submit(graph).get();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a406a882/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestMessagePrioritiesActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestMessagePrioritiesActorProgram.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestMessagePrioritiesActorProgram.java
new file mode 100644
index 0000000..fe4c452
--- /dev/null
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestMessagePrioritiesActorProgram.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.function.BinaryOperator;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class TestMessagePrioritiesActorProgram implements ActorProgram {
+
+ @Override
+ public Map<Class, BinaryOperator> getMessageTypes() {
+ final Map<Class, BinaryOperator> messages = new LinkedHashMap<>();
+ messages.put(Integer.class, null);
+ messages.put(Double.class, null);
+ messages.put(Long.class, null);
+ return messages;
+ }
+
+ @Override
+ public Worker createWorkerProgram(final Actor.Worker worker) {
+ return new Worker<Number>() {
+
+ private int longCount = 0;
+
+ @Override
+ public void setup() {
+ worker.send(worker.master(), 2.0d);
+ worker.send(worker.master(), 1);
+ }
+
+ @Override
+ public void execute(final Number message) {
+ assertEquals(5L, message);
+ assertEquals(0, this.longCount);
+ this.longCount++;
+ worker.send(worker.master(), message);
+ }
+
+ @Override
+ public void terminate() {
+ assertEquals(1, this.longCount);
+ }
+ };
+ }
+
+ @Override
+ public Master createMasterProgram(final Actor.Master master) {
+
+ return new Master<Number>() {
+
+ private int doubleCount = 0;
+ private int integerCount = 0;
+ private int longCount = 0;
+
+
+ @Override
+ public void setup() {
+ try {
+ Thread.sleep(10);
+ } catch (Exception e) {
+
+ }
+ }
+
+ @Override
+ public void execute(final Number message) {
+ if (message instanceof Integer) {
+ assertEquals(1, message);
+ assertEquals(0, this.doubleCount);
+ assertEquals(0, this.longCount);
+ this.integerCount++;
+ } else if (message instanceof Double) {
+ assertEquals(2.0d, message);
+ assertEquals(master.workers().size(), this.integerCount);
+ assertEquals(0, this.longCount);
+ if (this.doubleCount == 0)
+ master.broadcast(5L);
+ this.doubleCount++;
+ } else if (message instanceof Long) {
+ assertEquals(5L, message);
+ assertEquals(master.workers().size(), this.integerCount);
+ assertEquals(master.workers().size(), this.doubleCount);
+ if (++this.longCount == master.workers().size())
+ master.close();
+ }
+ }
+
+ @Override
+ public void terminate() {
+ assertEquals(master.workers().size(), this.integerCount);
+ assertEquals(master.workers().size(), this.doubleCount);
+ assertEquals(master.workers().size(), this.longCount);
+ master.setResult(null);
+ }
+ };
+ }
+
+ @Override
+ public ActorProgram clone() {
+ return this;
+ }
+}