You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/10/05 09:05:08 UTC
[cassandra] branch trunk updated: Fix flaky test
ConnectionTest.testMessagePurging
This is an automated email from the ASF dual-hosted git repository.
blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new ba63fa3 Fix flaky test ConnectionTest.testMessagePurging
ba63fa3 is described below
commit ba63fa3c951cb5c18d0fa4f9483577c6e18389c4
Author: Adam Holmberg <ad...@datastax.com>
AuthorDate: Wed Aug 19 15:32:09 2020 -0500
Fix flaky test ConnectionTest.testMessagePurging
patch by Adam Holmberg; reviewed by Yifan Cai and Benjamin Lerer for
CASSANDRA-15958
The patch fix 2 problems a race condition in InboundSocket.close when it
is called multiple times and the flakyness in ConnectionTest.testMessagePurging.
---
src/java/org/apache/cassandra/net/InboundSockets.java | 14 +++++++++++++-
.../org/apache/cassandra/net/OutboundConnection.java | 3 ++-
.../org/apache/cassandra/net/OutboundMessageQueue.java | 1 +
test/unit/org/apache/cassandra/net/ConnectionTest.java | 18 +++++++++++++++---
4 files changed, 31 insertions(+), 5 deletions(-)
diff --git a/src/java/org/apache/cassandra/net/InboundSockets.java b/src/java/org/apache/cassandra/net/InboundSockets.java
index 6fc5f52..93caf85 100644
--- a/src/java/org/apache/cassandra/net/InboundSockets.java
+++ b/src/java/org/apache/cassandra/net/InboundSockets.java
@@ -63,6 +63,9 @@ class InboundSockets
// purely to prevent close racing with open
private boolean closedWithoutOpening;
+ // used to prevent racing on close
+ private Future<Void> closeFuture;
+
/**
* A group of the open, inbound {@link Channel}s connected to this node. This is mostly interesting so that all of
* the inbound connections/channels can be closed when the listening socket itself is being closed.
@@ -109,7 +112,9 @@ class InboundSockets
* Close this socket and any connections created on it. Once closed, this socket may not be re-opened.
*
* This may not execute synchronously, so a Future is returned encapsulating its result.
- * @param shutdownExecutors
+ * @param shutdownExecutors consumer invoked with the internal executor on completion
+ * Note that the consumer will only be invoked once per InboundSocket.
+ * Subsequent calls to close will not register a callback to different consumers.
*/
private Future<Void> close(Consumer<? super ExecutorService> shutdownExecutors)
{
@@ -136,6 +141,13 @@ class InboundSockets
return new SucceededFuture<>(GlobalEventExecutor.INSTANCE, null);
}
+ if (closeFuture != null)
+ {
+ return closeFuture;
+ }
+
+ closeFuture = done;
+
if (listen != null)
{
close.run();
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java
index b0edc03..66f14db 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -110,7 +110,8 @@ public class OutboundConnection
private final OutboundMessageCallbacks callbacks;
private final OutboundDebugCallbacks debug;
- private final OutboundMessageQueue queue;
+ @VisibleForTesting
+ final OutboundMessageQueue queue;
/** the number of bytes we permit to queue to the network without acquiring any shared resource permits */
private final long pendingCapacityInBytes;
/** the number of messages and bytes queued for flush to the network,
diff --git a/src/java/org/apache/cassandra/net/OutboundMessageQueue.java b/src/java/org/apache/cassandra/net/OutboundMessageQueue.java
index 3d8bac0..d7360a0 100644
--- a/src/java/org/apache/cassandra/net/OutboundMessageQueue.java
+++ b/src/java/org/apache/cassandra/net/OutboundMessageQueue.java
@@ -87,6 +87,7 @@ class OutboundMessageQueue
{
maybePruneExpired();
externalQueue.offer(m);
+ // Known race here. See CASSANDRAi-15958
nextExpirationDeadlineUpdater.accumulateAndGet(this,
maybeUpdateEarliestExpiresAt(clock.now(), m.expiresAtNanos()),
Math::min);
diff --git a/test/unit/org/apache/cassandra/net/ConnectionTest.java b/test/unit/org/apache/cassandra/net/ConnectionTest.java
index eb8d867..5c637ac 100644
--- a/test/unit/org/apache/cassandra/net/ConnectionTest.java
+++ b/test/unit/org/apache/cassandra/net/ConnectionTest.java
@@ -685,9 +685,21 @@ public class ConnectionTest
Message<?> message = Message.builder(Verb._TEST_1, noPayload)
.withExpiresAt(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(50L))
.build();
- outbound.enqueue(message);
- Assert.assertFalse(outbound.isConnected());
- Assert.assertEquals(1, outbound.pendingCount());
+ OutboundMessageQueue queue = outbound.queue;
+ while (true)
+ {
+ try (OutboundMessageQueue.WithLock withLock = queue.lockOrCallback(System.nanoTime(), null))
+ {
+ if (withLock != null)
+ {
+ outbound.enqueue(message);
+ Assert.assertFalse(outbound.isConnected());
+ Assert.assertEquals(1, outbound.pendingCount());
+ break;
+ }
+ }
+ }
+
CompletableFuture.runAsync(() -> {
while (outbound.pendingCount() > 0 && !Thread.interrupted()) {}
}).get(10, SECONDS);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org