You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jo...@apache.org on 2022/06/30 19:44:50 UTC

[cassandra] branch cassandra-4.0 updated: Disable mock internode messaging during startup/shutdown

This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new 008bf8607c Disable mock internode messaging during startup/shutdown
008bf8607c is described below

commit 008bf8607c55926d0aadb4602a8a854d396c7657
Author: Jon Meredith <jo...@apache.org>
AuthorDate: Wed Jun 29 12:25:08 2022 -0600

    Disable mock internode messaging during startup/shutdown
    
    patch by Jon Meredith; reviewed by David Capwell for CASSANDRA-17636
---
 .../cassandra/distributed/action/GossipHelper.java | 12 ++++-
 .../distributed/impl/AbstractCluster.java          |  2 +
 .../cassandra/distributed/impl/Instance.java       | 61 +++++++++++++++++++++-
 3 files changed, 71 insertions(+), 4 deletions(-)

diff --git a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
index a763b45fcb..f4b5838a86 100644
--- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@ -87,7 +87,7 @@ public class GossipHelper
      */
     public static void unsafeStatusToNormal(IInvokableInstance target, IInstance peer)
     {
-        int messagingVersion = peer.getMessagingVersion();
+        final int messagingVersion = getOrDefaultMessagingVersion(target, peer);
         changeGossipState(target,
                           peer,
                           Arrays.asList(unsafeVersionedValue(target,
@@ -412,7 +412,7 @@ public class GossipHelper
     {
         InetSocketAddress addr = peer.broadcastAddress();
         UUID hostId = peer.config().hostId();
-        int netVersion = peer.getMessagingVersion();
+        final int netVersion = getOrDefaultMessagingVersion(target, peer);
         target.runOnInstance(() -> {
             InetAddressAndPort endpoint = toCassandraInetAddressAndPort(addr);
             StorageService storageService = StorageService.instance;
@@ -438,6 +438,14 @@ public class GossipHelper
         });
     }
 
+    private static int getOrDefaultMessagingVersion(IInvokableInstance target, IInstance peer)
+    {
+        int peerVersion = peer.getMessagingVersion();
+        final int netVersion = peerVersion == 0 ? target.getMessagingVersion() : peerVersion;
+        assert netVersion != 0 : "Unable to determine messaging version for peer {}" + peer.config().num();
+        return netVersion;
+    }
+
     public static void withProperty(String prop, boolean value, Runnable r)
     {
         withProperty(prop, Boolean.toString(value), r);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 240f080c2c..d02a4f76e4 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -278,6 +278,8 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
                 }
                 throw t;
             }
+            // This duplicates work done in Instance startup, but keeping as other Instance implementations
+            // do not, so to permit older releases to be tested, repeat the setup
             updateMessagingVersions();
         }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index b2edb4bff1..97dd45ec77 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -45,6 +45,9 @@ import javax.management.NotificationListener;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import io.netty.util.concurrent.GlobalEventExecutor;
 import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
@@ -144,8 +147,10 @@ import static org.apache.cassandra.net.Verb.BATCH_STORE_REQ;
 
 public class Instance extends IsolatedExecutor implements IInvokableInstance
 {
+    private Logger inInstancelogger; // Defer creation until running in the instance context
     public final IInstanceConfig config;
     private volatile boolean initialized = false;
+    private volatile boolean internodeMessagingStarted = false;
     private final AtomicLong startedAt = new AtomicLong();
 
     // should never be invoked directly, so that it is instantiated on other class loader;
@@ -263,6 +268,12 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     private void registerMockMessaging(ICluster cluster)
     {
         MessagingService.instance().outboundSink.add((message, to) -> {
+            if (!internodeMessagingStarted)
+            {
+                inInstancelogger.debug("Dropping outbound message {} to {} as internode messaging has not been started yet",
+                                       message, to);
+                return false;
+            }
             InetSocketAddress toAddr = fromCassandraInetAddressAndPort(to);
             IInstance toInstance = cluster.get(toAddr);
             if (toInstance != null)
@@ -403,6 +414,12 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     @Override
     public void receiveMessageWithInvokingThread(IMessage message)
     {
+        if (!internodeMessagingStarted)
+        {
+            inInstancelogger.debug("Dropping inbound message {} to {} as internode messaging has not been started yet",
+                                   message, config().broadcastAddress());
+            return;
+        }
         if (message.version() > MessagingService.current_version)
         {
             throw new IllegalStateException(String.format("Node%d received message version %d but current version is %d",
@@ -421,13 +438,20 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
     public int getMessagingVersion()
     {
-        return callsOnInstance(() -> MessagingService.current_version).call();
+        if (DatabaseDescriptor.isDaemonInitialized())
+            return MessagingService.current_version;
+        else
+            return 0;
     }
 
     @Override
     public void setMessagingVersion(InetSocketAddress endpoint, int version)
     {
-        MessagingService.instance().versions.set(toCassandraInetAddressAndPort(endpoint), version);
+        if (DatabaseDescriptor.isDaemonInitialized())
+            MessagingService.instance().versions.set(toCassandraInetAddressAndPort(endpoint), version);
+        else
+            inInstancelogger.warn("Skipped setting messaging version for {} to {} as daemon not initialized yet. Stacktrace attached for debugging.",
+                                  endpoint, version, new RuntimeException());
     }
 
     @Override
@@ -461,6 +485,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         assert startedAt.compareAndSet(0L, System.nanoTime()) : "startedAt uninitialized";
 
         sync(() -> {
+            inInstancelogger = LoggerFactory.getLogger(Instance.class);
             try
             {
                 if (config.has(GOSSIP))
@@ -537,6 +562,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 }
                 registerInboundFilter(cluster);
                 registerOutboundFilter(cluster);
+                if (!config.has(NETWORK))
+                {
+                    propagateMessagingVersions(cluster); // fake messaging needs to know messaging version for filters
+                }
+                internodeMessagingStarted = true;
 
                 JVMStabilityInspector.replaceKiller(new InstanceKiller());
 
@@ -594,6 +624,32 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         initialized = true;
     }
 
+    // Update the messaging versions for all instances
+    // that have initialized their configurations.
+    private static void propagateMessagingVersions(ICluster cluster)
+    {
+        cluster.stream().forEach(reportToObj -> {
+            IInstance reportTo = (IInstance) reportToObj;
+            if (reportTo.isShutdown())
+                return;
+
+            int reportToVersion = reportTo.getMessagingVersion();
+            if (reportToVersion == 0)
+                return;
+
+            cluster.stream().forEach(reportFromObj -> {
+                IInstance reportFrom = (IInstance) reportFromObj;
+                if (reportFrom == reportTo || reportFrom.isShutdown())
+                    return;
+
+                int reportFromVersion = reportFrom.getMessagingVersion();
+                if (reportFromVersion == 0) // has not read configuration yet, no accessing messaging version
+                    return;
+                // TODO: decide if we need to take care of the minversion
+                reportTo.setMessagingVersion(reportFrom.broadcastAddress(), reportFromVersion);
+            });
+        });
+    }
 
     private void mkdirs()
     {
@@ -738,6 +794,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
             error = parallelRun(error, executor, () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES));
 
+            internodeMessagingStarted = false;
             error = parallelRun(error, executor,
                                 CommitLog.instance::shutdownBlocking,
                                 // can only shutdown message once, so if the test shutsdown an instance, then ignore the failure


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org