You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jk...@apache.org on 2017/03/22 20:59:28 UTC
[03/10] cassandra git commit: Discard in-flight shadow round responses
Discard in-flight shadow round responses
patch by Stefan Podkowinski; reviewed by Joel Knighton and Jason Brown for CASSANDRA-12653
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf0906b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf0906b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf0906b9
Branch: refs/heads/cassandra-3.11
Commit: bf0906b92cf65161d828e31bc46436d427bbb4b8
Parents: 06316df
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Mon Sep 19 13:56:54 2016 +0200
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:08:28 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../gms/GossipDigestAckVerbHandler.java | 26 +++++---
src/java/org/apache/cassandra/gms/Gossiper.java | 62 +++++++++++++++-----
.../apache/cassandra/service/MigrationTask.java | 12 ++--
.../cassandra/service/StorageService.java | 16 +++--
5 files changed, 79 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27dd343..df2421d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.10
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
* Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
* Wrong logger name in AnticompactionTask (CASSANDRA-13343)
* Fix queries updating multiple time the same list (CASSANDRA-13130)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 9f69a94..59060f8 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -51,21 +51,31 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());
- if (epStateMap.size() > 0)
- {
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(epStateMap);
- Gossiper.instance.applyStateLocally(epStateMap);
- }
-
if (Gossiper.instance.isInShadowRound())
{
if (logger.isDebugEnabled())
logger.debug("Finishing shadow round with {}", from);
- Gossiper.instance.finishShadowRound();
+ Gossiper.instance.finishShadowRound(epStateMap);
return; // don't bother doing anything else, we have what we came for
}
+ if (epStateMap.size() > 0)
+ {
+ // Ignore any GossipDigestAck messages that we handle before a regular GossipDigestSyn has been sent.
+ // This will prevent Acks from leaking over from the shadow round that are not actually part of
+ // the regular gossip conversation.
+ if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
+ return;
+ }
+
+ /* Notify the Failure Detector */
+ Gossiper.instance.notifyFailureDetector(epStateMap);
+ Gossiper.instance.applyStateLocally(epStateMap);
+ }
+
/* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
for (GossipDigest gDigest : gDigestList)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 06b14c4..c2eccba 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.utils.Pair;
@@ -86,6 +87,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
public static final Gossiper instance = new Gossiper();
+ // Timestamp to prevent processing any in-flight messages while we have not sent any SYN yet, see CASSANDRA-12653.
+ volatile long firstSynSendAt = 0L;
+
public static final long aVeryLongTime = 259200 * 1000; // 3 days
// Maximimum difference between generation value and local time we are willing to accept about a peer
@@ -125,6 +129,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private volatile boolean inShadowRound = false;
+ // endpoint states as gathered during shadow round
+ private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
+
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
private class GossipTask implements Runnable
@@ -645,6 +652,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
InetAddress to = liveEndpoints.get(index);
if (logger.isTraceEnabled())
logger.trace("Sending a GossipDigestSyn to {} ...", to);
+ if (firstSynSendAt == 0)
+ firstSynSendAt = System.nanoTime();
MessagingService.instance().sendOneWay(message, to);
return seeds.contains(to);
}
@@ -713,11 +722,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
* Check if this endpoint can safely bootstrap into the cluster.
*
* @param endpoint - the endpoint to check
+ * @param epStates - endpoint states in the cluster
* @return true if the endpoint can join the cluster
*/
- public boolean isSafeForBootstrap(InetAddress endpoint)
+ public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
{
- EndpointState epState = endpointStateMap.get(endpoint);
+ EndpointState epState = epStates.get(endpoint);
// if there's no previous state, or the node was previously removed from the cluster, we're good
if (epState == null || isDeadState(epState))
@@ -816,14 +826,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return endpointStateMap.get(ep);
}
- // removes ALL endpoint states; should only be called after shadow gossip
- public void resetEndpointStateMap()
- {
- endpointStateMap.clear();
- unreachableEndpoints.clear();
- liveEndpoints.clear();
- }
-
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@ -831,7 +833,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
public UUID getHostId(InetAddress endpoint)
{
- return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
+ return getHostId(endpoint, endpointStateMap);
+ }
+
+ public UUID getHostId(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
+ {
+ return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
}
EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
@@ -1305,12 +1312,32 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
}
/**
- * Do a single 'shadow' round of gossip, where we do not modify any state
- * Only used when replacing a node, to get and assume its states
+ * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+ * map return value, instead of endpointStateMap.
+ *
+ * Used when preparing to join the ring:
+ * <ul>
+ * <li>when replacing a node, to get and assume its tokens</li>
+ * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+ * </ul>
+ *
+ * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+ * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+ * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+ * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+ * a time.
+ *
+ * @return endpoint states gathered during shadow round or empty map
*/
- public void doShadowRound()
+ public synchronized Map<InetAddress, EndpointState> doShadowRound()
{
buildSeedsList();
+ // it may be that the local address is the only entry in the seed
+ // list in which case, attempting a shadow round is pointless
+ if (seeds.isEmpty())
+ return endpointShadowStateMap;
+
+ endpointShadowStateMap.clear();
// send a completely empty syn
List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@ -1346,6 +1373,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
throw new RuntimeException(wtf);
}
+
+ return ImmutableMap.copyOf(endpointShadowStateMap);
}
private void buildSeedsList()
@@ -1466,10 +1495,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
}
- protected void finishShadowRound()
+ protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
{
if (inShadowRound)
+ {
+ endpointShadowStateMap.putAll(epStateMap);
inShadowRound = false;
+ }
}
protected boolean isInShadowRound()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index df0b767..b065d90 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -48,6 +48,12 @@ class MigrationTask extends WrappedRunnable
public void runMayThrow() throws Exception
{
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+ return;
+ }
+
// There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
// potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
// a higher major.
@@ -57,12 +63,6 @@ class MigrationTask extends WrappedRunnable
return;
}
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- logger.debug("Can't send schema pull request: node {} is down.", endpoint);
- return;
- }
-
MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c2996d7..65f536b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -443,15 +443,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().listen();
// make magic happen
- Gossiper.instance.doShadowRound();
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
// now that we've gossiped at least once, we should be able to find the node we're replacing
- if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+ if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
- replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+ replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
try
{
- VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+ VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
if (tokensVersionedValue == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
@@ -460,7 +460,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
}
- Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
return tokens;
}
catch (IOException e)
@@ -474,8 +473,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.debug("Starting shadow gossip round to check for endpoint collision");
if (!MessagingService.instance().isListening())
MessagingService.instance().listen();
- Gossiper.instance.doShadowRound();
- if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress()))
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+ if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
{
throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
"Use cassandra.replace_address if you want to replace this node.",
@@ -483,7 +482,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
if (useStrictConsistency && !allowSimultaneousMoves())
{
- for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+ for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
{
// ignore local node or empty status
if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
@@ -495,7 +494,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true");
}
}
- Gossiper.instance.resetEndpointStateMap();
}
private boolean allowSimultaneousMoves()