You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2021/04/21 22:29:27 UTC
[cassandra] branch cassandra-3.11 updated: Ignore stale ack in shadow round
This is an automated email from the ASF dual-hosted git repository.
brandonwilliams pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new f6d1951 Ignore stale ack in shadow round
f6d1951 is described below
commit f6d19512c4d79f800371da1e54dfe01cae5d894e
Author: Brandon Williams <br...@apache.org>
AuthorDate: Wed Apr 14 13:49:21 2021 -0500
Ignore stale ack in shadow round
Patch by brandonwilliams, samt, and Matt Fleming, reviewed by samt for
CASSANDRA-16588
---
CHANGES.txt | 1 +
.../org/apache/cassandra/gms/EndpointState.java | 5 ++
src/java/org/apache/cassandra/gms/Gossiper.java | 16 +++-
.../apache/cassandra/service/StorageService.java | 2 +-
.../org/apache/cassandra/gms/ShadowRoundTest.java | 94 ++++++++++++++++++++++
5 files changed, 116 insertions(+), 2 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d5ad726..443c2bf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.11
+ * Ignore stale acks received in the shadow round (CASSANDRA-16588)
* Add autocomplete and error messages for provide_overlapping_tombstones (CASSANDRA-16350)
* Add StorageServiceMBean.getKeyspaceReplicationInfo(keyspaceName) (CASSANDRA-16447)
* Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462)
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 674b597..b587635 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -83,6 +83,11 @@ public class EndpointState
return applicationState.get().get(key);
}
+    /**
+     * Reports whether this endpoint state currently holds a value for the given application state.
+     *
+     * @param key the application state to look for
+     * @return {@code true} if the current application-state map has an entry for {@code key}
+     */
+    public boolean containsApplicationState(ApplicationState key)
+    {
+        Map<ApplicationState, VersionedValue> current = applicationState.get();
+        return current.containsKey(key);
+    }
+
public Set<Map.Entry<ApplicationState, VersionedValue>> states()
{
return applicationState.get().entrySet();
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 69f7fee..6bc25b6 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.gms;
-import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
@@ -1701,12 +1700,27 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return anyNodeOn30;
}
+    /**
+     * Decides whether the endpoint states carried by an ACK are good enough to run the startup
+     * safety check against. A previously queued (stale) ACK can reach us when we come back up in
+     * the shadow round, so the state it reports for this node has to be inspected first.
+     *
+     * @param epStateMap endpoint states received in the ACK, keyed by endpoint address
+     * @return {@code true} if the map has no entry for our broadcast address, if that entry is in
+     *         a dead state, or if it carries {@link ApplicationState#HOST_ID}; {@code false} otherwise
+     */
+    public boolean sufficientForStartupSafetyCheck(Map<InetAddress, EndpointState> epStateMap)
+    {
+        EndpointState ownState = epStateMap.get(FBUtilities.getBroadcastAddress());
+
+        // the response says nothing about this node
+        if (ownState == null)
+            return true;
+
+        // the response reports this node in a dead state
+        if (isDeadState(ownState))
+            return true;
+
+        // otherwise the response must contain the state necessary for the safety check
+        return ownState.containsApplicationState(ApplicationState.HOST_ID);
+    }
+
protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
{
if (inShadowRound)
{
if (!isInShadowRound)
{
+ if (!sufficientForStartupSafetyCheck(epStateMap))
+ {
+ logger.debug("Not exiting shadow round because received ACK with insufficient states {} -> {}",
+ FBUtilities.getBroadcastAddress(), epStateMap.get(FBUtilities.getBroadcastAddress()));
+ return;
+ }
+
if (!seeds.contains(respondent))
logger.warn("Received an ack from {}, who isn't a seed. Ensure your seed list includes a live node. Exiting shadow round",
respondent);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index da3a4a8..89af682 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -616,7 +616,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return localHostId;
}
- private synchronized void checkForEndpointCollision(UUID localHostId, Set<InetAddress> peers) throws ConfigurationException
+ public synchronized void checkForEndpointCollision(UUID localHostId, Set<InetAddress> peers) throws ConfigurationException
{
if (Boolean.getBoolean("cassandra.allow_unsafe_join"))
{
diff --git a/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java b/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
index bc18813..a7368f4 100644
--- a/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
+++ b/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
@@ -19,17 +19,26 @@
package org.apache.cassandra.gms;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.cassandra.dht.IPartitioner;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.PropertyFileSnitch;
@@ -38,6 +47,7 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MockMessagingService;
import org.apache.cassandra.net.MockMessagingSpy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.net.MockMessagingService.verb;
import static org.junit.Assert.assertEquals;
@@ -113,4 +123,88 @@ public class ShadowRoundTest
assertEquals(0, spyAck2.messagesIntercepted());
assertEquals(0, spyMigrationReq.messagesIntercepted());
}
+
+    /**
+     * Replies to the shadow-round SYN with an ACK whose endpoint states contain only a heartbeat
+     * (no application states) for both the local broadcast address and an unrelated address, then
+     * runs the endpoint collision check. If the check throws, the message must be
+     * "Unable to gossip with any peers".
+     */
+    @Test
+    public void testBadAckInShadow()
+    {
+        final AtomicBoolean ackSend = new AtomicBoolean(false);
+        MockMessagingSpy spySyn = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_SYN))
+                .respondN((msgOut, to) ->
+                {
+                    // ACK with bad data in shadow round
+                    if (!ackSend.compareAndSet(false, true))
+                    {
+                        while (!Gossiper.instance.isEnabled()) ;
+                    }
+                    InetAddress junkaddr;
+                    try
+                    {
+                        junkaddr = InetAddress.getByName("1.1.1.1");
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+
+                    HeartBeatState hb = new HeartBeatState(123, 456);
+                    EndpointState state = new EndpointState(hb);
+                    List<GossipDigest> gDigests = new ArrayList<>();
+                    gDigests.add(new GossipDigest(FBUtilities.getBroadcastAddress(), hb.getGeneration(), hb.getHeartBeatVersion()));
+                    gDigests.add(new GossipDigest(junkaddr, hb.getGeneration(), hb.getHeartBeatVersion()));
+                    // plain HashMap: avoids the anonymous subclass created by double-brace initialisation
+                    Map<InetAddress, EndpointState> smap = new HashMap<>();
+                    smap.put(FBUtilities.getBroadcastAddress(), state);
+                    smap.put(junkaddr, state);
+                    GossipDigestAck payload = new GossipDigestAck(gDigests, smap);
+
+                    logger.debug("Simulating bad digest ACK reply");
+                    return MessageIn.create(to, payload, Collections.emptyMap(), MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.current_version);
+                }, 1);
+
+        System.setProperty(Config.PROPERTY_PREFIX + "auto_bootstrap", "false");
+        try
+        {
+            StorageService.instance.checkForEndpointCollision(SystemKeyspace.getLocalHostId(), SystemKeyspace.loadHostIds().keySet());
+        }
+        catch (Exception e)
+        {
+            assertEquals("Unable to gossip with any peers", e.getMessage());
+        }
+        finally
+        {
+            // always restore the property, even if the assertion above fails, so it cannot leak into other tests
+            System.clearProperty(Config.PROPERTY_PREFIX + "auto_bootstrap");
+        }
+    }
+
+    /**
+     * Replies to the shadow-round SYN with an ACK that reports the local broadcast address with a
+     * STATUS built by {@code VersionedValueFactory.left(...)}, then runs the endpoint collision
+     * check. The test passes if that check completes without throwing.
+     */
+    @Test
+    public void testPreviouslyAssassinatedInShadow()
+    {
+        final AtomicBoolean ackSend = new AtomicBoolean(false);
+        MockMessagingSpy spySyn = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_SYN))
+                .respondN((msgOut, to) ->
+                {
+                    // ACK with self assassinated in shadow round
+                    if (!ackSend.compareAndSet(false, true))
+                    {
+                        while (!Gossiper.instance.isEnabled()) ;
+                    }
+                    HeartBeatState hb = new HeartBeatState(123, 456);
+                    EndpointState state = new EndpointState(hb);
+                    state.addApplicationState(ApplicationState.STATUS,
+                                              new VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()).left(
+                                                  Collections.singletonList(DatabaseDescriptor.getPartitioner().getRandomToken()), 1L));
+                    GossipDigestAck payload = new GossipDigestAck(
+                        Collections.singletonList(new GossipDigest(FBUtilities.getBroadcastAddress(), hb.getGeneration(), hb.getHeartBeatVersion())),
+                        Collections.singletonMap(FBUtilities.getBroadcastAddress(), state));
+
+                    logger.debug("Simulating bad digest ACK reply");
+                    return MessageIn.create(to, payload, Collections.emptyMap(), MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.current_version);
+                }, 1);
+
+        System.setProperty(Config.PROPERTY_PREFIX + "auto_bootstrap", "false");
+        try
+        {
+            StorageService.instance.checkForEndpointCollision(SystemKeyspace.getLocalHostId(), SystemKeyspace.loadHostIds().keySet());
+        }
+        finally
+        {
+            // restore the property even when the collision check throws, so it cannot leak into other tests
+            System.clearProperty(Config.PROPERTY_PREFIX + "auto_bootstrap");
+        }
+    }
+
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org