You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/10/23 08:44:32 UTC

[cassandra] branch cassandra-3.0 updated: Avoid marking shutting down nodes as up after receiving gossip shutdown message

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 1260445  Avoid marking shutting down nodes as up after receiving gossip shutdown message
1260445 is described below

commit 12604456f9a8a38f742fd536af145d6ef0f61677
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Oct 6 15:30:48 2020 +0200

    Avoid marking shutting down nodes as up after receiving gossip shutdown message
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-16094
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/service/EchoVerbHandler.java  |  14 ++-
 .../apache/cassandra/service/StorageService.java   |   9 ++
 .../distributed/test/GossipShutdownTest.java       | 135 +++++++++++++++++++++
 4 files changed, 155 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index af7245a..dfb4c4a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.23:
+ * Avoid marking shutting down nodes as up after receiving gossip shutdown message (CASSANDRA-16094)
  * Check SSTables for latest version before dropping compact storage (CASSANDRA-16063)
  * Handle unexpected columns due to schema races (CASSANDRA-15899)
  * Avoid failing compactions with very large partitions (CASSANDRA-15164)
diff --git a/src/java/org/apache/cassandra/service/EchoVerbHandler.java b/src/java/org/apache/cassandra/service/EchoVerbHandler.java
index 3d3f69e..16a5a70 100644
--- a/src/java/org/apache/cassandra/service/EchoVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/EchoVerbHandler.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service;
  * 
  */
 
-
 import org.apache.cassandra.gms.EchoMessage;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
@@ -35,8 +34,15 @@ public class EchoVerbHandler implements IVerbHandler<EchoMessage>
 
     public void doVerb(MessageIn<EchoMessage> message, int id)
     {
-        MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, EchoMessage.instance, EchoMessage.serializer);
-        logger.trace("Sending a EchoMessage reply {}", message.from);
-        MessagingService.instance().sendReply(echoMessage, id, message.from);
+        if (!StorageService.instance.isShutdown())
+        {
+            logger.trace("Sending a EchoMessage reply {}", message.from);
+            MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.REQUEST_RESPONSE, EchoMessage.instance, EchoMessage.serializer);
+            MessagingService.instance().sendReply(echoMessage, id, message.from);
+        }
+        else
+        {
+            logger.trace("Not sending EchoMessage reply to {} - we are shutdown", message.from);
+        }
     }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 06f0346..1e544b2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -155,6 +155,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return isShutdown;
     }
 
+    /**
+     * for in-jvm dtest use - forces isShutdown to be set to whatever passed in.
+     */
+    @VisibleForTesting
+    public void setIsShutdownUnsafeForTests(boolean isShutdown)
+    {
+        this.isShutdown = isShutdown;
+    }
+
     public Collection<Range<Token>> getLocalRanges(String keyspaceName)
     {
         return getRangesForEndpoint(keyspaceName, FBUtilities.getBroadcastAddress());
diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipShutdownTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipShutdownTest.java
new file mode 100644
index 0000000..9aebc22
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipShutdownTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class GossipShutdownTest extends TestBaseImpl
+{
+    /**
+     * Makes sure that a node that has shutdown doesn't come back as live (without being restarted)
+     */
+    @Test
+    public void shutdownStayDownTest() throws IOException, InterruptedException, ExecutionException
+    {
+        ExecutorService es = Executors.newSingleThreadExecutor();
+        try (Cluster cluster = init(builder().withNodes(2)
+                                             .withConfig(config -> config.with(GOSSIP)
+                                                                         .with(NETWORK))
+                                             .start()))
+        {
+            cluster.schemaChange("create table "+KEYSPACE+".tbl (id int primary key, v int)");
+
+            for (int i = 0; i < 10; i++)
+                cluster.coordinator(1).execute("insert into "+KEYSPACE+".tbl (id, v) values (?,?)", ConsistencyLevel.ALL, i, i);
+
+            SimpleCondition timeToShutdown = new SimpleCondition();
+            SimpleCondition waitForShutdown = new SimpleCondition();
+            AtomicBoolean signalled = new AtomicBoolean(false);
+            Future f = es.submit(() -> {
+                await(timeToShutdown);
+
+                cluster.get(1).runOnInstance(() -> {
+                    Gossiper.instance.register(new EPChanges());
+                });
+
+                cluster.get(2).runOnInstance(() -> {
+                    StorageService.instance.setIsShutdownUnsafeForTests(true);
+                    Gossiper.instance.stop();
+                });
+                waitForShutdown.signalAll();
+            });
+
+            cluster.filters().outbound().from(2).to(1).verbs(MessagingService.Verb.GOSSIP_DIGEST_SYN.ordinal()).messagesMatching((from, to, message) -> true).drop();
+            cluster.filters().outbound().from(2).to(1).verbs(MessagingService.Verb.GOSSIP_DIGEST_ACK.ordinal()).messagesMatching((from, to, message) ->
+                                                                                                         {
+                                                                                                             if (signalled.compareAndSet(false, true))
+                                                                                                             {
+                                                                                                                 timeToShutdown.signalAll();
+                                                                                                                 await(waitForShutdown);
+                                                                                                                 return false;
+                                                                                                             }
+                                                                                                             return true;
+                                                                                                         }).drop();
+
+            Thread.sleep(10000); // wait for gossip to exchange a few messages
+            f.get();
+        }
+        finally
+        {
+            es.shutdown();
+        }
+    }
+
+    private static void await(SimpleCondition sc)
+    {
+        try
+        {
+            sc.await();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static class EPChanges implements IEndpointStateChangeSubscriber, Serializable
+    {
+        private volatile boolean wasDead = false;
+        public void onAlive(InetAddress endpoint, EndpointState state)
+        {
+            if (wasDead)
+                throw new RuntimeException("Node should not go live after it has been dead.");
+        }
+        public void onDead(InetAddress endpoint, EndpointState state)
+        {
+            wasDead = true;
+        }
+
+        public void onRemove(InetAddress endpoint) {}
+        public void onRestart(InetAddress endpoint, EndpointState state) {}
+        public void onJoin(InetAddress endpoint, EndpointState epState) {}
+        public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+        public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+
+    };
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org