You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2020/10/05 09:05:08 UTC

[cassandra] branch trunk updated: Fix flaky test ConnectionTest.testMessagePurging

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba63fa3  Fix flaky test ConnectionTest.testMessagePurging
ba63fa3 is described below

commit ba63fa3c951cb5c18d0fa4f9483577c6e18389c4
Author: Adam Holmberg <ad...@datastax.com>
AuthorDate: Wed Aug 19 15:32:09 2020 -0500

    Fix flaky test ConnectionTest.testMessagePurging
    
    patch by Adam Holmberg; reviewed by Yifan Cai and Benjamin Lerer for
    CASSANDRA-15958
    
    The patch fixes two problems: a race condition in InboundSocket.close when
    it is called multiple times, and the flakiness in ConnectionTest.testMessagePurging.
---
 src/java/org/apache/cassandra/net/InboundSockets.java  | 14 +++++++++++++-
 .../org/apache/cassandra/net/OutboundConnection.java   |  3 ++-
 .../org/apache/cassandra/net/OutboundMessageQueue.java |  1 +
 test/unit/org/apache/cassandra/net/ConnectionTest.java | 18 +++++++++++++++---
 4 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/src/java/org/apache/cassandra/net/InboundSockets.java b/src/java/org/apache/cassandra/net/InboundSockets.java
index 6fc5f52..93caf85 100644
--- a/src/java/org/apache/cassandra/net/InboundSockets.java
+++ b/src/java/org/apache/cassandra/net/InboundSockets.java
@@ -63,6 +63,9 @@ class InboundSockets
         // purely to prevent close racing with open
         private boolean closedWithoutOpening;
 
+        // used to prevent racing on close
+        private Future<Void> closeFuture;
+
         /**
          * A group of the open, inbound {@link Channel}s connected to this node. This is mostly interesting so that all of
          * the inbound connections/channels can be closed when the listening socket itself is being closed.
@@ -109,7 +112,9 @@ class InboundSockets
          * Close this socket and any connections created on it. Once closed, this socket may not be re-opened.
          *
          * This may not execute synchronously, so a Future is returned encapsulating its result.
-         * @param shutdownExecutors
+         * @param shutdownExecutors consumer invoked with the internal executor on completion.
+         *                          Note that the consumer will only be invoked once per InboundSocket:
+         *                          subsequent calls to close will not register callbacks for other consumers.
          */
         private Future<Void> close(Consumer<? super ExecutorService> shutdownExecutors)
         {
@@ -136,6 +141,13 @@ class InboundSockets
                     return new SucceededFuture<>(GlobalEventExecutor.INSTANCE, null);
                 }
 
+                if (closeFuture != null)
+                {
+                    return closeFuture;
+                }
+
+                closeFuture = done;
+
                 if (listen != null)
                 {
                     close.run();
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java
index b0edc03..66f14db 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -110,7 +110,8 @@ public class OutboundConnection
 
     private final OutboundMessageCallbacks callbacks;
     private final OutboundDebugCallbacks debug;
-    private final OutboundMessageQueue queue;
+    @VisibleForTesting
+    final OutboundMessageQueue queue;
     /** the number of bytes we permit to queue to the network without acquiring any shared resource permits */
     private final long pendingCapacityInBytes;
     /** the number of messages and bytes queued for flush to the network,
diff --git a/src/java/org/apache/cassandra/net/OutboundMessageQueue.java b/src/java/org/apache/cassandra/net/OutboundMessageQueue.java
index 3d8bac0..d7360a0 100644
--- a/src/java/org/apache/cassandra/net/OutboundMessageQueue.java
+++ b/src/java/org/apache/cassandra/net/OutboundMessageQueue.java
@@ -87,6 +87,7 @@ class OutboundMessageQueue
     {
         maybePruneExpired();
         externalQueue.offer(m);
+        // Known race here. See CASSANDRA-15958
         nextExpirationDeadlineUpdater.accumulateAndGet(this,
                                                        maybeUpdateEarliestExpiresAt(clock.now(), m.expiresAtNanos()),
                                                        Math::min);
diff --git a/test/unit/org/apache/cassandra/net/ConnectionTest.java b/test/unit/org/apache/cassandra/net/ConnectionTest.java
index eb8d867..5c637ac 100644
--- a/test/unit/org/apache/cassandra/net/ConnectionTest.java
+++ b/test/unit/org/apache/cassandra/net/ConnectionTest.java
@@ -685,9 +685,21 @@ public class ConnectionTest
                         Message<?> message = Message.builder(Verb._TEST_1, noPayload)
                                                     .withExpiresAt(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(50L))
                                                     .build();
-                        outbound.enqueue(message);
-                        Assert.assertFalse(outbound.isConnected());
-                        Assert.assertEquals(1, outbound.pendingCount());
+                        OutboundMessageQueue queue = outbound.queue;
+                        while (true)
+                        {
+                            try (OutboundMessageQueue.WithLock withLock = queue.lockOrCallback(System.nanoTime(), null))
+                            {
+                                if (withLock != null)
+                                {
+                                    outbound.enqueue(message);
+                                    Assert.assertFalse(outbound.isConnected());
+                                    Assert.assertEquals(1, outbound.pendingCount());
+                                    break;
+                                }
+                            }
+                        }
+
                         CompletableFuture.runAsync(() -> {
                             while (outbound.pendingCount() > 0 && !Thread.interrupted()) {}
                         }).get(10, SECONDS);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org