You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2018/07/03 13:23:41 UTC
[2/9] cassandra git commit: Create MessagingService mocking classes
Create MessagingService mocking classes
Patch by Stefan Podkowinski; reviewed by Tyler Hobbs for CASSANDRA-12016
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e478d38
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e478d38
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e478d38
Branch: refs/heads/cassandra-3.11
Commit: 1e478d380bd497a3fe635b35b6e87e002cabf617
Parents: c4e6cd2
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Tue Jul 12 12:04:23 2016 -0500
Committer: Aleksey Yeshchenko <al...@apple.com>
Committed: Tue Jul 3 14:16:25 2018 +0100
----------------------------------------------------------------------
.../apache/cassandra/gms/FailureDetector.java | 9 +-
.../apache/cassandra/net/MessagingService.java | 5 +
.../cassandra/service/ActiveRepairService.java | 3 +-
.../org/apache/cassandra/utils/ExpiringMap.java | 4 +-
test/unit/org/apache/cassandra/net/Matcher.java | 32 +++
.../apache/cassandra/net/MatcherResponse.java | 210 +++++++++++++++++
.../cassandra/net/MockMessagingService.java | 144 ++++++++++++
.../cassandra/net/MockMessagingServiceTest.java | 96 ++++++++
.../apache/cassandra/net/MockMessagingSpy.java | 234 +++++++++++++++++++
9 files changed, 730 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index 70a354c..cda6469 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -52,7 +53,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
private static final int DEBUG_PERCENTAGE = 80; // if the phi is larger than this percentage of the max, log a debug message
private static final long DEFAULT_MAX_PAUSE = 5000L * 1000000L; // 5 seconds
private static final long MAX_LOCAL_PAUSE_IN_NANOS = getMaxLocalPause();
- private long lastInterpret = System.nanoTime();
+ private long lastInterpret = Clock.instance.nanoTime();
private long lastPause = 0L;
private static long getMaxLocalPause()
@@ -252,7 +253,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
public void report(InetAddress ep)
{
- long now = System.nanoTime();
+ long now = Clock.instance.nanoTime();
ArrivalWindow heartbeatWindow = arrivalSamples.get(ep);
if (heartbeatWindow == null)
{
@@ -279,7 +280,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
{
return;
}
- long now = System.nanoTime();
+ long now = Clock.instance.nanoTime();
long diff = now - lastInterpret;
lastInterpret = now;
if (diff > MAX_LOCAL_PAUSE_IN_NANOS)
@@ -288,7 +289,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
lastPause = now;
return;
}
- if (System.nanoTime() - lastPause < MAX_LOCAL_PAUSE_IN_NANOS)
+ if (Clock.instance.nanoTime() - lastPause < MAX_LOCAL_PAUSE_IN_NANOS)
{
logger.debug("Still not marking nodes down due to local pause");
return;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 047f51f..67c3ba9 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -389,6 +389,11 @@ public final class MessagingService implements MessagingServiceMBean
messageSinks.add(sink);
}
+ public void removeMessageSink(IMessageSink sink)
+ {
+ messageSinks.remove(sink);
+ }
+
public void clearMessageSinks()
{
messageSinks.clear();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index adb6fab..f63cb86 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -65,6 +65,7 @@ import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.RepairSession;
import org.apache.cassandra.repair.messages.*;
import org.apache.cassandra.utils.CassandraVersion;
+import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Ref;
@@ -282,7 +283,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
public UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores)
{
- long timestamp = System.currentTimeMillis();
+ long timestamp = Clock.instance.currentTimeMillis();
registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), timestamp, options.isGlobal());
final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
final AtomicBoolean status = new AtomicBoolean(true);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index 8359918..ef013f5 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -48,7 +48,7 @@ public class ExpiringMap<K, V>
assert value != null;
this.value = value;
this.timeout = timeout;
- this.createdAt = System.nanoTime();
+ this.createdAt = Clock.instance.nanoTime();
}
private boolean isReadyToDieAt(long atNano)
@@ -85,7 +85,7 @@ public class ExpiringMap<K, V>
{
public void run()
{
- long start = System.nanoTime();
+ long start = Clock.instance.nanoTime();
int n = 0;
for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/Matcher.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/Matcher.java b/test/unit/org/apache/cassandra/net/Matcher.java
new file mode 100644
index 0000000..cd1b667
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/Matcher.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+/**
+ * Predicate based on an intercepted outgoing message and the message's destination address.
+ */
+public interface Matcher<T>
+{
+ /**
+ * @param obj intercepted outgoing message
+ * @param to destination address
+ */
+ public boolean matches(MessageOut<T> obj, InetAddress to);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/MatcherResponse.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MatcherResponse.java b/test/unit/org/apache/cassandra/net/MatcherResponse.java
new file mode 100644
index 0000000..12a8d1b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.apache.cassandra.utils.Clock;
+
+/**
+ * Sends a mocked response for each intercepted outgoing message accepted by the {@link Matcher}.
+ * The actual behavior by any instance of this class can be inspected by
+ * interacting with the returned {@link MockMessagingSpy}.
+ */
+public class MatcherResponse
+{
+ private final Matcher<?> matcher;
+ private final Set<Integer> sendResponses = new HashSet<>();
+ private final MockMessagingSpy spy = new MockMessagingSpy();
+ private final AtomicInteger limitCounter = new AtomicInteger(Integer.MAX_VALUE);
+ private IMessageSink sink;
+
+ MatcherResponse(Matcher<?> matcher)
+ {
+ this.matcher = matcher;
+ }
+
+ /**
+ * Do not create any responses for intercepted outbound messages.
+ */
+ public MockMessagingSpy dontReply()
+ {
+ return respond((MessageIn<?>)null);
+ }
+
+ /**
+ * Respond with provided message in reply to each intercepted outbound message.
+ * @param message the message to use as mock reply from the cluster
+ */
+ public MockMessagingSpy respond(MessageIn<?> message)
+ {
+ return respondN(message, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Respond a limited number of times with the provided message in reply to each intercepted outbound message.
+ * @param response the message to use as mock reply from the cluster
+ * @param limit number of times to respond with message
+ */
+ public MockMessagingSpy respondN(final MessageIn<?> response, int limit)
+ {
+ return respondN((in, to) -> response, limit);
+ }
+
+ /**
+ * Respond with the message created by the provided function that will be called with each intercepted outbound message.
+ * @param fnResponse function to call for creating reply based on intercepted message and target address
+ */
+ public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse)
+ {
+ return respondN(fnResponse, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Respond with message wrapping the payload object created by provided function called for each intercepted outbound message.
+ * The target address from the intercepted message will automatically be used as the created message's sender address.
+ * @param fnResponse function to call for creating payload object based on intercepted message and target address
+ * @param verb verb to use for reply message
+ */
+ public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb)
+ {
+ return respondNWithPayloadForEachReceiver(fnResponse, verb, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Respond a limited number of times with message wrapping the payload object created by provided function called for
+ * each intercepted outbound message. The target address from the intercepted message will automatically be used as the
+ * created message's sender address.
+ * @param fnResponse function to call for creating payload object based on intercepted message and target address
+ * @param verb verb to use for reply message
+ */
+ public <T, S> MockMessagingSpy respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb, int limit)
+ {
+ return respondN((MessageOut<T> msg, InetAddress to) -> {
+ S payload = fnResponse.apply(msg);
+ if (payload == null)
+ return null;
+ else
+ return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version);
+ },
+ limit);
+ }
+
+ /**
+ * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed
+ * from the provided queue. No reply will be send when the queue has been exhausted.
+ * @param cannedResponses prepared payload messages to use for responses
+ * @param verb verb to use for reply message
+ */
+ public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Queue<S> cannedResponses, MessagingService.Verb verb)
+ {
+ return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> cannedResponses.poll(), verb);
+ }
+
+    /**
+     * Responds to each intercepted outbound message with the next queue element, waiting until one is available.
+     * @param cannedResponses prepared payload messages to use for responses
+     * @param verb verb to use for reply message
+     */
+    public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(BlockingQueue<S> cannedResponses, MessagingService.Verb verb)
+    {
+        return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> {
+            try
+            {
+                return cannedResponses.take();
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt(); // keep the interruption visible to the calling thread
+                return null; // a null payload means that no reply is created
+            }
+        }, verb);
+    }
+
+    /**
+     * Respond a limited number of times with the message created by the provided function for each matching outbound message.
+     * @param fnResponse function to call for creating reply based on intercepted message and target address
+     * @param limit number of times to respond; further matching messages are still held back, but get no reply
+     */
+    public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse, int limit)
+    {
+        assert sink == null: "destroy() must be called first to register new response";
+
+        limitCounter.set(limit); // only after the check above, so a rejected call cannot alter the active sink's limit
+
+        sink = new IMessageSink()
+        {
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+            {
+                // prevent outgoing message from being sent in case matcher indicates a match
+                // and instead send the mocked response
+                if (matcher.matches(message, to))
+                {
+                    spy.matchingMessage(message);
+
+                    if (limitCounter.decrementAndGet() < 0)
+                        return false;
+
+                    synchronized (sendResponses)
+                    {
+                        // I'm not sure about retry semantics regarding message/ID relationships, but I assume
+                        // sending a message multiple times using the same ID shouldn't happen..
+                        assert !sendResponses.contains(id) : "ID re-use for outgoing message";
+                        sendResponses.add(id);
+                    }
+                    MessageIn<?> response = fnResponse.apply(message, to);
+                    if (response != null)
+                    {
+                        CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id);
+                        if (cb != null)
+                            cb.callback.response(response);
+                        else
+                            MessagingService.instance().receive(response, id, Clock.instance.currentTimeMillis(), false);
+                        spy.matchingResponse(response);
+                    }
+                    return false;
+                }
+                return true;
+            }
+
+            public boolean allowIncomingMessage(MessageIn message, int id)
+            {
+                return true;
+            }
+        };
+        MessagingService.instance().addMessageSink(sink);
+
+        return spy;
+    }
+
+    /** Stops the currently registered response from being sent, so that a new response can be registered. */
+    public void destroy()
+    {
+        if (sink != null)
+            MessagingService.instance().removeMessageSink(sink);
+        sink = null; // lets the "sink == null" assertion in respondN() pass for the next registration
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/MockMessagingService.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingService.java b/test/unit/org/apache/cassandra/net/MockMessagingService.java
new file mode 100644
index 0000000..0412759
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MockMessagingService.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.function.Predicate;
+
+/**
+ * Starting point for mocking {@link MessagingService} interactions. Outgoing messages can be
+ * intercepted by first creating a {@link MatcherResponse} by calling {@link MockMessagingService#when(Matcher)}.
+ * Alternatively {@link Matcher}s can be created by using helper methods such as {@link #to(InetAddress)},
+ * {@link #verb(MessagingService.Verb)} or {@link #payload(Predicate)} and may also be
+ * nested using {@link MockMessagingService#all(Matcher[])} or {@link MockMessagingService#any(Matcher[])}.
+ * After each test, {@link MockMessagingService#cleanup()} must be called to free listeners registered
+ * in {@link MessagingService}.
+ */
+public class MockMessagingService
+{
+
+ private MockMessagingService()
+ {
+ }
+
+ /**
+ * Creates a MatcherResponse based on specified matcher.
+ */
+ public static MatcherResponse when(Matcher matcher)
+ {
+ return new MatcherResponse(matcher);
+ }
+
+ /**
+ * Unsubscribes any handlers added by calling {@link MessagingService#addMessageSink(IMessageSink)}.
+ * This should be called after each test.
+ */
+ public static void cleanup()
+ {
+ MessagingService.instance().clearMessageSinks();
+ }
+
+ /**
+ * Creates a matcher that will indicate if the target address of the outgoing message equals the
+ * provided address.
+ */
+ public static Matcher<InetAddress> to(String address)
+ {
+ try
+ {
+ return to(InetAddress.getByName(address));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Creates a matcher that will indicate if the target address of the outgoing message equals the
+ * provided address.
+ */
+ public static Matcher<InetAddress> to(InetAddress address)
+ {
+ return (in, to) -> to == address || to.equals(address);
+ }
+
+ /**
+ * Creates a matcher that will indicate if the verb of the outgoing message equals the
+ * provided value.
+ */
+ public static Matcher<MessagingService.Verb> verb(MessagingService.Verb verb)
+ {
+ return (in, to) -> in.verb == verb;
+ }
+
+ /**
+ * Creates a matcher based on the result of the provided predicate called with the outgoing message.
+ */
+ public static <T> Matcher<T> message(Predicate<MessageOut<T>> fn)
+ {
+ return (msg, to) -> fn.test(msg);
+ }
+
+ /**
+ * Creates a matcher based on the result of the provided predicate called with the outgoing message's payload.
+ */
+ public static <T> Matcher<T> payload(Predicate<T> fn)
+ {
+ return (msg, to) -> fn.test(msg.payload);
+ }
+
+ /**
+ * Inverts boolean result of wrapped matcher.
+ */
+ public static <T> Matcher<T> not(Matcher<T> matcher)
+ {
+ return (o, to) -> !matcher.matches(o, to);
+ }
+
+ /**
+ * Indicates true in case all provided matchers returned true.
+ */
+ public static <T> Matcher<?> all(Matcher<?>... matchers)
+ {
+ return (MessageOut<T> out, InetAddress to) -> {
+ for (Matcher matcher : matchers)
+ {
+ if (!matcher.matches(out, to))
+ return false;
+ }
+ return true;
+ };
+ }
+
+ /**
+ * Indicates true in case at least a single provided matcher returned true.
+ */
+ public static <T> Matcher<?> any(Matcher<?>... matchers)
+ {
+ return (MessageOut<T> out, InetAddress to) -> {
+ for (Matcher matcher : matchers)
+ {
+ if (matcher.matches(out, to))
+ return true;
+ }
+ return false;
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
new file mode 100644
index 0000000..ed4cce8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.EchoMessage;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.net.MockMessagingService.all;
+import static org.apache.cassandra.net.MockMessagingService.to;
+import static org.apache.cassandra.net.MockMessagingService.verb;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MockMessagingServiceTest
+{
+ @BeforeClass
+ public static void initCluster() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ StorageService.instance.initServer();
+ }
+
+ @Before
+ public void cleanup()
+ {
+ MockMessagingService.cleanup();
+ }
+
+ @Test
+ public void testRequestResponse() throws InterruptedException, ExecutionException
+ {
+ // echo message that we like to mock as incoming reply for outgoing echo message
+ MessageIn<EchoMessage> echoMessageIn = MessageIn.create(FBUtilities.getBroadcastAddress(),
+ EchoMessage.instance,
+ Collections.emptyMap(),
+ MessagingService.Verb.ECHO,
+ MessagingService.current_version
+ );
+ MockMessagingSpy spy = MockMessagingService
+ .when(
+ all(
+ to(FBUtilities.getBroadcastAddress()),
+ verb(MessagingService.Verb.ECHO)
+ )
+ )
+ .respond(echoMessageIn);
+
+ MessageOut<EchoMessage> echoMessageOut = new MessageOut<>(MessagingService.Verb.ECHO, EchoMessage.instance, EchoMessage.serializer);
+ MessagingService.instance().sendRR(echoMessageOut, FBUtilities.getBroadcastAddress(), new IAsyncCallback()
+ {
+ public void response(MessageIn msg)
+ {
+ assertEquals(MessagingService.Verb.ECHO, msg.verb);
+ assertEquals(echoMessageIn.payload, msg.payload);
+ }
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
+ });
+
+ // we must have intercepted the outgoing message at this point
+ MessageOut<?> msg = spy.captureMessageOut().get();
+ assertEquals(1, spy.messagesIntercepted);
+ assertTrue(msg == echoMessageOut);
+
+ // and return a mocked response
+ assertEquals(1, spy.mockedMessageResponses);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e478d38/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingSpy.java b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
new file mode 100644
index 0000000..80bdb39
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MockMessagingSpy.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import junit.framework.AssertionFailedError;
+
+/**
+ * Allows inspecting the behavior of mocked messaging by observing {@link MatcherResponse}.
+ */
+public class MockMessagingSpy
+{
+ private static final Logger logger = LoggerFactory.getLogger(MockMessagingSpy.class);
+
+ public int messagesIntercepted = 0;
+ public int mockedMessageResponses = 0;
+
+ private final BlockingQueue<MessageOut<?>> interceptedMessages = new LinkedBlockingQueue<>();
+ private final BlockingQueue<MessageIn<?>> deliveredResponses = new LinkedBlockingQueue<>();
+
+ private static final Executor executor = Executors.newSingleThreadExecutor();
+
+ /**
+ * Returns a future with the first mocked incoming message that has been created and delivered.
+ */
+ public ListenableFuture<MessageIn<?>> captureMockedMessageIn()
+ {
+ return Futures.transform(captureMockedMessageInN(1), (List<MessageIn<?>> result) -> result.isEmpty() ? null : result.get(0));
+ }
+
+ /**
+     * Returns a future with the specified number of mocked incoming messages that have been created and delivered.
+ */
+ public ListenableFuture<List<MessageIn<?>>> captureMockedMessageInN(int noOfMessages)
+ {
+ CapturedResultsFuture<MessageIn<?>> ret = new CapturedResultsFuture<>(noOfMessages, deliveredResponses);
+ executor.execute(ret);
+ return ret;
+ }
+
+ /**
+ * Returns a future that will indicate if a mocked incoming message has been created and delivered.
+ */
+ public ListenableFuture<Boolean> expectMockedMessageIn()
+ {
+ return expectMockedMessageIn(1);
+ }
+
+ /**
+     * Returns a future that will indicate if the specified number of mocked incoming messages have been created and delivered.
+ */
+ public ListenableFuture<Boolean> expectMockedMessageIn(int noOfMessages)
+ {
+ ResultsCompletionFuture<MessageIn<?>> ret = new ResultsCompletionFuture<>(noOfMessages, deliveredResponses);
+ executor.execute(ret);
+ return ret;
+ }
+
+ /**
+     * Returns a future with the first intercepted outbound message that would have been sent.
+ */
+ public ListenableFuture<MessageOut<?>> captureMessageOut()
+ {
+ return Futures.transform(captureMessageOut(1), (List<MessageOut<?>> result) -> result.isEmpty() ? null : result.get(0));
+ }
+
+ /**
+     * Returns a future with the specified number of intercepted outbound messages that would have been sent.
+ */
+ public ListenableFuture<List<MessageOut<?>>> captureMessageOut(int noOfMessages)
+ {
+ CapturedResultsFuture<MessageOut<?>> ret = new CapturedResultsFuture<>(noOfMessages, interceptedMessages);
+ executor.execute(ret);
+ return ret;
+ }
+
+ /**
+     * Returns a future that will indicate if an intercepted outbound message would have been sent.
+ */
+ public ListenableFuture<Boolean> interceptMessageOut()
+ {
+ return interceptMessageOut(1);
+ }
+
+ /**
+     * Returns a future that will indicate if the specified number of intercepted outbound messages would have been sent.
+ */
+ public ListenableFuture<Boolean> interceptMessageOut(int noOfMessages)
+ {
+ ResultsCompletionFuture<MessageOut<?>> ret = new ResultsCompletionFuture<>(noOfMessages, interceptedMessages);
+ executor.execute(ret);
+ return ret;
+ }
+
+ /**
+     * Returns a future that will indicate the absence of any intercepted outbound messages within the specified period.
+ */
+ public ListenableFuture<Boolean> interceptNoMsg(long time, TimeUnit unit)
+ {
+ ResultAbsenceFuture<MessageOut<?>> ret = new ResultAbsenceFuture<>(interceptedMessages, time, unit);
+ executor.execute(ret);
+ return ret;
+ }
+
+ void matchingMessage(MessageOut<?> message)
+ {
+ messagesIntercepted++;
+ logger.trace("Received matching message: {}", message);
+ interceptedMessages.add(message);
+ }
+
+ void matchingResponse(MessageIn<?> response)
+ {
+ mockedMessageResponses++;
+ logger.trace("Responding to intercepted message: {}", response);
+ deliveredResponses.add(response);
+ }
+
+
+    private static class CapturedResultsFuture<T> extends AbstractFuture<List<T>> implements Runnable
+    {
+        private final int waitForResults;
+        private final List<T> results;
+        private final BlockingQueue<T> queue;
+
+        CapturedResultsFuture(int waitForResponses, BlockingQueue<T> queue)
+        {
+            this.waitForResults = waitForResponses;
+            results = new ArrayList<T>(waitForResponses);
+            this.queue = queue;
+        }
+
+        public void run()
+        {
+            try
+            {
+                while (results.size() < waitForResults)
+                    results.add(queue.take()); // waits for the next queued element
+                set(results);
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                setException(e); // fail the future instead of leaving callers of get() waiting forever
+            }
+        }
+    }
+
+    private static class ResultsCompletionFuture<T> extends AbstractFuture<Boolean> implements Runnable
+    {
+        private final int waitForResults;
+        private final BlockingQueue<T> queue;
+
+        ResultsCompletionFuture(int waitForResponses, BlockingQueue<T> queue)
+        {
+            this.waitForResults = waitForResponses;
+            this.queue = queue;
+        }
+
+        public void run()
+        {
+            try
+            {
+                for (int i = 0; i < waitForResults; i++)
+                    queue.take(); // only the number of elements matters, the elements themselves are discarded
+                set(true);
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                setException(e); // fail the future instead of leaving callers of get() waiting forever
+            }
+        }
+    }
+
+    private static class ResultAbsenceFuture<T> extends AbstractFuture<Boolean> implements Runnable
+    {
+        private final BlockingQueue<T> queue;
+        private final long time;
+        private final TimeUnit unit;
+
+        ResultAbsenceFuture(BlockingQueue<T> queue, long time, TimeUnit unit)
+        {
+            this.queue = queue;
+            this.time = time;
+            this.unit = unit;
+        }
+
+        public void run()
+        {
+            try
+            {
+                T result = queue.poll(time, unit); // null when nothing arrived within the given period
+                if (result != null)
+                    setException(new AssertionFailedError("Received unexpected message: " + result));
+                else
+                    set(true);
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                setException(e); // fail the future instead of leaving callers of get() waiting forever
+            }
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org