You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jo...@apache.org on 2022/06/30 19:44:50 UTC
[cassandra] branch cassandra-4.0 updated: Disable mock internode messaging during startup/shutdown
This is an automated email from the ASF dual-hosted git repository.
jonmeredith pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new 008bf8607c Disable mock internode messaging during startup/shutdown
008bf8607c is described below
commit 008bf8607c55926d0aadb4602a8a854d396c7657
Author: Jon Meredith <jo...@apache.org>
AuthorDate: Wed Jun 29 12:25:08 2022 -0600
Disable mock internode messaging during startup/shutdown
patch by Jon Meredith; reviewed by David Capwell for CASSANDRA-17636
---
.../cassandra/distributed/action/GossipHelper.java | 12 ++++-
.../distributed/impl/AbstractCluster.java | 2 +
.../cassandra/distributed/impl/Instance.java | 61 +++++++++++++++++++++-
3 files changed, 71 insertions(+), 4 deletions(-)
diff --git a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
index a763b45fcb..f4b5838a86 100644
--- a/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
+++ b/test/distributed/org/apache/cassandra/distributed/action/GossipHelper.java
@@ -87,7 +87,7 @@ public class GossipHelper
*/
public static void unsafeStatusToNormal(IInvokableInstance target, IInstance peer)
{
- int messagingVersion = peer.getMessagingVersion();
+ final int messagingVersion = getOrDefaultMessagingVersion(target, peer);
changeGossipState(target,
peer,
Arrays.asList(unsafeVersionedValue(target,
@@ -412,7 +412,7 @@ public class GossipHelper
{
InetSocketAddress addr = peer.broadcastAddress();
UUID hostId = peer.config().hostId();
- int netVersion = peer.getMessagingVersion();
+ final int netVersion = getOrDefaultMessagingVersion(target, peer);
target.runOnInstance(() -> {
InetAddressAndPort endpoint = toCassandraInetAddressAndPort(addr);
StorageService storageService = StorageService.instance;
@@ -438,6 +438,14 @@ public class GossipHelper
});
}
+ private static int getOrDefaultMessagingVersion(IInvokableInstance target, IInstance peer)
+ { // Resolves the messaging version to use for peer, falling back to target's own version
+ int peerVersion = peer.getMessagingVersion(); // 0 is treated as "peer cannot report a version yet"
+ final int netVersion = peerVersion == 0 ? target.getMessagingVersion() : peerVersion;
+ assert netVersion != 0 : "Unable to determine messaging version for peer " + peer.config().num(); // plain concatenation: assert messages do not expand {} placeholders
+ return netVersion;
+ }
+
public static void withProperty(String prop, boolean value, Runnable r)
{
withProperty(prop, Boolean.toString(value), r);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 240f080c2c..d02a4f76e4 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -278,6 +278,8 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I
}
throw t;
}
+ // This duplicates work done during Instance startup, but other Instance implementations do not
+ // perform it themselves, so repeat the setup here to permit older releases to be tested
updateMessagingVersions();
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index b2edb4bff1..97dd45ec77 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -45,6 +45,9 @@ import javax.management.NotificationListener;
import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
@@ -144,8 +147,10 @@ import static org.apache.cassandra.net.Verb.BATCH_STORE_REQ;
public class Instance extends IsolatedExecutor implements IInvokableInstance
{
+ private Logger inInstancelogger; // Defer creation until running in the instance context
public final IInstanceConfig config;
private volatile boolean initialized = false;
+ private volatile boolean internodeMessagingStarted = false;
private final AtomicLong startedAt = new AtomicLong();
// should never be invoked directly, so that it is instantiated on other class loader;
@@ -263,6 +268,12 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
private void registerMockMessaging(ICluster cluster)
{
MessagingService.instance().outboundSink.add((message, to) -> {
+ if (!internodeMessagingStarted)
+ {
+ inInstancelogger.debug("Dropping outbound message {} to {} as internode messaging has not been started yet",
+ message, to);
+ return false;
+ }
InetSocketAddress toAddr = fromCassandraInetAddressAndPort(to);
IInstance toInstance = cluster.get(toAddr);
if (toInstance != null)
@@ -403,6 +414,12 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
@Override
public void receiveMessageWithInvokingThread(IMessage message)
{
+ if (!internodeMessagingStarted)
+ {
+ inInstancelogger.debug("Dropping inbound message {} to {} as internode messaging has not been started yet",
+ message, config().broadcastAddress());
+ return;
+ }
if (message.version() > MessagingService.current_version)
{
throw new IllegalStateException(String.format("Node%d received message version %d but current version is %d",
@@ -421,13 +438,20 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
public int getMessagingVersion()
{
- return callsOnInstance(() -> MessagingService.current_version).call();
+ if (DatabaseDescriptor.isDaemonInitialized())
+ return MessagingService.current_version;
+ else
+ return 0;
}
@Override
public void setMessagingVersion(InetSocketAddress endpoint, int version)
{
- MessagingService.instance().versions.set(toCassandraInetAddressAndPort(endpoint), version);
+ if (DatabaseDescriptor.isDaemonInitialized())
+ MessagingService.instance().versions.set(toCassandraInetAddressAndPort(endpoint), version);
+ else
+ inInstancelogger.warn("Skipped setting messaging version for {} to {} as daemon not initialized yet. Stacktrace attached for debugging.",
+ endpoint, version, new RuntimeException());
}
@Override
@@ -461,6 +485,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
assert startedAt.compareAndSet(0L, System.nanoTime()) : "startedAt uninitialized";
sync(() -> {
+ inInstancelogger = LoggerFactory.getLogger(Instance.class);
try
{
if (config.has(GOSSIP))
@@ -537,6 +562,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}
registerInboundFilter(cluster);
registerOutboundFilter(cluster);
+ if (!config.has(NETWORK))
+ {
+ propagateMessagingVersions(cluster); // fake messaging needs to know messaging version for filters
+ }
+ internodeMessagingStarted = true;
JVMStabilityInspector.replaceKiller(new InstanceKiller());
@@ -594,6 +624,32 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
initialized = true;
}
+ // For every running instance that reports a non-zero messaging version, record on it the
+ // messaging version of every other running instance that also reports a non-zero version.
+ private static void propagateMessagingVersions(ICluster cluster)
+ {
+ cluster.stream().forEach(reportToObj -> {
+ IInstance reportTo = (IInstance) reportToObj; // cluster is a raw ICluster, so elements need a cast
+ if (reportTo.isShutdown())
+ return;
+
+ int reportToVersion = reportTo.getMessagingVersion();
+ if (reportToVersion == 0) // receiver reports no version yet, so nothing is recorded on it
+ return;
+
+ cluster.stream().forEach(reportFromObj -> {
+ IInstance reportFrom = (IInstance) reportFromObj;
+ if (reportFrom == reportTo || reportFrom.isShutdown()) // skip self and stopped instances
+ return;
+
+ int reportFromVersion = reportFrom.getMessagingVersion();
+ if (reportFromVersion == 0) // peer has not read its configuration yet, so it has no messaging version to report
+ return;
+ // TODO: decide if we need to take care of the minversion
+ reportTo.setMessagingVersion(reportFrom.broadcastAddress(), reportFromVersion);
+ });
+ });
+ }
private void mkdirs()
{
@@ -738,6 +794,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
error = parallelRun(error, executor, () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES));
+ internodeMessagingStarted = false;
error = parallelRun(error, executor,
CommitLog.instance::shutdownBlocking,
// can only shutdown message once, so if the test shutsdown an instance, then ignore the failure
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org