You are viewing a plain-text version of this content; the link to its canonical version was lost in the plain-text conversion.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/10/23 08:44:32 UTC
[cassandra] branch cassandra-3.0 updated: Avoid marking shutting down nodes as up after receiving gossip shutdown message
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 1260445 Avoid marking shutting down nodes as up after receiving gossip shutdown message
1260445 is described below
commit 12604456f9a8a38f742fd536af145d6ef0f61677
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Oct 6 15:30:48 2020 +0200
Avoid marking shutting down nodes as up after receiving gossip shutdown message
Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-16094
---
CHANGES.txt | 1 +
.../apache/cassandra/service/EchoVerbHandler.java | 14 ++-
.../apache/cassandra/service/StorageService.java | 9 ++
.../distributed/test/GossipShutdownTest.java | 135 +++++++++++++++++++++
4 files changed, 155 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index af7245a..dfb4c4a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.23:
+ * Avoid marking shutting down nodes as up after receiving gossip shutdown message (CASSANDRA-16094)
* Check SSTables for latest version before dropping compact storage (CASSANDRA-16063)
* Handle unexpected columns due to schema races (CASSANDRA-15899)
* Avoid failing compactions with very large partitions (CASSANDRA-15164)
diff --git a/src/java/org/apache/cassandra/service/EchoVerbHandler.java b/src/java/org/apache/cassandra/service/EchoVerbHandler.java
index 3d3f69e..16a5a70 100644
--- a/src/java/org/apache/cassandra/service/EchoVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/EchoVerbHandler.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service;
*
*/
-
import org.apache.cassandra.gms.EchoMessage;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.MessageIn;
@@ -35,8 +34,15 @@ public class EchoVerbHandler implements IVerbHandler<EchoMessage>
public void doVerb(MessageIn<EchoMessage> message, int id)
{
- MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, EchoMessage.instance, EchoMessage.serializer);
- logger.trace("Sending a EchoMessage reply {}", message.from);
- MessagingService.instance().sendReply(echoMessage, id, message.from);
+ // Answer echo requests only while this node is not shut down (CASSANDRA-16094).
+ // NOTE(review): presumably the requesting peer treats an echo reply as proof of liveness,
+ // so replying after shutdown could make it mark this node as up again even though the
+ // node has already announced its shutdown via gossip; confirm against Gossiper.
+ if (!StorageService.instance.isShutdown())
+ {
+ logger.trace("Sending a EchoMessage reply {}", message.from);
+ MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, EchoMessage.instance, EchoMessage.serializer);
+ MessagingService.instance().sendReply(echoMessage, id, message.from);
+ }
+ else
+ {
+ // Shut down: deliberately send no reply at all; only trace-log the dropped request.
+ logger.trace("Not sending EchoMessage reply to {} - we are shutdown", message.from);
+ }
}
}
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 06f0346..1e544b2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -155,6 +155,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return isShutdown;
}
+ /**
+ * Forcibly overwrites the flag reported by {@link #isShutdown()}; for in-jvm dtest use only.
+ * <p>
+ * This performs no shutdown work whatsoever: it only changes the flag, so code that consults
+ * {@link #isShutdown()} (for example echo request handling) behaves as if the node were shut down.
+ *
+ * @param isShutdown the value to store in the shutdown flag
+ */
+ @VisibleForTesting
+ public void setIsShutdownUnsafeForTests(boolean isShutdown)
+ {
+ this.isShutdown = isShutdown;
+ }
+
public Collection<Range<Token>> getLocalRanges(String keyspaceName)
{
return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddress());
diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipShutdownTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipShutdownTest.java
new file mode 100644
index 0000000..9aebc22
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipShutdownTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class GossipShutdownTest extends TestBaseImpl
+{
+ /**
+ * Makes sure that a node that has shutdown doesn't come back as live (without being restarted)
+ */
+ @Test
+ public void shutdownStayDownTest() throws IOException, InterruptedException, ExecutionException
+ {
+ ExecutorService es = Executors.newSingleThreadExecutor();
+ try (Cluster cluster = init(builder().withNodes(2)
+ .withConfig(config -> config.with(GOSSIP)
+ .with(NETWORK))
+ .start()))
+ {
+ cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, v int)");
+
+ for (int i = 0; i < 10; i++)
+ cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, v) values (?,?)", ConsistencyLevel.ALL, i, i);
+
+ SimpleCondition timeToShutdown = new SimpleCondition();
+ SimpleCondition waitForShutdown = new SimpleCondition();
+ AtomicBoolean signalled = new AtomicBoolean(false);
+ Future f = es.submit(() -> {
+ await(timeToShutdown);
+
+ cluster.get(1).runOnInstance(() -> {
+ Gossiper.instance.register(new EPChanges());
+ });
+
+ cluster.get(2).runOnInstance(() -> {
+ StorageService.instance.setIsShutdownUnsafeForTests(true);
+ Gossiper.instance.stop();
+ });
+ waitForShutdown.signalAll();
+ });
+
+ cluster.filters().outbound().from(2).to(1).verbs(MessagingService.Verb.GOSSIP_DIGEST_SYN.ordinal()).messagesMatching((from, to, message) -> true).drop();
+ cluster.filters().outbound().from(2).to(1).verbs(MessagingService.Verb.GOSSIP_DIGEST_ACK.ordinal()).messagesMatching((from, to, message) ->
+ {
+ if (signalled.compareAndSet(false, true))
+ {
+ timeToShutdown.signalAll();
+ await(waitForShutdown);
+ return false;
+ }
+ return true;
+ }).drop();
+
+ Thread.sleep(10000); // wait for gossip to exchange a few messages
+ f.get();
+ }
+ finally
+ {
+ es.shutdown();
+ }
+ }
+
+ private static void await(SimpleCondition sc)
+ {
+ try
+ {
+ sc.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static class EPChanges implements IEndpointStateChangeSubscriber, Serializable
+ {
+ private volatile boolean wasDead = false;
+ public void onAlive(InetAddress endpoint, EndpointState state)
+ {
+ if (wasDead)
+ throw new RuntimeException("Node should not go live after it has been dead.");
+ }
+ public void onDead(InetAddress endpoint, EndpointState state)
+ {
+ wasDead = true;
+ }
+
+ public void onRemove(InetAddress endpoint) {}
+ public void onRestart(InetAddress endpoint, EndpointState state) {}
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+
+ };
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org