You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/11/09 17:00:37 UTC

[cassandra] branch trunk updated: MessagingServiceTest listenOptionalSecureConnection and listenRequiredSecureConnection fail sporadically

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4fb170c  MessagingServiceTest listenOptionalSecureConnection and listenRequiredSecureConnection fail sporadically
4fb170c is described below

commit 4fb170c5c2c73737d74197dc41d83371c303106b
Author: David Capwell <dc...@apache.org>
AuthorDate: Tue Nov 9 07:56:06 2021 -0800

    MessagingServiceTest listenOptionalSecureConnection and listenRequiredSecureConnection fail sporadically
    
    patch by David Capwell; reviewed by Benedict Elliott Smith, Caleb Rackliffe for CASSANDRA-17033
---
 .../org/apache/cassandra/net/InboundSockets.java    | 21 +++++++++++++++++----
 .../apache/cassandra/net/MessagingServiceTest.java  | 13 ++++++++-----
 2 files changed, 25 insertions(+), 9 deletions(-)

diff --git a/src/java/org/apache/cassandra/net/InboundSockets.java b/src/java/org/apache/cassandra/net/InboundSockets.java
index b0e9ad1..573cccb 100644
--- a/src/java/org/apache/cassandra/net/InboundSockets.java
+++ b/src/java/org/apache/cassandra/net/InboundSockets.java
@@ -99,15 +99,28 @@ class InboundSockets
                     throw new IllegalStateException();
                 binding = InboundConnectionInitiator.bind(settings, connections, pipelineInjector);
             }
-
-            return binding.addListener(ignore -> {
+            // isOpen is defined as "listen.isOpen", but this is set AFTER the binding future is set
+            // to make sure the future returned does not complete until listen is set, need a new
+            // future to replicate "Future.map" behavior.
+            AsyncChannelPromise promise = new AsyncChannelPromise(binding.channel());
+            binding.addListener(f -> {
+                if (!f.isSuccess())
+                {
+                    synchronized (this)
+                    {
+                        binding = null;
+                    }
+                    promise.setFailure(f.cause());
+                    return;
+                }
                 synchronized (this)
                 {
-                    if (binding.isSuccess())
-                        listen = binding.channel();
+                    listen = binding.channel();
                     binding = null;
                 }
+                promise.setSuccess(null);
             });
+            return promise;
         }
 
         /**
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index a82315b..871f592 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -310,9 +310,12 @@ public class MessagingServiceTest
     @Test
     public void listenOptionalSecureConnection() throws InterruptedException
     {
-        ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions()
-                                                          .withOptional(true);
-        listen(serverEncryptionOptions, false);
+        for (int i = 0; i < 500; i++) // test used to be flaky, so run in a loop to make sure stable (see CASSANDRA-17033)
+        {
+            ServerEncryptionOptions serverEncryptionOptions = new ServerEncryptionOptions()
+                                                              .withOptional(true);
+            listen(serverEncryptionOptions, false);
+        }
     }
 
     @Test
@@ -339,8 +342,8 @@ public class MessagingServiceTest
         InboundSockets connections = new InboundSockets(settings);
         try
         {
-            connections.open().await();
-            Assert.assertTrue(connections.isListening());
+            connections.open().sync();
+            Assert.assertTrue("connections is not listening", connections.isListening());
 
             Set<InetAddressAndPort> expect = new HashSet<>();
             expect.add(InetAddressAndPort.getByAddressOverrideDefaults(listenAddress, DatabaseDescriptor.getStoragePort()));

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org