You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2021/04/21 22:29:27 UTC

[cassandra] branch cassandra-3.11 updated: Ignore stale ack in shadow round

This is an automated email from the ASF dual-hosted git repository.

brandonwilliams pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new f6d1951  Ignore stale ack in shadow round
f6d1951 is described below

commit f6d19512c4d79f800371da1e54dfe01cae5d894e
Author: Brandon Williams <br...@apache.org>
AuthorDate: Wed Apr 14 13:49:21 2021 -0500

    Ignore stale ack in shadow round
    
    Patch by brandonwilliams, samt, and Matt Fleming, reviewed by samt for
    CASSANDRA-16588
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/gms/EndpointState.java    |  5 ++
 src/java/org/apache/cassandra/gms/Gossiper.java    | 16 +++-
 .../apache/cassandra/service/StorageService.java   |  2 +-
 .../org/apache/cassandra/gms/ShadowRoundTest.java  | 94 ++++++++++++++++++++++
 5 files changed, 116 insertions(+), 2 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d5ad726..443c2bf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.11
+ * Ignore stale acks received in the shadow round (CASSANDRA-16588)
  * Add autocomplete and error messages for provide_overlapping_tombstones (CASSANDRA-16350)
  * Add StorageServiceMBean.getKeyspaceReplicationInfo(keyspaceName) (CASSANDRA-16447)
  * Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462)
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 674b597..b587635 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -83,6 +83,11 @@ public class EndpointState
         return applicationState.get().get(key);
     }
 
+    public boolean containsApplicationState(ApplicationState key)
+    {
+        return applicationState.get().containsKey(key);
+    }
+
     public Set<Map.Entry<ApplicationState, VersionedValue>> states()
     {
         return applicationState.get().entrySet();
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 69f7fee..6bc25b6 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.gms;
 
-import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
@@ -1701,12 +1700,27 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return anyNodeOn30;
     }
 
+    public boolean sufficientForStartupSafetyCheck(Map<InetAddress, EndpointState> epStateMap)
+    {
+        // it is possible for a previously queued ack to be sent to us when we come back up in shadow
+        EndpointState localState = epStateMap.get(FBUtilities.getBroadcastAddress());
+        // return false if response doesn't contain state necessary for safety check
+        return localState == null || isDeadState(localState) || localState.containsApplicationState(ApplicationState.HOST_ID);
+    }
+
     protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
     {
         if (inShadowRound)
         {
             if (!isInShadowRound)
             {
+                if (!sufficientForStartupSafetyCheck(epStateMap))
+                {
+                    logger.debug("Not exiting shadow round because received ACK with insufficient states {} -> {}",
+                                 FBUtilities.getBroadcastAddress(), epStateMap.get(FBUtilities.getBroadcastAddress()));
+                    return;
+                }
+
                 if (!seeds.contains(respondent))
                     logger.warn("Received an ack from {}, who isn't a seed. Ensure your seed list includes a live node. Exiting shadow round",
                                 respondent);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index da3a4a8..89af682 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -616,7 +616,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return localHostId;
     }
 
-    private synchronized void checkForEndpointCollision(UUID localHostId, Set<InetAddress> peers) throws ConfigurationException
+    public synchronized void checkForEndpointCollision(UUID localHostId, Set<InetAddress> peers) throws ConfigurationException
     {
         if (Boolean.getBoolean("cassandra.allow_unsafe_join"))
         {
diff --git a/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java b/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
index bc18813..a7368f4 100644
--- a/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
+++ b/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
@@ -19,17 +19,26 @@
 
 package org.apache.cassandra.gms;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.cassandra.dht.IPartitioner;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.PropertyFileSnitch;
@@ -38,6 +47,7 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MockMessagingService;
 import org.apache.cassandra.net.MockMessagingSpy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.net.MockMessagingService.verb;
 import static org.junit.Assert.assertEquals;
@@ -113,4 +123,88 @@ public class ShadowRoundTest
         assertEquals(0, spyAck2.messagesIntercepted());
         assertEquals(0, spyMigrationReq.messagesIntercepted());
     }
+
+    @Test
+    public void testBadAckInShadow()
+    {
+        final AtomicBoolean ackSend = new AtomicBoolean(false);
+        MockMessagingSpy spySyn = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_SYN))
+                .respondN((msgOut, to) ->
+                {
+                    // ACK with bad data in shadow round
+                    if (!ackSend.compareAndSet(false, true))
+                    {
+                        while (!Gossiper.instance.isEnabled()) ;
+                    }
+                    InetAddress junkaddr;
+                    try
+                    {
+                        junkaddr = InetAddress.getByName("1.1.1.1");
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+
+                    HeartBeatState hb = new HeartBeatState(123, 456);
+                    EndpointState state = new EndpointState(hb);
+                    List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+                    gDigests.add(new GossipDigest(FBUtilities.getBroadcastAddress(), hb.getGeneration(), hb.getHeartBeatVersion()));
+                    gDigests.add(new GossipDigest(junkaddr, hb.getGeneration(), hb.getHeartBeatVersion()));
+                    Map<InetAddress, EndpointState> smap = new HashMap<InetAddress, EndpointState>()
+                    {
+                        {
+                            put(FBUtilities.getBroadcastAddress(), state);
+                            put(junkaddr, state);
+                        }
+                    };
+                    GossipDigestAck payload = new GossipDigestAck(gDigests, smap);
+
+                    logger.debug("Simulating bad digest ACK reply");
+                    return MessageIn.create(to, payload, Collections.emptyMap(), MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.current_version);
+                }, 1);
+
+        System.setProperty(Config.PROPERTY_PREFIX + "auto_bootstrap", "false");
+        try
+        {
+            StorageService.instance.checkForEndpointCollision(SystemKeyspace.getLocalHostId(), SystemKeyspace.loadHostIds().keySet());
+        }
+        catch (Exception e)
+        {
+            assertEquals("Unable to gossip with any peers", e.getMessage());
+        }
+        System.clearProperty(Config.PROPERTY_PREFIX + "auto_bootstrap");
+    }
+
+    @Test
+    public void testPreviouslyAssassinatedInShadow()
+    {
+        final AtomicBoolean ackSend = new AtomicBoolean(false);
+        MockMessagingSpy spySyn = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_SYN))
+                .respondN((msgOut, to) ->
+                {
+                    // ACK with self assassinated in shadow round
+                    if (!ackSend.compareAndSet(false, true))
+                    {
+                        while (!Gossiper.instance.isEnabled()) ;
+                    }
+                    HeartBeatState hb = new HeartBeatState(123, 456);
+                    EndpointState state = new EndpointState(hb);
+                    state.addApplicationState(ApplicationState.STATUS,
+                            new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()).left(
+                                    Collections.singletonList(DatabaseDescriptor.getPartitioner().getRandomToken()), 1L));
+                    GossipDigestAck payload = new GossipDigestAck(
+                        Collections.singletonList(new GossipDigest(FBUtilities.getBroadcastAddress(), hb.getGeneration(), hb.getHeartBeatVersion())),
+                        Collections.singletonMap(FBUtilities.getBroadcastAddress(), state));
+
+                    logger.debug("Simulating bad digest ACK reply");
+                    return MessageIn.create(to, payload, Collections.emptyMap(), MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.current_version);
+                }, 1);
+
+
+        System.setProperty(Config.PROPERTY_PREFIX + "auto_bootstrap", "false");
+        StorageService.instance.checkForEndpointCollision(SystemKeyspace.getLocalHostId(), SystemKeyspace.loadHostIds().keySet());
+        System.clearProperty(Config.PROPERTY_PREFIX + "auto_bootstrap");
+    }
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org