You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jk...@apache.org on 2017/03/22 20:59:26 UTC
[01/10] cassandra git commit: Discard in-flight shadow round responses
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 06316df54 -> bf0906b92
refs/heads/cassandra-3.0 f4ba9083e -> 2836a644a
refs/heads/cassandra-3.11 5484bd1ac -> ec9ce3dfb
refs/heads/trunk f5e0a7cdb -> 8b74ae4b6
Discard in-flight shadow round responses
patch by Stefan Podkowinski; reviewed by Joel Knighton and Jason Brown for CASSANDRA-12653
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf0906b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf0906b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf0906b9
Branch: refs/heads/cassandra-2.2
Commit: bf0906b92cf65161d828e31bc46436d427bbb4b8
Parents: 06316df
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Mon Sep 19 13:56:54 2016 +0200
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:08:28 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../gms/GossipDigestAckVerbHandler.java | 26 +++++---
src/java/org/apache/cassandra/gms/Gossiper.java | 62 +++++++++++++++-----
.../apache/cassandra/service/MigrationTask.java | 12 ++--
.../cassandra/service/StorageService.java | 16 +++--
5 files changed, 79 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27dd343..df2421d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.10
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
* Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
* Wrong logger name in AnticompactionTask (CASSANDRA-13343)
* Fix queries updating multiple time the same list (CASSANDRA-13130)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 9f69a94..59060f8 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -51,21 +51,31 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());
- if (epStateMap.size() > 0)
- {
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(epStateMap);
- Gossiper.instance.applyStateLocally(epStateMap);
- }
-
if (Gossiper.instance.isInShadowRound())
{
if (logger.isDebugEnabled())
logger.debug("Finishing shadow round with {}", from);
- Gossiper.instance.finishShadowRound();
+ Gossiper.instance.finishShadowRound(epStateMap);
return; // don't bother doing anything else, we have what we came for
}
+ if (epStateMap.size() > 0)
+ {
+ // Ignore any GossipDigestAck messages that we handle before a regular GossipDigestSyn has been sent.
+ // This will prevent Acks from leaking over from the shadow round that are not actually part of
+ // the regular gossip conversation.
+ if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
+ return;
+ }
+
+ /* Notify the Failure Detector */
+ Gossiper.instance.notifyFailureDetector(epStateMap);
+ Gossiper.instance.applyStateLocally(epStateMap);
+ }
+
/* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
for (GossipDigest gDigest : gDigestList)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 06b14c4..c2eccba 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.utils.Pair;
@@ -86,6 +87,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
public static final Gossiper instance = new Gossiper();
+ // Timestamp used to avoid processing any in-flight messages while we have not yet sent any SYN, see CASSANDRA-12653.
+ volatile long firstSynSendAt = 0L;
+
public static final long aVeryLongTime = 259200 * 1000; // 3 days
// Maximimum difference between generation value and local time we are willing to accept about a peer
@@ -125,6 +129,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private volatile boolean inShadowRound = false;
+ // endpoint states as gathered during shadow round
+ private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
+
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
private class GossipTask implements Runnable
@@ -645,6 +652,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
InetAddress to = liveEndpoints.get(index);
if (logger.isTraceEnabled())
logger.trace("Sending a GossipDigestSyn to {} ...", to);
+ if (firstSynSendAt == 0)
+ firstSynSendAt = System.nanoTime();
MessagingService.instance().sendOneWay(message, to);
return seeds.contains(to);
}
@@ -713,11 +722,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
* Check if this endpoint can safely bootstrap into the cluster.
*
* @param endpoint - the endpoint to check
+ * @param epStates - endpoint states in the cluster
* @return true if the endpoint can join the cluster
*/
- public boolean isSafeForBootstrap(InetAddress endpoint)
+ public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
{
- EndpointState epState = endpointStateMap.get(endpoint);
+ EndpointState epState = epStates.get(endpoint);
// if there's no previous state, or the node was previously removed from the cluster, we're good
if (epState == null || isDeadState(epState))
@@ -816,14 +826,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return endpointStateMap.get(ep);
}
- // removes ALL endpoint states; should only be called after shadow gossip
- public void resetEndpointStateMap()
- {
- endpointStateMap.clear();
- unreachableEndpoints.clear();
- liveEndpoints.clear();
- }
-
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@ -831,7 +833,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
public UUID getHostId(InetAddress endpoint)
{
- return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
+ return getHostId(endpoint, endpointStateMap);
+ }
+
+ public UUID getHostId(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
+ {
+ return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
}
EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
@@ -1305,12 +1312,32 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
}
/**
- * Do a single 'shadow' round of gossip, where we do not modify any state
- * Only used when replacing a node, to get and assume its states
+ * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+ * map return value, instead of endpointStateMap.
+ *
+ * Used when preparing to join the ring:
+ * <ul>
+ * <li>when replacing a node, to get and assume its tokens</li>
+ * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+ * </ul>
+ *
+ * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+ * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+ * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+ * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+ * the same time.
+ *
+ * @return endpoint states gathered during shadow round or empty map
*/
- public void doShadowRound()
+ public synchronized Map<InetAddress, EndpointState> doShadowRound()
{
buildSeedsList();
+ // it may be that the local address is the only entry in the seed
+ // list in which case, attempting a shadow round is pointless
+ if (seeds.isEmpty())
+ return endpointShadowStateMap;
+
+ endpointShadowStateMap.clear();
// send a completely empty syn
List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@ -1346,6 +1373,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
throw new RuntimeException(wtf);
}
+
+ return ImmutableMap.copyOf(endpointShadowStateMap);
}
private void buildSeedsList()
@@ -1466,10 +1495,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
}
- protected void finishShadowRound()
+ protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
{
if (inShadowRound)
+ {
+ endpointShadowStateMap.putAll(epStateMap);
inShadowRound = false;
+ }
}
protected boolean isInShadowRound()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index df0b767..b065d90 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -48,6 +48,12 @@ class MigrationTask extends WrappedRunnable
public void runMayThrow() throws Exception
{
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+ return;
+ }
+
// There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
// potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
// a higher major.
@@ -57,12 +63,6 @@ class MigrationTask extends WrappedRunnable
return;
}
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- logger.debug("Can't send schema pull request: node {} is down.", endpoint);
- return;
- }
-
MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c2996d7..65f536b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -443,15 +443,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().listen();
// make magic happen
- Gossiper.instance.doShadowRound();
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
// now that we've gossiped at least once, we should be able to find the node we're replacing
- if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+ if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
- replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+ replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
try
{
- VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+ VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
if (tokensVersionedValue == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
@@ -460,7 +460,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
}
- Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
return tokens;
}
catch (IOException e)
@@ -474,8 +473,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.debug("Starting shadow gossip round to check for endpoint collision");
if (!MessagingService.instance().isListening())
MessagingService.instance().listen();
- Gossiper.instance.doShadowRound();
- if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress()))
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+ if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
{
throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
"Use cassandra.replace_address if you want to replace this node.",
@@ -483,7 +482,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
if (useStrictConsistency && !allowSimultaneousMoves())
{
- for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+ for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
{
// ignore local node or empty status
if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
@@ -495,7 +494,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true");
}
}
- Gossiper.instance.resetEndpointStateMap();
}
private boolean allowSimultaneousMoves()
[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0
Posted by jk...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2836a644
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2836a644
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2836a644
Branch: refs/heads/cassandra-3.0
Commit: 2836a644a357c0992ba89622f04668422ce2761a
Parents: f4ba908 bf0906b
Author: Joel Knighton <jk...@apache.org>
Authored: Wed Mar 22 13:13:44 2017 -0500
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:18:59 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../gms/GossipDigestAckVerbHandler.java | 26 ++++++---
src/java/org/apache/cassandra/gms/Gossiper.java | 56 ++++++++++++++------
.../apache/cassandra/service/MigrationTask.java | 12 ++---
.../cassandra/service/StorageService.java | 17 +++---
5 files changed, 73 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6021315,df2421d..9140c73
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,9 +1,28 @@@
-2.2.10
+3.0.13
+ * Fix CONTAINS filtering for null collections (CASSANDRA-13246)
+ * Applying: Use a unique metric reservoir per test run when using Cassandra-wide metrics residing in MBeans (CASSANDRA-13216)
+ * Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320)
+ * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
+ * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
+Merged from 2.2:
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
* Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
* Wrong logger name in AnticompactionTask (CASSANDRA-13343)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
* Fix queries updating multiple time the same list (CASSANDRA-13130)
* Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
+
+
+3.0.12
+ * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
+ * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
+ * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
+ * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
+ * Faster StreamingHistogram (CASSANDRA-13038)
+ * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
+ * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
+ * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
+Merged from 2.2:
* Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index cbfa750,c2eccba..802ff9c
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -124,6 -128,9 +128,8 @@@ public class Gossiper implements IFailu
private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
private volatile boolean inShadowRound = false;
-
+ // endpoint states as gathered during shadow round
+ private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
@@@ -818,28 -826,6 +827,20 @@@
return endpointStateMap.get(ep);
}
+ public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
+ {
+ EndpointState state1 = getEndpointStateForEndpoint(ep1);
+ EndpointState state2 = getEndpointStateForEndpoint(ep2);
+
+ if (state1 == null || state2 == null)
+ return false;
+
+ VersionedValue value1 = state1.getApplicationState(as);
+ VersionedValue value2 = state2.getApplicationState(as);
+
+ return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
+ }
+
- // removes ALL endpoint states; should only be called after shadow gossip
- public void resetEndpointStateMap()
- {
- endpointStateMap.clear();
- unreachableEndpoints.clear();
- liveEndpoints.clear();
- }
-
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@@ -1321,12 -1312,32 +1327,27 @@@
}
/**
- * Do a single 'shadow' round of gossip, where we do not modify any state
- * Only used when replacing a node, to get and assume its states
+ * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+ * map return value, instead of endpointStateMap.
+ *
+ * Used when preparing to join the ring:
+ * <ul>
+ * <li>when replacing a node, to get and assume its tokens</li>
+ * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+ * </ul>
+ *
+ * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+ * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+ * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+ * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+ * the same time.
+ *
+ * @return endpoint states gathered during shadow round or empty map
*/
- public void doShadowRound()
+ public synchronized Map<InetAddress, EndpointState> doShadowRound()
{
buildSeedsList();
- // it may be that the local address is the only entry in the seed
- // list in which case, attempting a shadow round is pointless
- if (seeds.isEmpty())
- return endpointShadowStateMap;
-
+ endpointShadowStateMap.clear();
// send a completely empty syn
List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationTask.java
index 39a5a11,b065d90..6b04756
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@@ -56,13 -46,14 +56,19 @@@ class MigrationTask extends WrappedRunn
this.endpoint = endpoint;
}
+ public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
+ {
+ return inflightTasks;
+ }
+
public void runMayThrow() throws Exception
{
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+ return;
+ }
+
// There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
// potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
// a higher major.
@@@ -72,16 -63,8 +78,10 @@@
return;
}
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- logger.debug("Can't send schema pull request: node {} is down.", endpoint);
- return;
- }
-
MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+
IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
{
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 35b2423,65f536b..6760040
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -481,18 -443,18 +481,17 @@@ public class StorageService extends Not
MessagingService.instance().listen();
// make magic happen
- Gossiper.instance.doShadowRound();
-
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
-
// now that we've gossiped at least once, we should be able to find the node we're replacing
- if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+ if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
- replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+ replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
try
{
- VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+ VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
if (tokensVersionedValue == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
- Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
+ Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
if (isReplacingSameAddress())
{
[10/10] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by jk...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b74ae4b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b74ae4b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b74ae4b
Branch: refs/heads/trunk
Commit: 8b74ae4b6490e1991603e9365b690da6f6900c10
Parents: f5e0a7c ec9ce3d
Author: Joel Knighton <jk...@apache.org>
Authored: Wed Mar 22 13:28:14 2017 -0500
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:29:09 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../gms/GossipDigestAckVerbHandler.java | 27 +++--
src/java/org/apache/cassandra/gms/Gossiper.java | 66 +++++++----
.../apache/cassandra/schema/MigrationTask.java | 12 +-
.../cassandra/service/StorageService.java | 17 ++-
test/conf/cassandra-seeds.yaml | 43 +++++++
.../apache/cassandra/gms/ShadowRoundTest.java | 116 +++++++++++++++++++
.../apache/cassandra/net/MatcherResponse.java | 24 ++--
8 files changed, 253 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 50710eb,177d7dc..e5992af
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -1337,12 -1352,23 +1346,24 @@@ public class Gossiper implements IFailu
}
/**
- * Do a single 'shadow' round of gossip, where we do not modify any state
- * Used when preparing to join the ring:
- * * when replacing a node, to get and assume its tokens
- * * when joining, to check that the local host id matches any previous id for the endpoint address
+ * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+ * map return value, instead of endpointStateMap.
+ *
++ * Used when preparing to join the ring:
+ * <ul>
+ * <li>when replacing a node, to get and assume its tokens</li>
+ * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+ * </ul>
+ *
+ * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+ * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddress, boolean, Map)}. This will update
+ * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+ * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+ * the same time.
+ *
+ * @return endpoint states gathered during shadow round or empty map
*/
- public void doShadowRound()
+ public synchronized Map<InetAddress, EndpointState> doShadowRound()
{
buildSeedsList();
// it may be that the local address is the only entry in the seed
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/src/java/org/apache/cassandra/schema/MigrationTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/MigrationTask.java
index a785e17,0000000..73e396d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/MigrationTask.java
+++ b/src/java/org/apache/cassandra/schema/MigrationTask.java
@@@ -1,113 -1,0 +1,113 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+final class MigrationTask extends WrappedRunnable
+{
+ private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class);
+
+ private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>();
+
+ private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);
+
+ private final InetAddress endpoint;
+
+ MigrationTask(InetAddress endpoint)
+ {
+ this.endpoint = endpoint;
+ }
+
+ static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
+ {
+ return inflightTasks;
+ }
+
+ public void runMayThrow() throws Exception
+ {
++ if (!FailureDetector.instance.isAlive(endpoint))
++ {
++ logger.warn("Can't send schema pull request: node {} is down.", endpoint);
++ return;
++ }
++
+ // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
+ // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
+ // a higher major.
+ if (!MigrationManager.shouldPullSchemaFrom(endpoint))
+ {
+ logger.info("Skipped sending a migration request: node {} has a higher major version now.", endpoint);
+ return;
+ }
+
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- logger.debug("Can't send schema pull request: node {} is down.", endpoint);
- return;
- }
-
+ MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
+
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+
+ IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
+ {
+ @Override
+ public void response(MessageIn<Collection<Mutation>> message)
+ {
+ try
+ {
+ Schema.instance.mergeAndAnnounceVersion(message.payload);
+ }
+ catch (ConfigurationException e)
+ {
+ logger.error("Configuration exception merging remote schema", e);
+ }
+ finally
+ {
+ completionLatch.countDown();
+ }
+ }
+
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
+ };
+
+ // Only save the latches if we need bootstrap or are bootstrapping
+ if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()))
+ inflightTasks.offer(completionLatch);
+
+ MessagingService.instance().sendRR(message, endpoint, cb);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by jk...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec9ce3df
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec9ce3df
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec9ce3df
Branch: refs/heads/trunk
Commit: ec9ce3dfba0030015c5dd846b8b5b526614cf5f7
Parents: 5484bd1 2836a64
Author: Joel Knighton <jk...@apache.org>
Authored: Wed Mar 22 13:20:24 2017 -0500
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:22:43 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../gms/GossipDigestAckVerbHandler.java | 27 +++--
src/java/org/apache/cassandra/gms/Gossiper.java | 65 +++++++----
.../apache/cassandra/service/MigrationTask.java | 12 +-
.../cassandra/service/StorageService.java | 17 ++-
test/conf/cassandra-seeds.yaml | 43 +++++++
.../apache/cassandra/gms/ShadowRoundTest.java | 116 +++++++++++++++++++
.../apache/cassandra/net/MatcherResponse.java | 24 ++--
8 files changed, 252 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ce8535d,9140c73..8386c20
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -37,143 -49,6 +37,144 @@@ Merged from 3.0
live rows in sstabledump (CASSANDRA-13177)
* Provide user workaround when system_schema.columns does not contain entries
for a table that's in system_schema.tables (CASSANDRA-13180)
+Merged from 2.2:
++ * Discard in-flight shadow round responses (CASSANDRA-12653)
+ * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
+ * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
+ * Fix queries updating the same list multiple times (CASSANDRA-13130)
+ * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
+ * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
+ * Fix failing COPY TO STDOUT (CASSANDRA-12497)
+ * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
+ * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
+ * Fix negative mean latency metric (CASSANDRA-12876)
+ * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
+Merged from 2.1:
+ * Remove unused repositories (CASSANDRA-13278)
+ * Log stacktrace of uncaught exceptions (CASSANDRA-13108)
+ * Use portable stderr for java error in startup (CASSANDRA-13211)
+ * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
+ * Coalescing strategy can enter infinite loop (CASSANDRA-13159)
+
+
+3.10
+ * Fix secondary index queries regression (CASSANDRA-13013)
+ * Add duration type to the protocol V5 (CASSANDRA-12850)
+ * Fix duration type validation (CASSANDRA-13143)
+ * Fix flaky GcCompactionTest (CASSANDRA-12664)
+ * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
+ * Fixed query monitoring for range queries (CASSANDRA-13050)
+ * Remove outboundBindAny configuration property (CASSANDRA-12673)
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
* Dump threads when unit tests time out (CASSANDRA-13117)
* Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
* Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 15662b1,59060f8..d6d9dfb
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@@ -54,16 -54,8 +54,10 @@@ public class GossipDigestAckVerbHandle
if (Gossiper.instance.isInShadowRound())
{
if (logger.isDebugEnabled())
- logger.debug("Finishing shadow round with {}", from);
- Gossiper.instance.finishShadowRound(epStateMap);
+ logger.debug("Received an ack from {}, which may trigger exit from shadow round", from);
++
+ // if the ack is completely empty, then we can infer that the respondent is also in a shadow round
- Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty());
++ Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap);
return; // don't bother doing anything else, we have what we came for
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index ebfd66d,802ff9c..177d7dc
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -30,9 -30,9 +30,10 @@@ import javax.management.ObjectName
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -112,7 -114,7 +116,8 @@@ public class Gossiper implements IFailu
private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress, Long>();
/* initial seeds for joining the cluster */
-- private final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
++ @VisibleForTesting
++ final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
/* map where key is the endpoint and value is the state associated with the endpoint */
final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>();
@@@ -126,7 -128,8 +131,10 @@@
private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
private volatile boolean inShadowRound = false;
++ // seeds gathered during shadow round that indicated they are in the shadow round phase as well
+ private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator);
+ // endpoint states as gathered during shadow round
+ private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
@@@ -715,22 -720,16 +725,24 @@@
}
/**
- * Check if this endpoint can safely bootstrap into the cluster.
+ * Check if this node can safely be started and join the ring.
+ * If the node is bootstrapping, examines gossip state for any previous status to decide whether
+ * it's safe to allow this node to start and bootstrap. If not bootstrapping, compares the host ID
+ * that the node itself has (obtained by reading from system.local or generated if not present)
+ * with the host ID obtained from gossip for the endpoint address (if any). This latter case
+ * prevents a non-bootstrapping, new node from being started with the same address of a
+ * previously started, but currently down predecessor.
*
* @param endpoint - the endpoint to check
+ * @param localHostUUID - the host id to check
+ * @param isBootstrapping - whether the node intends to bootstrap when joining
+ * @param epStates - endpoint states in the cluster
- * @return true if the endpoint can join the cluster
+ * @return true if it is safe to start the node, false otherwise
*/
- public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping)
- public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
++ public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping,
++ Map<InetAddress, EndpointState> epStates)
{
- EndpointState epState = endpointStateMap.get(endpoint);
+ EndpointState epState = epStates.get(endpoint);
-
// if there's no previous state, or the node was previously removed from the cluster, we're good
if (epState == null || isDeadState(epState))
return true;
@@@ -1343,20 -1327,27 +1352,32 @@@
}
/**
- * Do a single 'shadow' round of gossip, where we do not modify any state
- * Used when preparing to join the ring:
- * * when replacing a node, to get and assume its tokens
- * * when joining, to check that the local host id matches any previous id for the endpoint address
+ * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+ * map return value, instead of endpointStateMap.
+ *
- * Used when preparing to join the ring:
+ * <ul>
+ * <li>when replacing a node, to get and assume its tokens</li>
+ * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+ * </ul>
+ *
+ * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
- * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
++ * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddress, boolean, Map)}. This will update
+ * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+ * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+ * the same time.
+ *
+ * @return endpoint states gathered during shadow round or empty map
*/
- public void doShadowRound()
+ public synchronized Map<InetAddress, EndpointState> doShadowRound()
{
buildSeedsList();
+ // it may be that the local address is the only entry in the seed
+ // list in which case, attempting a shadow round is pointless
+ if (seeds.isEmpty())
- return;
++ return endpointShadowStateMap;
+
+ seedsInShadowRound.clear();
+ endpointShadowStateMap.clear();
// send a completely empty syn
List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@@ -1401,9 -1383,11 +1422,12 @@@
{
throw new RuntimeException(wtf);
}
+
+ return ImmutableMap.copyOf(endpointShadowStateMap);
}
-- private void buildSeedsList()
++ @VisibleForTesting
++ void buildSeedsList()
{
for (InetAddress seed : DatabaseDescriptor.getSeeds())
{
@@@ -1521,32 -1505,12 +1545,33 @@@
return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
}
- protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound)
- protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
++ protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
{
if (inShadowRound)
{
- endpointShadowStateMap.putAll(epStateMap);
- inShadowRound = false;
+ if (!isInShadowRound)
+ {
+ logger.debug("Received a regular ack from {}, can now exit shadow round", respondent);
+ // respondent sent back a full ack, so we can exit our shadow round
++ endpointShadowStateMap.putAll(epStateMap);
+ inShadowRound = false;
+ seedsInShadowRound.clear();
+ }
+ else
+ {
+ // respondent indicates it too is in a shadow round, if all seeds
+ // are in this state then we can exit our shadow round. Otherwise,
+ // we keep retrying the SR until one responds with a full ACK or
+ // we learn that all seeds are in SR.
+ logger.debug("Received an ack from {} indicating it is also in shadow round", respondent);
+ seedsInShadowRound.add(respondent);
+ if (seedsInShadowRound.containsAll(seeds))
+ {
+ logger.debug("All seeds are in a shadow round, clearing this node to exit its own");
+ inShadowRound = false;
+ seedsInShadowRound.clear();
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index b64cf13,6760040..3c0bc1a
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -492,76 -474,52 +492,75 @@@ public class StorageService extends Not
daemon.deactivate();
}
- public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException
+ private synchronized UUID prepareForReplacement() throws ConfigurationException
{
- logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress());
- if (!MessagingService.instance().isListening())
- MessagingService.instance().listen();
+ if (SystemKeyspace.bootstrapComplete())
+ throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
+
+ if (!joinRing)
+ throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
- // make magic happen
+ if (!DatabaseDescriptor.isAutoBootstrap() && !Boolean.getBoolean("cassandra.allow_unsafe_replace"))
+ throw new RuntimeException("Replacing a node without bootstrapping risks invalidating consistency " +
+ "guarantees as the expected data may not be present until repair is run. " +
+ "To perform this operation, please restart with " +
+ "-Dcassandra.allow_unsafe_replace=true");
+
+ InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
+ logger.info("Gathering node replacement information for {}", replaceAddress);
- Gossiper.instance.doShadowRound();
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
- // now that we've gossiped at least once, we should be able to find the node we're replacing
- if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
- throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
- replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
+ // as we've completed the shadow round of gossip, we should be able to find the node we're replacing
- if (Gossiper.instance.getEndpointStateForEndpoint(replaceAddress) == null)
++ if (epStates.get(replaceAddress) == null)
+ throw new RuntimeException(String.format("Cannot replace_address %s because it doesn't exist in gossip", replaceAddress));
+
try
{
- VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(replaceAddress).getApplicationState(ApplicationState.TOKENS);
- VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
++ VersionedValue tokensVersionedValue = epStates.get(replaceAddress).getApplicationState(ApplicationState.TOKENS);
if (tokensVersionedValue == null)
- throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
- Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
+ throw new RuntimeException(String.format("Could not find tokens for %s to replace", replaceAddress));
- if (isReplacingSameAddress())
- {
- SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
- }
- return tokens;
+ bootstrapTokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
}
catch (IOException e)
{
throw new RuntimeException(e);
}
+
+ UUID localHostId = SystemKeyspace.getLocalHostId();
+
+ if (isReplacingSameAddress())
+ {
- localHostId = Gossiper.instance.getHostId(replaceAddress);
++ localHostId = Gossiper.instance.getHostId(replaceAddress, epStates);
+ SystemKeyspace.setLocalHostId(localHostId); // use the replacee's host Id as our own so we receive hints, etc
+ }
+
- Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
+ return localHostId;
}
- public synchronized void checkForEndpointCollision() throws ConfigurationException
+ private synchronized void checkForEndpointCollision(UUID localHostId) throws ConfigurationException
{
+ if (Boolean.getBoolean("cassandra.allow_unsafe_join"))
+ {
+ logger.warn("Skipping endpoint collision check as cassandra.allow_unsafe_join=true");
+ return;
+ }
+
logger.debug("Starting shadow gossip round to check for endpoint collision");
- Gossiper.instance.doShadowRound();
- if (!MessagingService.instance().isListening())
- MessagingService.instance().listen();
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
- if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
+ // If bootstrapping, check whether any previously known status for the endpoint makes it unsafe to do so.
+ // If not bootstrapping, compare the host id for this endpoint learned from gossip (if any) with the local
+ // one, which was either read from system.local or generated at startup. If a learned id is present &
+ // doesn't match the local, then the node needs replacing
- if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap()))
++ if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap(), epStates))
{
throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
"Use cassandra.replace_address if you want to replace this node.",
FBUtilities.getBroadcastAddress()));
}
- if (useStrictConsistency && !allowSimultaneousMoves())
+
+ if (shouldBootstrap() && useStrictConsistency && !allowSimultaneousMoves())
{
- for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+ for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
{
// ignore local node or empty status
if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/conf/cassandra-seeds.yaml
----------------------------------------------------------------------
diff --cc test/conf/cassandra-seeds.yaml
index 0000000,0000000..02d25d2
new file mode 100644
--- /dev/null
+++ b/test/conf/cassandra-seeds.yaml
@@@ -1,0 -1,0 +1,43 @@@
++#
++# Warning!
++# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
++#
++cluster_name: Test Cluster
++# memtable_allocation_type: heap_buffers
++memtable_allocation_type: offheap_objects
++commitlog_sync: batch
++commitlog_sync_batch_window_in_ms: 1.0
++commitlog_segment_size_in_mb: 5
++commitlog_directory: build/test/cassandra/commitlog
++cdc_raw_directory: build/test/cassandra/cdc_raw
++cdc_enabled: false
++hints_directory: build/test/cassandra/hints
++partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
++listen_address: 127.0.0.1
++storage_port: 7010
++start_native_transport: true
++native_transport_port: 9042
++column_index_size_in_kb: 4
++saved_caches_directory: build/test/cassandra/saved_caches
++data_file_directories:
++ - build/test/cassandra/data
++disk_access_mode: mmap
++seed_provider:
++ - class_name: org.apache.cassandra.locator.SimpleSeedProvider
++ parameters:
++ - seeds: "127.0.0.10,127.0.1.10,127.0.2.10"
++endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
++dynamic_snitch: true
++server_encryption_options:
++ internode_encryption: none
++ keystore: conf/.keystore
++ keystore_password: cassandra
++ truststore: conf/.truststore
++ truststore_password: cassandra
++incremental_backups: true
++concurrent_compactors: 4
++compaction_throughput_mb_per_sec: 0
++row_cache_class_name: org.apache.cassandra.cache.OHCProvider
++row_cache_size_in_mb: 16
++enable_user_defined_functions: true
++enable_scripted_user_defined_functions: true
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
index 0000000,0000000..f8cc49c
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
@@@ -1,0 -1,0 +1,116 @@@
++/*
++* Licensed to the Apache Software Foundation (ASF) under one
++* or more contributor license agreements. See the NOTICE file
++* distributed with this work for additional information
++* regarding copyright ownership. The ASF licenses this file
++* to you under the Apache License, Version 2.0 (the
++* "License"); you may not use this file except in compliance
++* with the License. You may obtain a copy of the License at
++*
++* http://www.apache.org/licenses/LICENSE-2.0
++*
++* Unless required by applicable law or agreed to in writing,
++* software distributed under the License is distributed on an
++* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++* KIND, either express or implied. See the License for the
++* specific language governing permissions and limitations
++* under the License.
++*/
++
++package org.apache.cassandra.gms;
++
++import java.util.Collections;
++import java.util.concurrent.atomic.AtomicBoolean;
++
++import org.junit.After;
++import org.junit.BeforeClass;
++import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.locator.IEndpointSnitch;
++import org.apache.cassandra.locator.PropertyFileSnitch;
++import org.apache.cassandra.net.MessageIn;
++import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.MockMessagingService;
++import org.apache.cassandra.net.MockMessagingSpy;
++import org.apache.cassandra.service.StorageService;
++
++import static org.apache.cassandra.net.MockMessagingService.verb;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++public class ShadowRoundTest
++{
++ private static final Logger logger = LoggerFactory.getLogger(ShadowRoundTest.class);
++
++ @BeforeClass
++ public static void setUp() throws ConfigurationException
++ {
++ System.setProperty("cassandra.config", "cassandra-seeds.yaml");
++
++ DatabaseDescriptor.daemonInitialization();
++ IEndpointSnitch snitch = new PropertyFileSnitch();
++ DatabaseDescriptor.setEndpointSnitch(snitch);
++ Keyspace.setInitialized();
++ }
++
++ @After
++ public void cleanup()
++ {
++ MockMessagingService.cleanup();
++ }
++
++ @Test
++ public void testDelayedResponse()
++ {
++ Gossiper.instance.buildSeedsList();
++ int noOfSeeds = Gossiper.instance.seeds.size();
++
++ final AtomicBoolean ackSend = new AtomicBoolean(false);
++ MockMessagingSpy spySyn = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_SYN))
++ .respondN((msgOut, to) ->
++ {
++ // ACK once to finish shadow round, then busy-spin until gossiper has been enabled
++ // and then reply with remaining ACKs from other seeds
++ if (!ackSend.compareAndSet(false, true))
++ {
++ while (!Gossiper.instance.isEnabled()) ;
++ }
++
++ HeartBeatState hb = new HeartBeatState(123, 456);
++ EndpointState state = new EndpointState(hb);
++ GossipDigestAck payload = new GossipDigestAck(
++ Collections.singletonList(new GossipDigest(to, hb.getGeneration(), hb.getHeartBeatVersion())),
++ Collections.singletonMap(to, state));
++
++ logger.debug("Simulating digest ACK reply");
++ return MessageIn.create(to, payload, Collections.emptyMap(), MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.current_version);
++ }, noOfSeeds);
++
++ // GossipDigestAckVerbHandler will send ack2 for each ack received (after the shadow round)
++ MockMessagingSpy spyAck2 = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_ACK2)).dontReply();
++
++ // Migration request messages should not be emitted during shadow round
++ MockMessagingSpy spyMigrationReq = MockMessagingService.when(verb(MessagingService.Verb.MIGRATION_REQUEST)).dontReply();
++
++ try
++ {
++ StorageService.instance.initServer();
++ }
++ catch (Exception e)
++ {
++ assertEquals("Unable to contact any seeds!", e.getMessage());
++ }
++
++ // we expect one SYN for each seed during shadow round + additional SYNs after gossiper has been enabled
++ assertTrue(spySyn.messagesIntercepted > noOfSeeds);
++
++ // we don't expect to emit any GOSSIP_DIGEST_ACK2 or MIGRATION_REQUEST messages
++ assertEquals(0, spyAck2.messagesIntercepted);
++ assertEquals(0, spyMigrationReq.messagesIntercepted);
++ }
++}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/unit/org/apache/cassandra/net/MatcherResponse.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/net/MatcherResponse.java
index 21a75c9,0000000..6cd8085
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/net/MatcherResponse.java
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@@ -1,208 -1,0 +1,214 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Sends a response for an incoming message with a matching {@link Matcher}.
+ * The actual behavior by any instance of this class can be inspected by
+ * interacting with the returned {@link MockMessagingSpy}.
+ */
+public class MatcherResponse
+{
+ private final Matcher<?> matcher;
+ private final Set<Integer> sendResponses = new HashSet<>();
+ private final MockMessagingSpy spy = new MockMessagingSpy();
+ private final AtomicInteger limitCounter = new AtomicInteger(Integer.MAX_VALUE);
+ private IMessageSink sink;
+
+ MatcherResponse(Matcher<?> matcher)
+ {
+ this.matcher = matcher;
+ }
+
+ /**
+ * Do not create any responses for intercepted outbound messages.
+ */
+ public MockMessagingSpy dontReply()
+ {
+ return respond((MessageIn<?>)null);
+ }
+
+ /**
+ * Respond with provided message in reply to each intercepted outbound message.
+ * @param message the message to use as mock reply from the cluster
+ */
+ public MockMessagingSpy respond(MessageIn<?> message)
+ {
+ return respondN(message, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Respond a limited number of times with the provided message in reply to each intercepted outbound message.
+ * @param response the message to use as mock reply from the cluster
+ * @param limit number of times to respond with message
+ */
+ public MockMessagingSpy respondN(final MessageIn<?> response, int limit)
+ {
+ return respondN((in, to) -> response, limit);
+ }
+
+ /**
+ * Respond with the message created by the provided function that will be called with each intercepted outbound message.
+ * @param fnResponse function to call for creating reply based on intercepted message and target address
+ */
+ public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse)
+ {
+ return respondN(fnResponse, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Respond with message wrapping the payload object created by provided function called for each intercepted outbound message.
+ * The target address from the intercepted message will automatically be used as the created message's sender address.
+ * @param fnResponse function to call for creating payload object based on intercepted message and target address
+ * @param verb verb to use for reply message
+ */
+ public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb)
+ {
+ return respondNWithPayloadForEachReceiver(fnResponse, verb, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Respond a limited number of times with message wrapping the payload object created by provided function called for
+ * each intercepted outbound message. The target address from the intercepted message will automatically be used as the
+ * created message's sender address.
+ * @param fnResponse function to call for creating payload object based on intercepted message and target address
+ * @param verb verb to use for reply message
+ */
+ public <T, S> MockMessagingSpy respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb, int limit)
+ {
+ return respondN((MessageOut<T> msg, InetAddress to) -> {
+ S payload = fnResponse.apply(msg);
+ if (payload == null)
+ return null;
+ else
+ return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version);
+ },
+ limit);
+ }
+
+ /**
+ * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed
+ * from the provided queue. No reply will be sent when the queue has been exhausted.
+ * @param cannedResponses prepared payload messages to use for responses
+ * @param verb verb to use for reply message
+ */
+ public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Queue<S> cannedResponses, MessagingService.Verb verb)
+ {
+ return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> cannedResponses.poll(), verb);
+ }
+
+ /**
+ * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed
+ * from the provided queue. This method will block until queue elements are available.
+ * @param cannedResponses prepared payload messages to use for responses
+ * @param verb verb to use for reply message
+ */
+ public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(BlockingQueue<S> cannedResponses, MessagingService.Verb verb)
+ {
+ return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> {
+ try
+ {
+ return cannedResponses.take();
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
+ }, verb);
+ }
+
+ /**
+ * Respond a limited number of times with the message created by the provided function that will be called with
+ * each intercepted outbound message.
+ * @param fnResponse function to call for creating reply based on intercepted message and target address
+ */
+ public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse, int limit)
+ {
+ limitCounter.set(limit);
+
+ assert sink == null: "destroy() must be called first to register new response";
+
+ sink = new IMessageSink()
+ {
+ public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+ {
+ // prevent outgoing message from being sent in case matcher indicates a match
+ // and instead send the mocked response
+ if (matcher.matches(message, to))
+ {
+ spy.matchingMessage(message);
+
+ if (limitCounter.decrementAndGet() < 0)
+ return false;
+
+ synchronized (sendResponses)
+ {
+ // I'm not sure about retry semantics regarding message/ID relationships, but I assume
+ // sending a message multiple times using the same ID shouldn't happen..
+ assert !sendResponses.contains(id) : "ID re-use for outgoing message";
+ sendResponses.add(id);
+ }
- MessageIn<?> response = fnResponse.apply(message, to);
- if (response != null)
++
++ // create response asynchronously to match request/response communication execution behavior
++ new Thread(() ->
+ {
- CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id);
- if (cb != null)
- cb.callback.response(response);
- else
- MessagingService.instance().receive(response, id);
- spy.matchingResponse(response);
- }
++ MessageIn<?> response = fnResponse.apply(message, to);
++ if (response != null)
++ {
++ CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id);
++ if (cb != null)
++ cb.callback.response(response);
++ else
++ MessagingService.instance().receive(response, id);
++ spy.matchingResponse(response);
++ }
++ }).start();
++
+ return false;
+ }
+ return true;
+ }
+
+ public boolean allowIncomingMessage(MessageIn message, int id)
+ {
+ return true;
+ }
+ };
+ MessagingService.instance().addMessageSink(sink);
+
+ return spy;
+ }
+
+ /**
+ * Stops currently registered response from being sent.
+ */
+ public void destroy()
+ {
+ MessagingService.instance().removeMessageSink(sink);
+ }
+}
[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by jk...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2836a644
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2836a644
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2836a644
Branch: refs/heads/trunk
Commit: 2836a644a357c0992ba89622f04668422ce2761a
Parents: f4ba908 bf0906b
Author: Joel Knighton <jk...@apache.org>
Authored: Wed Mar 22 13:13:44 2017 -0500
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:18:59 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../gms/GossipDigestAckVerbHandler.java | 26 ++++++---
src/java/org/apache/cassandra/gms/Gossiper.java | 56 ++++++++++++++------
.../apache/cassandra/service/MigrationTask.java | 12 ++---
.../cassandra/service/StorageService.java | 17 +++---
5 files changed, 73 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6021315,df2421d..9140c73
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,9 +1,28 @@@
-2.2.10
+3.0.13
+ * Fix CONTAINS filtering for null collections (CASSANDRA-13246)
+ * Applying: Use a unique metric reservoir per test run when using Cassandra-wide metrics residing in MBeans (CASSANDRA-13216)
+ * Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320)
+ * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
+ * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
+Merged from 2.2:
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
* Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
* Wrong logger name in AnticompactionTask (CASSANDRA-13343)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
* Fix queries updating multiple time the same list (CASSANDRA-13130)
* Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
+
+
+3.0.12
+ * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
+ * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
+ * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
+ * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
+ * Faster StreamingHistogram (CASSANDRA-13038)
+ * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
+ * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
+ * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
+Merged from 2.2:
* Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index cbfa750,c2eccba..802ff9c
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -124,6 -128,9 +128,8 @@@ public class Gossiper implements IFailu
private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
private volatile boolean inShadowRound = false;
-
+ // endpoint states as gathered during shadow round
+ private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
@@@ -818,28 -826,6 +827,20 @@@
return endpointStateMap.get(ep);
}
+ public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
+ {
+ EndpointState state1 = getEndpointStateForEndpoint(ep1);
+ EndpointState state2 = getEndpointStateForEndpoint(ep2);
+
+ if (state1 == null || state2 == null)
+ return false;
+
+ VersionedValue value1 = state1.getApplicationState(as);
+ VersionedValue value2 = state2.getApplicationState(as);
+
+ return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
+ }
+
- // removes ALL endpoint states; should only be called after shadow gossip
- public void resetEndpointStateMap()
- {
- endpointStateMap.clear();
- unreachableEndpoints.clear();
- liveEndpoints.clear();
- }
-
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@@ -1321,12 -1312,32 +1327,27 @@@
}
/**
- * Do a single 'shadow' round of gossip, where we do not modify any state
- * Only used when replacing a node, to get and assume its states
+ * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+ * map return value, instead of endpointStateMap.
+ *
+ * Used when preparing to join the ring:
+ * <ul>
+ * <li>when replacing a node, to get and assume its tokens</li>
+ * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+ * </ul>
+ *
+ * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+ * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+ * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+ * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+ * the same time.
+ *
+ * @return endpoint states gathered during shadow round or empty map
*/
- public void doShadowRound()
+ public synchronized Map<InetAddress, EndpointState> doShadowRound()
{
buildSeedsList();
- // it may be that the local address is the only entry in the seed
- // list in which case, attempting a shadow round is pointless
- if (seeds.isEmpty())
- return endpointShadowStateMap;
-
+ endpointShadowStateMap.clear();
// send a completely empty syn
List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationTask.java
index 39a5a11,b065d90..6b04756
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@@ -56,13 -46,14 +56,19 @@@ class MigrationTask extends WrappedRunn
this.endpoint = endpoint;
}
+ public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
+ {
+ return inflightTasks;
+ }
+
public void runMayThrow() throws Exception
{
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+ return;
+ }
+
// There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
// potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
// a higher major.
@@@ -72,16 -63,8 +78,10 @@@
return;
}
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- logger.debug("Can't send schema pull request: node {} is down.", endpoint);
- return;
- }
-
MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+
IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
{
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 35b2423,65f536b..6760040
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -481,18 -443,18 +481,17 @@@ public class StorageService extends Not
MessagingService.instance().listen();
// make magic happen
- Gossiper.instance.doShadowRound();
-
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
-
// now that we've gossiped at least once, we should be able to find the node we're replacing
- if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+ if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
- replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+ replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
try
{
- VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+ VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
if (tokensVersionedValue == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
- Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
+ Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
if (isReplacingSameAddress())
{
[03/10] cassandra git commit: Discard in-flight shadow round responses
Posted by jk...@apache.org.
Discard in-flight shadow round responses
patch by Stefan Podkowinski; reviewed by Joel Knighton and Jason Brown for CASSANDRA-12653
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf0906b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf0906b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf0906b9
Branch: refs/heads/cassandra-3.11
Commit: bf0906b92cf65161d828e31bc46436d427bbb4b8
Parents: 06316df
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Mon Sep 19 13:56:54 2016 +0200
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:08:28 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../gms/GossipDigestAckVerbHandler.java | 26 +++++---
src/java/org/apache/cassandra/gms/Gossiper.java | 62 +++++++++++++++-----
.../apache/cassandra/service/MigrationTask.java | 12 ++--
.../cassandra/service/StorageService.java | 16 +++--
5 files changed, 79 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27dd343..df2421d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.10
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
* Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
* Wrong logger name in AnticompactionTask (CASSANDRA-13343)
* Fix queries updating multiple time the same list (CASSANDRA-13130)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 9f69a94..59060f8 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -51,21 +51,31 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());
- if (epStateMap.size() > 0)
- {
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(epStateMap);
- Gossiper.instance.applyStateLocally(epStateMap);
- }
-
if (Gossiper.instance.isInShadowRound())
{
if (logger.isDebugEnabled())
logger.debug("Finishing shadow round with {}", from);
- Gossiper.instance.finishShadowRound();
+ Gossiper.instance.finishShadowRound(epStateMap);
return; // don't bother doing anything else, we have what we came for
}
+ if (epStateMap.size() > 0)
+ {
+ // Ignore any GossipDigestAck messages that we handle before a regular GossipDigestSyn has been sent.
+ // This will prevent Acks from leaking over from the shadow round that are not actually part of
+ // the regular gossip conversation.
+ if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
+ return;
+ }
+
+ /* Notify the Failure Detector */
+ Gossiper.instance.notifyFailureDetector(epStateMap);
+ Gossiper.instance.applyStateLocally(epStateMap);
+ }
+
/* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
for (GossipDigest gDigest : gDigestList)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 06b14c4..c2eccba 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.utils.Pair;
@@ -86,6 +87,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
public static final Gossiper instance = new Gossiper();
+ // Timestamp of the first SYN sent, used to ignore in-flight messages received before we have sent any SYN, see CASSANDRA-12653.
+ volatile long firstSynSendAt = 0L;
+
public static final long aVeryLongTime = 259200 * 1000; // 3 days
// Maximimum difference between generation value and local time we are willing to accept about a peer
@@ -125,6 +129,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private volatile boolean inShadowRound = false;
+ // endpoint states as gathered during shadow round
+ private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
+
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
private class GossipTask implements Runnable
@@ -645,6 +652,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
InetAddress to = liveEndpoints.get(index);
if (logger.isTraceEnabled())
logger.trace("Sending a GossipDigestSyn to {} ...", to);
+ if (firstSynSendAt == 0)
+ firstSynSendAt = System.nanoTime();
MessagingService.instance().sendOneWay(message, to);
return seeds.contains(to);
}
@@ -713,11 +722,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
* Check if this endpoint can safely bootstrap into the cluster.
*
* @param endpoint - the endpoint to check
+ * @param epStates - endpoint states in the cluster
* @return true if the endpoint can join the cluster
*/
- public boolean isSafeForBootstrap(InetAddress endpoint)
+ public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
{
- EndpointState epState = endpointStateMap.get(endpoint);
+ EndpointState epState = epStates.get(endpoint);
// if there's no previous state, or the node was previously removed from the cluster, we're good
if (epState == null || isDeadState(epState))
@@ -816,14 +826,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return endpointStateMap.get(ep);
}
- // removes ALL endpoint states; should only be called after shadow gossip
- public void resetEndpointStateMap()
- {
- endpointStateMap.clear();
- unreachableEndpoints.clear();
- liveEndpoints.clear();
- }
-
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@ -831,7 +833,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
public UUID getHostId(InetAddress endpoint)
{
- return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
+ return getHostId(endpoint, endpointStateMap);
+ }
+
+ public UUID getHostId(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
+ {
+ return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
}
EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
@@ -1305,12 +1312,32 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
}
/**
- * Do a single 'shadow' round of gossip, where we do not modify any state
- * Only used when replacing a node, to get and assume its states
+ * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+ * map return value, instead of endpointStateMap.
+ *
+ * Used when preparing to join the ring:
+ * <ul>
+ * <li>when replacing a node, to get and assume its tokens</li>
+ * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+ * </ul>
+ *
+ * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+ * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+ * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+ * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+ * the same time.
+ *
+ * @return endpoint states gathered during shadow round or empty map
*/
- public void doShadowRound()
+ public synchronized Map<InetAddress, EndpointState> doShadowRound()
{
buildSeedsList();
+ // it may be that the local address is the only entry in the seed
+ // list in which case, attempting a shadow round is pointless
+ if (seeds.isEmpty())
+ return endpointShadowStateMap;
+
+ endpointShadowStateMap.clear();
// send a completely empty syn
List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@ -1346,6 +1373,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
throw new RuntimeException(wtf);
}
+
+ return ImmutableMap.copyOf(endpointShadowStateMap);
}
private void buildSeedsList()
@@ -1466,10 +1495,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
}
- protected void finishShadowRound()
+ protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
{
if (inShadowRound)
+ {
+ endpointShadowStateMap.putAll(epStateMap);
inShadowRound = false;
+ }
}
protected boolean isInShadowRound()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index df0b767..b065d90 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -48,6 +48,12 @@ class MigrationTask extends WrappedRunnable
public void runMayThrow() throws Exception
{
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+ return;
+ }
+
// There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
// potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
// a higher major.
@@ -57,12 +63,6 @@ class MigrationTask extends WrappedRunnable
return;
}
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- logger.debug("Can't send schema pull request: node {} is down.", endpoint);
- return;
- }
-
MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c2996d7..65f536b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -443,15 +443,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().listen();
// make magic happen
- Gossiper.instance.doShadowRound();
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
// now that we've gossiped at least once, we should be able to find the node we're replacing
- if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+ if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
- replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+ replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
try
{
- VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+ VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
if (tokensVersionedValue == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
@@ -460,7 +460,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
}
- Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
return tokens;
}
catch (IOException e)
@@ -474,8 +473,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.debug("Starting shadow gossip round to check for endpoint collision");
if (!MessagingService.instance().isListening())
MessagingService.instance().listen();
- Gossiper.instance.doShadowRound();
- if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress()))
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+ if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
{
throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
"Use cassandra.replace_address if you want to replace this node.",
@@ -483,7 +482,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
if (useStrictConsistency && !allowSimultaneousMoves())
{
- for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+ for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
{
// ignore local node or empty status
if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
@@ -495,7 +494,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true");
}
}
- Gossiper.instance.resetEndpointStateMap();
}
private boolean allowSimultaneousMoves()
[04/10] cassandra git commit: Discard in-flight shadow round responses
Posted by jk...@apache.org.
Discard in-flight shadow round responses
patch by Stefan Podkowinski; reviewed by Joel Knighton and Jason Brown for CASSANDRA-12653
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf0906b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf0906b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf0906b9
Branch: refs/heads/trunk
Commit: bf0906b92cf65161d828e31bc46436d427bbb4b8
Parents: 06316df
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Mon Sep 19 13:56:54 2016 +0200
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:08:28 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../gms/GossipDigestAckVerbHandler.java | 26 +++++---
src/java/org/apache/cassandra/gms/Gossiper.java | 62 +++++++++++++++-----
.../apache/cassandra/service/MigrationTask.java | 12 ++--
.../cassandra/service/StorageService.java | 16 +++--
5 files changed, 79 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27dd343..df2421d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.10
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
* Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
* Wrong logger name in AnticompactionTask (CASSANDRA-13343)
* Fix queries updating multiple time the same list (CASSANDRA-13130)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 9f69a94..59060f8 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -51,21 +51,31 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());
- if (epStateMap.size() > 0)
- {
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(epStateMap);
- Gossiper.instance.applyStateLocally(epStateMap);
- }
-
if (Gossiper.instance.isInShadowRound())
{
if (logger.isDebugEnabled())
logger.debug("Finishing shadow round with {}", from);
- Gossiper.instance.finishShadowRound();
+ Gossiper.instance.finishShadowRound(epStateMap);
return; // don't bother doing anything else, we have what we came for
}
+ if (epStateMap.size() > 0)
+ {
+ // Ignore any GossipDigestAck messages that we handle before a regular GossipDigestSyn has been sent.
+ // This will prevent Acks from leaking over from the shadow round that are not actually part of
+ // the regular gossip conversation.
+ if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
+ return;
+ }
+
+ /* Notify the Failure Detector */
+ Gossiper.instance.notifyFailureDetector(epStateMap);
+ Gossiper.instance.applyStateLocally(epStateMap);
+ }
+
/* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
for (GossipDigest gDigest : gDigestList)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 06b14c4..c2eccba 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.utils.Pair;
@@ -86,6 +87,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
public static final Gossiper instance = new Gossiper();
+ // Timestamp to prevent processing any in-flight messages while we've not sent any SYN yet, see CASSANDRA-12653.
+ volatile long firstSynSendAt = 0L;
+
public static final long aVeryLongTime = 259200 * 1000; // 3 days
// Maximimum difference between generation value and local time we are willing to accept about a peer
@@ -125,6 +129,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private volatile boolean inShadowRound = false;
+ // endpoint states as gathered during shadow round
+ private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
+
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
private class GossipTask implements Runnable
@@ -645,6 +652,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
InetAddress to = liveEndpoints.get(index);
if (logger.isTraceEnabled())
logger.trace("Sending a GossipDigestSyn to {} ...", to);
+ if (firstSynSendAt == 0)
+ firstSynSendAt = System.nanoTime();
MessagingService.instance().sendOneWay(message, to);
return seeds.contains(to);
}
@@ -713,11 +722,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
* Check if this endpoint can safely bootstrap into the cluster.
*
* @param endpoint - the endpoint to check
+ * @param epStates - endpoint states in the cluster
* @return true if the endpoint can join the cluster
*/
- public boolean isSafeForBootstrap(InetAddress endpoint)
+ public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
{
- EndpointState epState = endpointStateMap.get(endpoint);
+ EndpointState epState = epStates.get(endpoint);
// if there's no previous state, or the node was previously removed from the cluster, we're good
if (epState == null || isDeadState(epState))
@@ -816,14 +826,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return endpointStateMap.get(ep);
}
- // removes ALL endpoint states; should only be called after shadow gossip
- public void resetEndpointStateMap()
- {
- endpointStateMap.clear();
- unreachableEndpoints.clear();
- liveEndpoints.clear();
- }
-
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@ -831,7 +833,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
public UUID getHostId(InetAddress endpoint)
{
- return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
+ return getHostId(endpoint, endpointStateMap);
+ }
+
+ public UUID getHostId(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
+ {
+ return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
}
EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
@@ -1305,12 +1312,32 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
}
/**
- * Do a single 'shadow' round of gossip, where we do not modify any state
- * Only used when replacing a node, to get and assume its states
+ * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+ * map return value, instead of endpointStateMap.
+ *
+ * Used when preparing to join the ring:
+ * <ul>
+ * <li>when replacing a node, to get and assume its tokens</li>
+ * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+ * </ul>
+ *
+ * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+ * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+ * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+ * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+ * the same time.
+ *
+ * @return endpoint states gathered during shadow round or empty map
*/
- public void doShadowRound()
+ public synchronized Map<InetAddress, EndpointState> doShadowRound()
{
buildSeedsList();
+ // it may be that the local address is the only entry in the seed
+ // list in which case, attempting a shadow round is pointless
+ if (seeds.isEmpty())
+ return endpointShadowStateMap;
+
+ endpointShadowStateMap.clear();
// send a completely empty syn
List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@ -1346,6 +1373,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
throw new RuntimeException(wtf);
}
+
+ return ImmutableMap.copyOf(endpointShadowStateMap);
}
private void buildSeedsList()
@@ -1466,10 +1495,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
}
- protected void finishShadowRound()
+ protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
{
if (inShadowRound)
+ {
+ endpointShadowStateMap.putAll(epStateMap);
inShadowRound = false;
+ }
}
protected boolean isInShadowRound()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index df0b767..b065d90 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -48,6 +48,12 @@ class MigrationTask extends WrappedRunnable
public void runMayThrow() throws Exception
{
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+ return;
+ }
+
// There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
// potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
// a higher major.
@@ -57,12 +63,6 @@ class MigrationTask extends WrappedRunnable
return;
}
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- logger.debug("Can't send schema pull request: node {} is down.", endpoint);
- return;
- }
-
MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c2996d7..65f536b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -443,15 +443,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().listen();
// make magic happen
- Gossiper.instance.doShadowRound();
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
// now that we've gossiped at least once, we should be able to find the node we're replacing
- if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+ if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
- replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+ replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
try
{
- VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+ VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
if (tokensVersionedValue == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
@@ -460,7 +460,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
}
- Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
return tokens;
}
catch (IOException e)
@@ -474,8 +473,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.debug("Starting shadow gossip round to check for endpoint collision");
if (!MessagingService.instance().isListening())
MessagingService.instance().listen();
- Gossiper.instance.doShadowRound();
- if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress()))
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+ if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
{
throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
"Use cassandra.replace_address if you want to replace this node.",
@@ -483,7 +482,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
if (useStrictConsistency && !allowSimultaneousMoves())
{
- for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+ for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
{
// ignore local node or empty status
if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
@@ -495,7 +494,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true");
}
}
- Gossiper.instance.resetEndpointStateMap();
}
private boolean allowSimultaneousMoves()
[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by jk...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2836a644
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2836a644
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2836a644
Branch: refs/heads/cassandra-3.11
Commit: 2836a644a357c0992ba89622f04668422ce2761a
Parents: f4ba908 bf0906b
Author: Joel Knighton <jk...@apache.org>
Authored: Wed Mar 22 13:13:44 2017 -0500
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:18:59 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../gms/GossipDigestAckVerbHandler.java | 26 ++++++---
src/java/org/apache/cassandra/gms/Gossiper.java | 56 ++++++++++++++------
.../apache/cassandra/service/MigrationTask.java | 12 ++---
.../cassandra/service/StorageService.java | 17 +++---
5 files changed, 73 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6021315,df2421d..9140c73
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,9 +1,28 @@@
-2.2.10
+3.0.13
+ * Fix CONTAINS filtering for null collections (CASSANDRA-13246)
+ * Applying: Use a unique metric reservoir per test run when using Cassandra-wide metrics residing in MBeans (CASSANDRA-13216)
+ * Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320)
+ * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
+ * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
+Merged from 2.2:
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
* Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
* Wrong logger name in AnticompactionTask (CASSANDRA-13343)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
* Fix queries updating multiple time the same list (CASSANDRA-13130)
* Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
+
+
+3.0.12
+ * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
+ * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
+ * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
+ * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
+ * Faster StreamingHistogram (CASSANDRA-13038)
+ * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
+ * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
+ * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
+Merged from 2.2:
* Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
* Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
* Coalescing strategy sleeps too much (CASSANDRA-13090)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index cbfa750,c2eccba..802ff9c
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -124,6 -128,9 +128,8 @@@ public class Gossiper implements IFailu
private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
private volatile boolean inShadowRound = false;
-
+ // endpoint states as gathered during shadow round
+ private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
@@@ -818,28 -826,6 +827,20 @@@
return endpointStateMap.get(ep);
}
+ public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
+ {
+ EndpointState state1 = getEndpointStateForEndpoint(ep1);
+ EndpointState state2 = getEndpointStateForEndpoint(ep2);
+
+ if (state1 == null || state2 == null)
+ return false;
+
+ VersionedValue value1 = state1.getApplicationState(as);
+ VersionedValue value2 = state2.getApplicationState(as);
+
+ return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
+ }
+
- // removes ALL endpoint states; should only be called after shadow gossip
- public void resetEndpointStateMap()
- {
- endpointStateMap.clear();
- unreachableEndpoints.clear();
- liveEndpoints.clear();
- }
-
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@@ -1321,12 -1312,32 +1327,27 @@@
}
/**
- * Do a single 'shadow' round of gossip, where we do not modify any state
- * Only used when replacing a node, to get and assume its states
+ * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+ * map return value, instead of endpointStateMap.
+ *
+ * Used when preparing to join the ring:
+ * <ul>
+ * <li>when replacing a node, to get and assume its tokens</li>
+ * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+ * </ul>
+ *
+ * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+ * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+ * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+ * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+ * the same time.
+ *
+ * @return endpoint states gathered during shadow round or empty map
*/
- public void doShadowRound()
+ public synchronized Map<InetAddress, EndpointState> doShadowRound()
{
buildSeedsList();
- // it may be that the local address is the only entry in the seed
- // list in which case, attempting a shadow round is pointless
- if (seeds.isEmpty())
- return endpointShadowStateMap;
-
+ endpointShadowStateMap.clear();
// send a completely empty syn
List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationTask.java
index 39a5a11,b065d90..6b04756
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@@ -56,13 -46,14 +56,19 @@@ class MigrationTask extends WrappedRunn
this.endpoint = endpoint;
}
+ public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
+ {
+ return inflightTasks;
+ }
+
public void runMayThrow() throws Exception
{
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+ return;
+ }
+
// There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
// potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
// a higher major.
@@@ -72,16 -63,8 +78,10 @@@
return;
}
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- logger.debug("Can't send schema pull request: node {} is down.", endpoint);
- return;
- }
-
MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
+ final CountDownLatch completionLatch = new CountDownLatch(1);
+
IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
{
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 35b2423,65f536b..6760040
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -481,18 -443,18 +481,17 @@@ public class StorageService extends Not
MessagingService.instance().listen();
// make magic happen
- Gossiper.instance.doShadowRound();
-
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
-
// now that we've gossiped at least once, we should be able to find the node we're replacing
- if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+ if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
- replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+ replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
try
{
- VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+ VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
if (tokensVersionedValue == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
- Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
+ Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
if (isReplacingSameAddress())
{
[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by jk...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec9ce3df
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec9ce3df
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec9ce3df
Branch: refs/heads/cassandra-3.11
Commit: ec9ce3dfba0030015c5dd846b8b5b526614cf5f7
Parents: 5484bd1 2836a64
Author: Joel Knighton <jk...@apache.org>
Authored: Wed Mar 22 13:20:24 2017 -0500
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:22:43 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../gms/GossipDigestAckVerbHandler.java | 27 +++--
src/java/org/apache/cassandra/gms/Gossiper.java | 65 +++++++----
.../apache/cassandra/service/MigrationTask.java | 12 +-
.../cassandra/service/StorageService.java | 17 ++-
test/conf/cassandra-seeds.yaml | 43 +++++++
.../apache/cassandra/gms/ShadowRoundTest.java | 116 +++++++++++++++++++
.../apache/cassandra/net/MatcherResponse.java | 24 ++--
8 files changed, 252 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ce8535d,9140c73..8386c20
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -37,143 -49,6 +37,144 @@@ Merged from 3.0
live rows in sstabledump (CASSANDRA-13177)
* Provide user workaround when system_schema.columns does not contain entries
for a table that's in system_schema.tables (CASSANDRA-13180)
+Merged from 2.2:
++ * Discard in-flight shadow round responses (CASSANDRA-12653)
+ * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
+ * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
+ * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
+ * Fix queries updating the same list multiple times (CASSANDRA-13130)
+ * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
+ * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
+ * Fix failing COPY TO STDOUT (CASSANDRA-12497)
+ * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
+ * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
+ * Fix negative mean latency metric (CASSANDRA-12876)
+ * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
+Merged from 2.1:
+ * Remove unused repositories (CASSANDRA-13278)
+ * Log stacktrace of uncaught exceptions (CASSANDRA-13108)
+ * Use portable stderr for java error in startup (CASSANDRA-13211)
+ * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
+ * Coalescing strategy can enter infinite loop (CASSANDRA-13159)
+
+
+3.10
+ * Fix secondary index queries regression (CASSANDRA-13013)
+ * Add duration type to the protocol V5 (CASSANDRA-12850)
+ * Fix duration type validation (CASSANDRA-13143)
+ * Fix flaky GcCompactionTest (CASSANDRA-12664)
+ * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
+ * Fixed query monitoring for range queries (CASSANDRA-13050)
+ * Remove outboundBindAny configuration property (CASSANDRA-12673)
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
* Dump threads when unit tests time out (CASSANDRA-13117)
* Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
* Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 15662b1,59060f8..d6d9dfb
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@@ -54,16 -54,8 +54,10 @@@ public class GossipDigestAckVerbHandle
if (Gossiper.instance.isInShadowRound())
{
if (logger.isDebugEnabled())
- logger.debug("Finishing shadow round with {}", from);
- Gossiper.instance.finishShadowRound(epStateMap);
+ logger.debug("Received an ack from {}, which may trigger exit from shadow round", from);
++
+ // if the ack is completely empty, then we can infer that the respondent is also in a shadow round
- Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty());
++ Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap);
return; // don't bother doing anything else, we have what we came for
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index ebfd66d,802ff9c..177d7dc
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -30,9 -30,9 +30,10 @@@ import javax.management.ObjectName
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -112,7 -114,7 +116,8 @@@ public class Gossiper implements IFailu
private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress, Long>();
/* initial seeds for joining the cluster */
-- private final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
++ @VisibleForTesting
++ final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
/* map where key is the endpoint and value is the state associated with the endpoint */
final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>();
@@@ -126,7 -128,8 +131,10 @@@
private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
private volatile boolean inShadowRound = false;
++ // seeds that responded during our shadow round and indicated they are in a shadow round as well
+ private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator);
+ // endpoint states as gathered during shadow round
+ private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
@@@ -715,22 -720,16 +725,24 @@@
}
/**
- * Check if this endpoint can safely bootstrap into the cluster.
+ * Check if this node can safely be started and join the ring.
+ * If the node is bootstrapping, examines gossip state for any previous status to decide whether
+ * it's safe to allow this node to start and bootstrap. If not bootstrapping, compares the host ID
+ * that the node itself has (obtained by reading from system.local or generated if not present)
+ * with the host ID obtained from gossip for the endpoint address (if any). This latter case
+ * prevents a non-bootstrapping, new node from being started with the same address of a
+ * previously started, but currently down predecessor.
*
* @param endpoint - the endpoint to check
+ * @param localHostUUID - the host id to check
+ * @param isBootstrapping - whether the node intends to bootstrap when joining
+ * @param epStates - endpoint states in the cluster
- * @return true if the endpoint can join the cluster
+ * @return true if it is safe to start the node, false otherwise
*/
- public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping)
- public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
++ public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping,
++ Map<InetAddress, EndpointState> epStates)
{
- EndpointState epState = endpointStateMap.get(endpoint);
+ EndpointState epState = epStates.get(endpoint);
-
// if there's no previous state, or the node was previously removed from the cluster, we're good
if (epState == null || isDeadState(epState))
return true;
@@@ -1343,20 -1327,27 +1352,32 @@@
}
/**
- * Do a single 'shadow' round of gossip, where we do not modify any state
- * Used when preparing to join the ring:
- * * when replacing a node, to get and assume its tokens
- * * when joining, to check that the local host id matches any previous id for the endpoint address
+ * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+ * map return value, instead of endpointStateMap.
+ *
- * Used when preparing to join the ring:
+ * <ul>
+ * <li>when replacing a node, to get and assume its tokens</li>
+ * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+ * </ul>
+ *
+ * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
- * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
++ * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddress, boolean, Map)}. This will update
+ * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+ * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+ * the same time.
+ *
+ * @return endpoint states gathered during shadow round or empty map
*/
- public void doShadowRound()
+ public synchronized Map<InetAddress, EndpointState> doShadowRound()
{
buildSeedsList();
+ // it may be that the local address is the only entry in the seed
+ // list in which case, attempting a shadow round is pointless
+ if (seeds.isEmpty())
- return;
++ return endpointShadowStateMap;
+
+ seedsInShadowRound.clear();
+ endpointShadowStateMap.clear();
// send a completely empty syn
List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@@ -1401,9 -1383,11 +1422,12 @@@
{
throw new RuntimeException(wtf);
}
+
+ return ImmutableMap.copyOf(endpointShadowStateMap);
}
-- private void buildSeedsList()
++ @VisibleForTesting
++ void buildSeedsList()
{
for (InetAddress seed : DatabaseDescriptor.getSeeds())
{
@@@ -1521,32 -1505,12 +1545,33 @@@
return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
}
- protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound)
- protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
++ protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
{
if (inShadowRound)
{
- endpointShadowStateMap.putAll(epStateMap);
- inShadowRound = false;
+ if (!isInShadowRound)
+ {
+ logger.debug("Received a regular ack from {}, can now exit shadow round", respondent);
+ // respondent sent back a full ack, so we can exit our shadow round
++ endpointShadowStateMap.putAll(epStateMap);
+ inShadowRound = false;
+ seedsInShadowRound.clear();
+ }
+ else
+ {
+ // respondent indicates it too is in a shadow round, if all seeds
+ // are in this state then we can exit our shadow round. Otherwise,
+ // we keep retrying the SR until one responds with a full ACK or
+ // we learn that all seeds are in SR.
+ logger.debug("Received an ack from {} indicating it is also in shadow round", respondent);
+ seedsInShadowRound.add(respondent);
+ if (seedsInShadowRound.containsAll(seeds))
+ {
+ logger.debug("All seeds are in a shadow round, clearing this node to exit its own");
+ inShadowRound = false;
+ seedsInShadowRound.clear();
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index b64cf13,6760040..3c0bc1a
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -492,76 -474,52 +492,75 @@@ public class StorageService extends Not
daemon.deactivate();
}
- public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException
+ private synchronized UUID prepareForReplacement() throws ConfigurationException
{
- logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress());
- if (!MessagingService.instance().isListening())
- MessagingService.instance().listen();
+ if (SystemKeyspace.bootstrapComplete())
+ throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
+
+ if (!joinRing)
+ throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
- // make magic happen
+ if (!DatabaseDescriptor.isAutoBootstrap() && !Boolean.getBoolean("cassandra.allow_unsafe_replace"))
+ throw new RuntimeException("Replacing a node without bootstrapping risks invalidating consistency " +
+ "guarantees as the expected data may not be present until repair is run. " +
+ "To perform this operation, please restart with " +
+ "-Dcassandra.allow_unsafe_replace=true");
+
+ InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
+ logger.info("Gathering node replacement information for {}", replaceAddress);
- Gossiper.instance.doShadowRound();
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
- // now that we've gossiped at least once, we should be able to find the node we're replacing
- if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
- throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
- replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
+ // as we've completed the shadow round of gossip, we should be able to find the node we're replacing
- if (Gossiper.instance.getEndpointStateForEndpoint(replaceAddress) == null)
++ if (epStates.get(replaceAddress) == null)
+ throw new RuntimeException(String.format("Cannot replace_address %s because it doesn't exist in gossip", replaceAddress));
+
try
{
- VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(replaceAddress).getApplicationState(ApplicationState.TOKENS);
- VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
++ VersionedValue tokensVersionedValue = epStates.get(replaceAddress).getApplicationState(ApplicationState.TOKENS);
if (tokensVersionedValue == null)
- throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
- Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
+ throw new RuntimeException(String.format("Could not find tokens for %s to replace", replaceAddress));
- if (isReplacingSameAddress())
- {
- SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
- }
- return tokens;
+ bootstrapTokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
}
catch (IOException e)
{
throw new RuntimeException(e);
}
+
+ UUID localHostId = SystemKeyspace.getLocalHostId();
+
+ if (isReplacingSameAddress())
+ {
- localHostId = Gossiper.instance.getHostId(replaceAddress);
++ localHostId = Gossiper.instance.getHostId(replaceAddress, epStates);
+ SystemKeyspace.setLocalHostId(localHostId); // use the replacee's host Id as our own so we receive hints, etc
+ }
+
- Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
+ return localHostId;
}
- public synchronized void checkForEndpointCollision() throws ConfigurationException
+ private synchronized void checkForEndpointCollision(UUID localHostId) throws ConfigurationException
{
+ if (Boolean.getBoolean("cassandra.allow_unsafe_join"))
+ {
+ logger.warn("Skipping endpoint collision check as cassandra.allow_unsafe_join=true");
+ return;
+ }
+
logger.debug("Starting shadow gossip round to check for endpoint collision");
- Gossiper.instance.doShadowRound();
- if (!MessagingService.instance().isListening())
- MessagingService.instance().listen();
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
- if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
+ // If bootstrapping, check whether any previously known status for the endpoint makes it unsafe to do so.
+ // If not bootstrapping, compare the host id for this endpoint learned from gossip (if any) with the local
+ // one, which was either read from system.local or generated at startup. If a learned id is present &
+ // doesn't match the local, then the node needs replacing
- if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap()))
++ if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap(), epStates))
{
throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
"Use cassandra.replace_address if you want to replace this node.",
FBUtilities.getBroadcastAddress()));
}
- if (useStrictConsistency && !allowSimultaneousMoves())
+
+ if (shouldBootstrap() && useStrictConsistency && !allowSimultaneousMoves())
{
- for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+ for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
{
// ignore local node or empty status
if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/conf/cassandra-seeds.yaml
----------------------------------------------------------------------
diff --cc test/conf/cassandra-seeds.yaml
index 0000000,0000000..02d25d2
new file mode 100644
--- /dev/null
+++ b/test/conf/cassandra-seeds.yaml
@@@ -1,0 -1,0 +1,43 @@@
++#
++# Warning!
++# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
++#
++cluster_name: Test Cluster
++# memtable_allocation_type: heap_buffers
++memtable_allocation_type: offheap_objects
++commitlog_sync: batch
++commitlog_sync_batch_window_in_ms: 1.0
++commitlog_segment_size_in_mb: 5
++commitlog_directory: build/test/cassandra/commitlog
++cdc_raw_directory: build/test/cassandra/cdc_raw
++cdc_enabled: false
++hints_directory: build/test/cassandra/hints
++partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
++listen_address: 127.0.0.1
++storage_port: 7010
++start_native_transport: true
++native_transport_port: 9042
++column_index_size_in_kb: 4
++saved_caches_directory: build/test/cassandra/saved_caches
++data_file_directories:
++ - build/test/cassandra/data
++disk_access_mode: mmap
++seed_provider:
++ - class_name: org.apache.cassandra.locator.SimpleSeedProvider
++ parameters:
++ - seeds: "127.0.0.10,127.0.1.10,127.0.2.10"
++endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
++dynamic_snitch: true
++server_encryption_options:
++ internode_encryption: none
++ keystore: conf/.keystore
++ keystore_password: cassandra
++ truststore: conf/.truststore
++ truststore_password: cassandra
++incremental_backups: true
++concurrent_compactors: 4
++compaction_throughput_mb_per_sec: 0
++row_cache_class_name: org.apache.cassandra.cache.OHCProvider
++row_cache_size_in_mb: 16
++enable_user_defined_functions: true
++enable_scripted_user_defined_functions: true
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
index 0000000,0000000..f8cc49c
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
@@@ -1,0 -1,0 +1,116 @@@
++/*
++* Licensed to the Apache Software Foundation (ASF) under one
++* or more contributor license agreements. See the NOTICE file
++* distributed with this work for additional information
++* regarding copyright ownership. The ASF licenses this file
++* to you under the Apache License, Version 2.0 (the
++* "License"); you may not use this file except in compliance
++* with the License. You may obtain a copy of the License at
++*
++* http://www.apache.org/licenses/LICENSE-2.0
++*
++* Unless required by applicable law or agreed to in writing,
++* software distributed under the License is distributed on an
++* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++* KIND, either express or implied. See the License for the
++* specific language governing permissions and limitations
++* under the License.
++*/
++
++package org.apache.cassandra.gms;
++
++import java.util.Collections;
++import java.util.concurrent.atomic.AtomicBoolean;
++
++import org.junit.After;
++import org.junit.BeforeClass;
++import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.locator.IEndpointSnitch;
++import org.apache.cassandra.locator.PropertyFileSnitch;
++import org.apache.cassandra.net.MessageIn;
++import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.MockMessagingService;
++import org.apache.cassandra.net.MockMessagingSpy;
++import org.apache.cassandra.service.StorageService;
++
++import static org.apache.cassandra.net.MockMessagingService.verb;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++public class ShadowRoundTest
++{
++ private static final Logger logger = LoggerFactory.getLogger(ShadowRoundTest.class);
++
++ @BeforeClass
++ public static void setUp() throws ConfigurationException
++ {
++ System.setProperty("cassandra.config", "cassandra-seeds.yaml");
++
++ DatabaseDescriptor.daemonInitialization();
++ IEndpointSnitch snitch = new PropertyFileSnitch();
++ DatabaseDescriptor.setEndpointSnitch(snitch);
++ Keyspace.setInitialized();
++ }
++
++ @After
++ public void cleanup()
++ {
++ MockMessagingService.cleanup();
++ }
++
++ @Test
++ public void testDelayedResponse()
++ {
++ Gossiper.instance.buildSeedsList();
++ int noOfSeeds = Gossiper.instance.seeds.size();
++
++ final AtomicBoolean ackSend = new AtomicBoolean(false);
++ MockMessagingSpy spySyn = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_SYN))
++ .respondN((msgOut, to) ->
++ {
++ // ACK once to finish shadow round, then busy-spin until gossiper has been enabled
++ // and then reply with remaining ACKs from other seeds
++ if (!ackSend.compareAndSet(false, true))
++ {
++ while (!Gossiper.instance.isEnabled()) ;
++ }
++
++ HeartBeatState hb = new HeartBeatState(123, 456);
++ EndpointState state = new EndpointState(hb);
++ GossipDigestAck payload = new GossipDigestAck(
++ Collections.singletonList(new GossipDigest(to, hb.getGeneration(), hb.getHeartBeatVersion())),
++ Collections.singletonMap(to, state));
++
++ logger.debug("Simulating digest ACK reply");
++ return MessageIn.create(to, payload, Collections.emptyMap(), MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.current_version);
++ }, noOfSeeds);
++
++ // GossipDigestAckVerbHandler will send ack2 for each ack received (after the shadow round)
++ MockMessagingSpy spyAck2 = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_ACK2)).dontReply();
++
++ // Migration request messages should not be emitted during shadow round
++ MockMessagingSpy spyMigrationReq = MockMessagingService.when(verb(MessagingService.Verb.MIGRATION_REQUEST)).dontReply();
++
++ try
++ {
++ StorageService.instance.initServer();
++ }
++ catch (Exception e)
++ {
++ assertEquals("Unable to contact any seeds!", e.getMessage());
++ }
++
++ // we expect one SYN for each seed during shadow round + additional SYNs after gossiper has been enabled
++ assertTrue(spySyn.messagesIntercepted > noOfSeeds);
++
++ // we don't expect to emit any GOSSIP_DIGEST_ACK2 or MIGRATION_REQUEST messages
++ assertEquals(0, spyAck2.messagesIntercepted);
++ assertEquals(0, spyMigrationReq.messagesIntercepted);
++ }
++}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/unit/org/apache/cassandra/net/MatcherResponse.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/net/MatcherResponse.java
index 21a75c9,0000000..6cd8085
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/net/MatcherResponse.java
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@@ -1,208 -1,0 +1,214 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * Sends a response for an incoming message with a matching {@link Matcher}.
+ * The actual behavior by any instance of this class can be inspected by
+ * interacting with the returned {@link MockMessagingSpy}.
+ */
+public class MatcherResponse
+{
+ private final Matcher<?> matcher;
+ private final Set<Integer> sendResponses = new HashSet<>();
+ private final MockMessagingSpy spy = new MockMessagingSpy();
+ private final AtomicInteger limitCounter = new AtomicInteger(Integer.MAX_VALUE);
+ private IMessageSink sink;
+
+ MatcherResponse(Matcher<?> matcher)
+ {
+ this.matcher = matcher;
+ }
+
+ /**
+ * Do not create any responses for intercepted outbound messages.
+ */
+ public MockMessagingSpy dontReply()
+ {
+ return respond((MessageIn<?>)null);
+ }
+
+ /**
+ * Respond with provided message in reply to each intercepted outbound message.
+ * @param message the message to use as mock reply from the cluster
+ */
+ public MockMessagingSpy respond(MessageIn<?> message)
+ {
+ return respondN(message, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Respond a limited number of times with the provided message in reply to each intercepted outbound message.
+ * @param response the message to use as mock reply from the cluster
+ * @param limit number of times to respond with message
+ */
+ public MockMessagingSpy respondN(final MessageIn<?> response, int limit)
+ {
+ return respondN((in, to) -> response, limit);
+ }
+
+ /**
+ * Respond with the message created by the provided function that will be called with each intercepted outbound message.
+ * @param fnResponse function to call for creating reply based on intercepted message and target address
+ */
+ public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse)
+ {
+ return respondN(fnResponse, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Respond with message wrapping the payload object created by provided function called for each intercepted outbound message.
+ * The target address from the intercepted message will automatically be used as the created message's sender address.
+ * @param fnResponse function to call for creating payload object based on intercepted message and target address
+ * @param verb verb to use for reply message
+ */
+ public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb)
+ {
+ return respondNWithPayloadForEachReceiver(fnResponse, verb, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Respond a limited number of times with message wrapping the payload object created by provided function called for
+ * each intercepted outbound message. The target address from the intercepted message will automatically be used as the
+ * created message's sender address.
+ * @param fnResponse function to call for creating payload object based on intercepted message and target address
+ * @param verb verb to use for reply message
+ */
+ public <T, S> MockMessagingSpy respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb, int limit)
+ {
+ return respondN((MessageOut<T> msg, InetAddress to) -> {
+ S payload = fnResponse.apply(msg);
+ if (payload == null)
+ return null;
+ else
+ return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version);
+ },
+ limit);
+ }
+
+ /**
+ * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed
+ * from the provided queue. No reply will be sent when the queue has been exhausted.
+ * @param cannedResponses prepared payload messages to use for responses
+ * @param verb verb to use for reply message
+ */
+ public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Queue<S> cannedResponses, MessagingService.Verb verb)
+ {
+ return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> cannedResponses.poll(), verb);
+ }
+
+ /**
+ * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed
+ * from the provided queue. This method will block until queue elements are available.
+ * @param cannedResponses prepared payload messages to use for responses
+ * @param verb verb to use for reply message
+ */
+ public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(BlockingQueue<S> cannedResponses, MessagingService.Verb verb)
+ {
+ return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> {
+ try
+ {
+ return cannedResponses.take();
+ }
+ catch (InterruptedException e)
+ {
+ return null;
+ }
+ }, verb);
+ }
+
+ /**
+ * Respond a limited number of times with the message created by the provided function that will be called with
+ * each intercepted outbound message.
+ * @param fnResponse function to call for creating reply based on intercepted message and target address
+ */
+ public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse, int limit)
+ {
+ limitCounter.set(limit);
+
+ assert sink == null: "destroy() must be called first to register new response";
+
+ sink = new IMessageSink()
+ {
+ public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+ {
+ // prevent outgoing message from being sent in case matcher indicates a match
+ // and instead send the mocked response
+ if (matcher.matches(message, to))
+ {
+ spy.matchingMessage(message);
+
+ if (limitCounter.decrementAndGet() < 0)
+ return false;
+
+ synchronized (sendResponses)
+ {
+ // I'm not sure about retry semantics regarding message/ID relationships, but I assume
+ // sending a message multiple times using the same ID shouldn't happen..
+ assert !sendResponses.contains(id) : "ID re-use for outgoing message";
+ sendResponses.add(id);
+ }
- MessageIn<?> response = fnResponse.apply(message, to);
- if (response != null)
++
++ // create response asynchronously to match request/response communication execution behavior
++ new Thread(() ->
+ {
- CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id);
- if (cb != null)
- cb.callback.response(response);
- else
- MessagingService.instance().receive(response, id);
- spy.matchingResponse(response);
- }
++ MessageIn<?> response = fnResponse.apply(message, to);
++ if (response != null)
++ {
++ CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id);
++ if (cb != null)
++ cb.callback.response(response);
++ else
++ MessagingService.instance().receive(response, id);
++ spy.matchingResponse(response);
++ }
++ }).start();
++
+ return false;
+ }
+ return true;
+ }
+
+ public boolean allowIncomingMessage(MessageIn message, int id)
+ {
+ return true;
+ }
+ };
+ MessagingService.instance().addMessageSink(sink);
+
+ return spy;
+ }
+
+ /**
+ * Stops currently registered response from being sent.
+ */
+ public void destroy()
+ {
+ MessagingService.instance().removeMessageSink(sink);
+ }
+}
[02/10] cassandra git commit: Discard in-flight shadow round responses
Posted by jk...@apache.org.
Discard in-flight shadow round responses
patch by Stefan Podkowinski; reviewed by Joel Knighton and Jason Brown for CASSANDRA-12653
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf0906b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf0906b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf0906b9
Branch: refs/heads/cassandra-3.0
Commit: bf0906b92cf65161d828e31bc46436d427bbb4b8
Parents: 06316df
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Mon Sep 19 13:56:54 2016 +0200
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:08:28 2017 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../gms/GossipDigestAckVerbHandler.java | 26 +++++---
src/java/org/apache/cassandra/gms/Gossiper.java | 62 +++++++++++++++-----
.../apache/cassandra/service/MigrationTask.java | 12 ++--
.../cassandra/service/StorageService.java | 16 +++--
5 files changed, 79 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27dd343..df2421d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.10
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
* Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
* Wrong logger name in AnticompactionTask (CASSANDRA-13343)
* Fix queries updating multiple time the same list (CASSANDRA-13130)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 9f69a94..59060f8 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -51,21 +51,31 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());
- if (epStateMap.size() > 0)
- {
- /* Notify the Failure Detector */
- Gossiper.instance.notifyFailureDetector(epStateMap);
- Gossiper.instance.applyStateLocally(epStateMap);
- }
-
if (Gossiper.instance.isInShadowRound())
{
if (logger.isDebugEnabled())
logger.debug("Finishing shadow round with {}", from);
- Gossiper.instance.finishShadowRound();
+ Gossiper.instance.finishShadowRound(epStateMap);
return; // don't bother doing anything else, we have what we came for
}
+ if (epStateMap.size() > 0)
+ {
+ // Ignore any GossipDigestAck messages that we handle before a regular GossipDigestSyn has been sent.
+ // This will prevent Acks from leaking over from the shadow round that are not actually part of
+ // the regular gossip conversation.
+ if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0)
+ {
+ if (logger.isTraceEnabled())
+ logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
+ return;
+ }
+
+ /* Notify the Failure Detector */
+ Gossiper.instance.notifyFailureDetector(epStateMap);
+ Gossiper.instance.applyStateLocally(epStateMap);
+ }
+
/* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
for (GossipDigest gDigest : gDigestList)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 06b14c4..c2eccba 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.utils.Pair;
@@ -86,6 +87,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
public static final Gossiper instance = new Gossiper();
+ // Timestamp to prevent processing any in-flight messages while we have not sent any SYN yet, see CASSANDRA-12653.
+ volatile long firstSynSendAt = 0L;
+
public static final long aVeryLongTime = 259200 * 1000; // 3 days
// Maximimum difference between generation value and local time we are willing to accept about a peer
@@ -125,6 +129,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private volatile boolean inShadowRound = false;
+ // endpoint states as gathered during shadow round
+ private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
+
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
private class GossipTask implements Runnable
@@ -645,6 +652,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
InetAddress to = liveEndpoints.get(index);
if (logger.isTraceEnabled())
logger.trace("Sending a GossipDigestSyn to {} ...", to);
+ if (firstSynSendAt == 0)
+ firstSynSendAt = System.nanoTime();
MessagingService.instance().sendOneWay(message, to);
return seeds.contains(to);
}
@@ -713,11 +722,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
* Check if this endpoint can safely bootstrap into the cluster.
*
* @param endpoint - the endpoint to check
+ * @param epStates - endpoint states in the cluster
* @return true if the endpoint can join the cluster
*/
- public boolean isSafeForBootstrap(InetAddress endpoint)
+ public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
{
- EndpointState epState = endpointStateMap.get(endpoint);
+ EndpointState epState = epStates.get(endpoint);
// if there's no previous state, or the node was previously removed from the cluster, we're good
if (epState == null || isDeadState(epState))
@@ -816,14 +826,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return endpointStateMap.get(ep);
}
- // removes ALL endpoint states; should only be called after shadow gossip
- public void resetEndpointStateMap()
- {
- endpointStateMap.clear();
- unreachableEndpoints.clear();
- liveEndpoints.clear();
- }
-
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@ -831,7 +833,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
public UUID getHostId(InetAddress endpoint)
{
- return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
+ return getHostId(endpoint, endpointStateMap);
+ }
+
+ public UUID getHostId(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
+ {
+ return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
}
EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
@@ -1305,12 +1312,32 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
}
/**
- * Do a single 'shadow' round of gossip, where we do not modify any state
- * Only used when replacing a node, to get and assume its states
+ * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+ * map return value, instead of endpointStateMap.
+ *
+ * Used when preparing to join the ring:
+ * <ul>
+ * <li>when replacing a node, to get and assume its tokens</li>
+ * <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+ * </ul>
+ *
+ * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+ * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+ * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+ * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+ * the same time.
+ *
+ * @return endpoint states gathered during shadow round or empty map
*/
- public void doShadowRound()
+ public synchronized Map<InetAddress, EndpointState> doShadowRound()
{
buildSeedsList();
+ // it may be that the local address is the only entry in the seed
+ // list in which case, attempting a shadow round is pointless
+ if (seeds.isEmpty())
+ return endpointShadowStateMap;
+
+ endpointShadowStateMap.clear();
// send a completely empty syn
List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@ -1346,6 +1373,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
throw new RuntimeException(wtf);
}
+
+ return ImmutableMap.copyOf(endpointShadowStateMap);
}
private void buildSeedsList()
@@ -1466,10 +1495,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
}
- protected void finishShadowRound()
+ protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
{
if (inShadowRound)
+ {
+ endpointShadowStateMap.putAll(epStateMap);
inShadowRound = false;
+ }
}
protected boolean isInShadowRound()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index df0b767..b065d90 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -48,6 +48,12 @@ class MigrationTask extends WrappedRunnable
public void runMayThrow() throws Exception
{
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+ return;
+ }
+
// There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
// potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
// a higher major.
@@ -57,12 +63,6 @@ class MigrationTask extends WrappedRunnable
return;
}
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- logger.debug("Can't send schema pull request: node {} is down.", endpoint);
- return;
- }
-
MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c2996d7..65f536b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -443,15 +443,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
MessagingService.instance().listen();
// make magic happen
- Gossiper.instance.doShadowRound();
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
// now that we've gossiped at least once, we should be able to find the node we're replacing
- if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+ if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
- replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+ replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
try
{
- VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+ VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
if (tokensVersionedValue == null)
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
@@ -460,7 +460,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
}
- Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
return tokens;
}
catch (IOException e)
@@ -474,8 +473,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.debug("Starting shadow gossip round to check for endpoint collision");
if (!MessagingService.instance().isListening())
MessagingService.instance().listen();
- Gossiper.instance.doShadowRound();
- if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress()))
+ Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+ if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
{
throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
"Use cassandra.replace_address if you want to replace this node.",
@@ -483,7 +482,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
if (useStrictConsistency && !allowSimultaneousMoves())
{
- for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+ for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
{
// ignore local node or empty status
if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
@@ -495,7 +494,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true");
}
}
- Gossiper.instance.resetEndpointStateMap();
}
private boolean allowSimultaneousMoves()