You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2020/05/13 15:36:14 UTC

[cassandra] branch trunk updated: Update port when reconnecting to pre-4.0 SSL storage

This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 678ca3f  Update port when reconnecting to pre-4.0 SSL storage
678ca3f is described below

commit 678ca3fc29c38b64a110dcf40693aa7840b0585c
Author: Jon Meredith <jm...@gmail.com>
AuthorDate: Tue Apr 7 18:58:59 2020 -0600

    Update port when reconnecting to pre-4.0 SSL storage
    
    On a failed outbound connection to a node with pending data, recheck
    the messaging version before reattempting the connection.
    
    Prior to this change, if the endpoint version was incorrectly set
    to 4.0 when the node was running 3.0 with an SSL storage port
    the connection would continuously try to reconnect on the wrong port.
    
    The patch also improves some of the log messages to include the
    actual port being connected to as well as the canonical endpoint for
    the node.
    
    Patch by Jon Meredith & Andy Tolbert; reviewed by Aleksey Yeschenko for
    CASSANDRA-15727
    
    Co-authored-by: Jon Meredith <jm...@gmail.com>
    Co-authored-by: Andy Tolbert <an...@apple.com>
---
 CHANGES.txt                                        |  1 +
 .../cassandra/net/InboundConnectionInitiator.java  |  4 +-
 .../org/apache/cassandra/net/InboundSockets.java   | 19 ++++-
 .../apache/cassandra/net/OutboundConnection.java   | 20 ++++-
 .../cassandra/net/OutboundConnectionInitiator.java | 10 +--
 .../cassandra/net/OutboundConnectionSettings.java  |  7 ++
 .../org/apache/cassandra/net/ConnectionTest.java   | 92 ++++++++++++++++++++++
 7 files changed, 142 insertions(+), 11 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e6acf40..98b3f68 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * Update port when reconnecting to pre-4.0 SSL storage (CASSANDRA-15727)
  * Only calculate dynamicBadnessThreshold once per loop in DynamicEndpointSnitch (CASSANDRA-15798)
  * Cleanup redundant nodetool commands added in 4.0 (CASSANDRA-15256)
  * Update to Python driver 3.23 for cqlsh (CASSANDRA-15793)
diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
index c390ba4..3c1498b 100644
--- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
@@ -239,8 +239,8 @@ public class InboundConnectionInitiator
             if (sslHandler != null)
             {
                 SSLSession session = sslHandler.engine().getSession();
-                logger.info("connection from peer {}, protocol = {}, cipher suite = {}",
-                            ctx.channel().remoteAddress(), session.getProtocol(), session.getCipherSuite());
+                logger.info("connection from peer {} to {}, protocol = {}, cipher suite = {}",
+                            ctx.channel().remoteAddress(), ctx.channel().localAddress(), session.getProtocol(), session.getCipherSuite());
             }
         }
 
diff --git a/src/java/org/apache/cassandra/net/InboundSockets.java b/src/java/org/apache/cassandra/net/InboundSockets.java
index 8f74eaa..eb9ef8e 100644
--- a/src/java/org/apache/cassandra/net/InboundSockets.java
+++ b/src/java/org/apache/cassandra/net/InboundSockets.java
@@ -187,10 +187,23 @@ class InboundSockets
 
     private static void addBindings(InboundConnectionSettings template, ImmutableList.Builder<InboundSocket> out)
     {
-        InboundConnectionSettings settings = template.withDefaults();
-        out.add(new InboundSocket(settings));
+        InboundConnectionSettings       settings = template.withDefaults();
+        InboundConnectionSettings legacySettings = template.withLegacyDefaults();
+
         if (settings.encryption.enable_legacy_ssl_storage_port && settings.encryption.enabled)
-            out.add(new InboundSocket(template.withLegacyDefaults()));
+        {
+            out.add(new InboundSocket(legacySettings));
+
+            /*
+             * If the legacy ssl storage port and storage port match, only bind to the
+             * legacy ssl port. This makes it possible to configure a 4.0 node like a 3.0
+             * node with only the ssl_storage_port if required.
+             */
+            if (settings.bindAddress.equals(legacySettings.bindAddress))
+                return;
+        }
+
+        out.add(new InboundSocket(settings));
     }
 
     public Future<Void> open(Consumer<ChannelPipeline> pipelineInjector)
diff --git a/src/java/org/apache/cassandra/net/OutboundConnection.java b/src/java/org/apache/cassandra/net/OutboundConnection.java
index b84ebc3..315d086 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnection.java
@@ -1084,7 +1084,25 @@ public class OutboundConnection
                 if (hasPending())
                 {
                     Promise<Result<MessagingSuccess>> result = new AsyncPromise<>(eventLoop);
-                    state = new Connecting(state.disconnected(), result, eventLoop.schedule(() -> attempt(result), max(100, retryRateMillis), MILLISECONDS));
+                    state = new Connecting(state.disconnected(),
+                                           result,
+                                           eventLoop.schedule(() ->
+                                           {
+                                               // Re-evaluate messagingVersion before re-attempting the connection in case
+                                               // endpointToVersion were updated. This happens if the outbound connection
+                                               // is made before the endpointToVersion table is initially constructed or out
+                                               // of date (e.g. if outbound connections are established for gossip
+                                               // as a result of an inbound connection) and can result in the wrong outbound
+                                               // port being selected if configured with enable_legacy_ssl_storage_port=true.
+                                               int maybeUpdatedVersion = template.endpointToVersion().get(template.to);
+                                               if (maybeUpdatedVersion != messagingVersion)
+                                               {
+                                                   logger.trace("Endpoint version changed from {} to {} since connection initialized, updating.",
+                                                                messagingVersion, maybeUpdatedVersion);
+                                                   messagingVersion = maybeUpdatedVersion;
+                                               }
+                                               attempt(result);
+                                           }, max(100, retryRateMillis), MILLISECONDS));
                     retryRateMillis = min(1000, retryRateMillis * 2);
                 }
                 else
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
index fdfb2df..5f3eced 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java
@@ -131,7 +131,7 @@ public class OutboundConnectionInitiator<SuccessType extends OutboundConnectionI
         {
             // interrupt other connections, so they must attempt to re-authenticate
             MessagingService.instance().interruptOutbound(settings.to);
-            return new FailedFuture<>(eventLoop, new IOException("authentication failed to " + settings.to));
+            return new FailedFuture<>(eventLoop, new IOException("authentication failed to " + settings.connectToId()));
         }
 
         // this is a bit ugly, but is the easiest way to ensure that if we timeout we can propagate a suitable error message
@@ -146,7 +146,7 @@ public class OutboundConnectionInitiator<SuccessType extends OutboundConnectionI
                                              if (future.isCancelled() && !timedout.get())
                                                  resultPromise.cancel(true);
                                              else if (future.isCancelled())
-                                                 resultPromise.tryFailure(new IOException("Timeout handshaking with " + settings.connectTo));
+                                                 resultPromise.tryFailure(new IOException("Timeout handshaking with " + settings.connectToId()));
                                              else
                                                  resultPromise.tryFailure(future.cause());
                                          }
@@ -229,7 +229,7 @@ public class OutboundConnectionInitiator<SuccessType extends OutboundConnectionI
         public void channelActive(final ChannelHandlerContext ctx)
         {
             Initiate msg = new Initiate(requestMessagingVersion, settings.acceptVersions, type, settings.framing, settings.from);
-            logger.trace("starting handshake with peer {}, msg = {}", settings.connectTo, msg);
+            logger.trace("starting handshake with peer {}, msg = {}", settings.connectToId(), msg);
             AsyncChannelPromise.writeAndFlush(ctx, msg.encode(),
                   future -> { if (!future.isSuccess()) exceptionCaught(ctx, future.cause()); });
 
@@ -368,9 +368,9 @@ public class OutboundConnectionInitiator<SuccessType extends OutboundConnectionI
                 JVMStabilityInspector.inspectThrowable(cause, false);
                 resultPromise.tryFailure(cause);
                 if (isCausedByConnectionReset(cause))
-                    logger.info("Failed to connect to peer {}", settings.to, cause);
+                    logger.info("Failed to connect to peer {}", settings.connectToId(), cause);
                 else
-                    logger.error("Failed to handshake with peer {}", settings.to, cause);
+                    logger.error("Failed to handshake with peer {}", settings.connectToId(), cause);
                 ctx.close();
             }
             catch (Throwable t)
diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
index c78df61..5f83b6a 100644
--- a/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
+++ b/src/java/org/apache/cassandra/net/OutboundConnectionSettings.java
@@ -449,6 +449,13 @@ public class OutboundConnectionSettings
         return connectTo;
     }
 
+    public String connectToId()
+    {
+        return !to.equals(connectTo())
+             ? to.toString()
+             : to.toString() + '(' + connectTo().toString() + ')';
+    }
+
     public Framing framing(ConnectionCategory category)
     {
         if (framing != null)
diff --git a/test/unit/org/apache/cassandra/net/ConnectionTest.java b/test/unit/org/apache/cassandra/net/ConnectionTest.java
index e92f196..c8dc369 100644
--- a/test/unit/org/apache/cassandra/net/ConnectionTest.java
+++ b/test/unit/org/apache/cassandra/net/ConnectionTest.java
@@ -55,6 +55,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.db.commitlog.CommitLog;
@@ -70,6 +71,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.cassandra.net.MessagingService.VERSION_30;
+import static org.apache.cassandra.net.MessagingService.VERSION_3014;
 import static org.apache.cassandra.net.MessagingService.VERSION_40;
 import static org.apache.cassandra.net.NoPayload.noPayload;
 import static org.apache.cassandra.net.MessagingService.current_version;
@@ -568,6 +570,96 @@ public class ConnectionTest
     }
 
     @Test
+    public void testPendingOutboundConnectionUpdatesMessageVersionOnReconnectAttempt() throws Throwable
+    {
+        final String storagePortProperty = Config.PROPERTY_PREFIX + "ssl_storage_port";
+        final String originalStoragePort = System.getProperty(storagePortProperty);
+        try
+        {
+            // Set up an inbound connection listening *only* on the SSL storage port to
+            // replicate a 3.x node.  Force the messaging version to be incorrectly set to 4.0
+            // before the outbound connection attempt.
+            final Settings settings = Settings.LARGE;
+            final InetAddressAndPort endpoint = FBUtilities.getBroadcastAddressAndPort();
+
+            MessagingService.instance().versions.set(FBUtilities.getBroadcastAddressAndPort(),
+                                                     MessagingService.VERSION_40);
+
+            System.setProperty(storagePortProperty, "7011");
+            final InetAddressAndPort legacySSLAddrsAndPort = endpoint.withPort(DatabaseDescriptor.getSSLStoragePort());
+            InboundConnectionSettings inboundSettings = settings.inbound.apply(new InboundConnectionSettings().withEncryption(encryptionOptions))
+                                                                        .withBindAddress(legacySSLAddrsAndPort)
+                                                                        .withAcceptMessaging(new AcceptVersions(VERSION_30, VERSION_3014))
+                                                                        .withSocketFactory(factory);
+            InboundSockets inbound = new InboundSockets(Collections.singletonList(inboundSettings));
+            OutboundConnectionSettings outboundTemplate = settings.outbound.apply(new OutboundConnectionSettings(endpoint).withEncryption(encryptionOptions))
+                                                                           .withDefaultReserveLimits()
+                                                                           .withSocketFactory(factory)
+                                                                           .withDefaults(ConnectionCategory.MESSAGING);
+            ResourceLimits.EndpointAndGlobal reserveCapacityInBytes = new ResourceLimits.EndpointAndGlobal(new ResourceLimits.Concurrent(outboundTemplate.applicationSendQueueReserveEndpointCapacityInBytes), outboundTemplate.applicationSendQueueReserveGlobalCapacityInBytes);
+            OutboundConnection outbound = new OutboundConnection(settings.type, outboundTemplate, reserveCapacityInBytes);
+            try
+            {
+                logger.info("Running {} {} -> {}", outbound.messagingVersion(), outbound.settings(), inboundSettings);
+                inbound.open().sync();
+
+                CountDownLatch done = new CountDownLatch(1);
+                unsafeSetHandler(Verb._TEST_1,
+                                 () -> (msg) -> done.countDown());
+
+                // Enqueuing outbound message will initiate an outbound
+                // connection with pending data in the pipeline
+                Message<?> message = Message.out(Verb._TEST_1, noPayload);
+                outbound.enqueue(message);
+
+                // Wait until the first connection attempt has taken place
+                // before updating the endpoint messaging version so that the
+                // connection takes place to a 4.0 node.
+                int attempts = 0;
+                final long waitForAttemptMillis = TimeUnit.SECONDS.toMillis(15);
+                while (outbound.connectionAttempts() == 0 && attempts < waitForAttemptMillis / 10)
+                {
+                    Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+                    attempts++;
+                }
+
+                // Now that the connection is being attempted, set the endpoint version so
+                // that on the reconnect attempt the messaging version is rechecked and the
+                // legacy ssl logic picks the storage port instead.  This should trigger a
+                // TRACE level log message "Endpoint version changed from 12 to 10 since
+                // connection initialized, updating."
+                outbound.settings().endpointToVersion.set(endpoint, VERSION_30);
+
+                // The connection should have successfully connected and delivered the _TEST_1
+                // message within the timout.
+                Assert.assertTrue(done.await(15, SECONDS));
+                Assert.assertTrue(outbound.isConnected());
+                Assert.assertTrue(String.format("expect less successful connections (%d) than attempts (%d)",
+                                                outbound.successfulConnections(), outbound.connectionAttempts()),
+                                  outbound.successfulConnections() < outbound.connectionAttempts());
+
+            }
+            finally
+            {
+                outbound.close(false);
+                inbound.close().get(30L, SECONDS);
+                outbound.close(false).get(30L, SECONDS);
+                resetVerbs();
+                MessagingService.instance().messageHandlers.clear();
+            }
+        }
+        finally
+        {
+            MessagingService.instance().versions.set(FBUtilities.getBroadcastAddressAndPort(),
+                                                     current_version);
+            if (originalStoragePort != null)
+                System.setProperty(storagePortProperty, originalStoragePort);
+            else
+                System.clearProperty(storagePortProperty);
+        }
+    }
+
+    @Test
     public void testCloseIfEndpointDown() throws Throwable
     {
         testManual((settings, inbound, outbound, endpoint) -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org