You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jk...@apache.org on 2017/03/22 20:59:26 UTC

[01/10] cassandra git commit: Discard in-flight shadow round responses

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 06316df54 -> bf0906b92
  refs/heads/cassandra-3.0 f4ba9083e -> 2836a644a
  refs/heads/cassandra-3.11 5484bd1ac -> ec9ce3dfb
  refs/heads/trunk f5e0a7cdb -> 8b74ae4b6


Discard in-flight shadow round responses

patch by Stefan Podkowinski; reviewed by Joel Knighton and Jason Brown for CASSANDRA-12653


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf0906b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf0906b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf0906b9

Branch: refs/heads/cassandra-2.2
Commit: bf0906b92cf65161d828e31bc46436d427bbb4b8
Parents: 06316df
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Mon Sep 19 13:56:54 2016 +0200
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:08:28 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../gms/GossipDigestAckVerbHandler.java         | 26 +++++---
 src/java/org/apache/cassandra/gms/Gossiper.java | 62 +++++++++++++++-----
 .../apache/cassandra/service/MigrationTask.java | 12 ++--
 .../cassandra/service/StorageService.java       | 16 +++--
 5 files changed, 79 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27dd343..df2421d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.10
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
  * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
  * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
  * Fix queries updating multiple time the same list (CASSANDRA-13130)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 9f69a94..59060f8 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -51,21 +51,31 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
         Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
         logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());
 
-        if (epStateMap.size() > 0)
-        {
-            /* Notify the Failure Detector */
-            Gossiper.instance.notifyFailureDetector(epStateMap);
-            Gossiper.instance.applyStateLocally(epStateMap);
-        }
-
         if (Gossiper.instance.isInShadowRound())
         {
             if (logger.isDebugEnabled())
                 logger.debug("Finishing shadow round with {}", from);
-            Gossiper.instance.finishShadowRound();
+            Gossiper.instance.finishShadowRound(epStateMap);
             return; // don't bother doing anything else, we have what we came for
         }
 
+        if (epStateMap.size() > 0)
+        {
+            // Ignore any GossipDigestAck messages that we handle before a regular GossipDigestSyn has been sent.
+            // This prevents Acks left over from the shadow round, which are not actually part of the
+            // regular gossip conversation, from leaking into it.
+            if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0)
+            {
+                if (logger.isTraceEnabled())
+                    logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
+                return;
+            }
+
+            /* Notify the Failure Detector */
+            Gossiper.instance.notifyFailureDetector(epStateMap);
+            Gossiper.instance.applyStateLocally(epStateMap);
+        }
+
         /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
         Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
         for (GossipDigest gDigest : gDigestList)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 06b14c4..c2eccba 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.utils.Pair;
@@ -86,6 +87,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
     public static final Gossiper instance = new Gossiper();
 
+    // Timestamp used to avoid processing in-flight messages received before we have sent any SYN; see CASSANDRA-12653.
+    volatile long firstSynSendAt = 0L;
+
     public static final long aVeryLongTime = 259200 * 1000; // 3 days
 
     // Maximimum difference between generation value and local time we are willing to accept about a peer
@@ -125,6 +129,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     private volatile boolean inShadowRound = false;
 
+    // endpoint states as gathered during shadow round
+    private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
+
     private volatile long lastProcessedMessageAt = System.currentTimeMillis();
 
     private class GossipTask implements Runnable
@@ -645,6 +652,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         InetAddress to = liveEndpoints.get(index);
         if (logger.isTraceEnabled())
             logger.trace("Sending a GossipDigestSyn to {} ...", to);
+        if (firstSynSendAt == 0)
+            firstSynSendAt = System.nanoTime();
         MessagingService.instance().sendOneWay(message, to);
         return seeds.contains(to);
     }
@@ -713,11 +722,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
      * Check if this endpoint can safely bootstrap into the cluster.
      *
      * @param endpoint - the endpoint to check
+     * @param epStates - endpoint states in the cluster
      * @return true if the endpoint can join the cluster
      */
-    public boolean isSafeForBootstrap(InetAddress endpoint)
+    public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
     {
-        EndpointState epState = endpointStateMap.get(endpoint);
+        EndpointState epState = epStates.get(endpoint);
 
         // if there's no previous state, or the node was previously removed from the cluster, we're good
         if (epState == null || isDeadState(epState))
@@ -816,14 +826,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return endpointStateMap.get(ep);
     }
 
-    // removes ALL endpoint states; should only be called after shadow gossip
-    public void resetEndpointStateMap()
-    {
-        endpointStateMap.clear();
-        unreachableEndpoints.clear();
-        liveEndpoints.clear();
-    }
-
     public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
     {
         return endpointStateMap.entrySet();
@@ -831,7 +833,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     public UUID getHostId(InetAddress endpoint)
     {
-        return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
+        return getHostId(endpoint, endpointStateMap);
+    }
+
+    public UUID getHostId(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
+    {
+        return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
     }
 
     EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
@@ -1305,12 +1312,32 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     }
 
     /**
-     *  Do a single 'shadow' round of gossip, where we do not modify any state
-     *  Only used when replacing a node, to get and assume its states
+     * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+     * map return value, instead of endpointStateMap.
+     *
+     * Used when preparing to join the ring:
+     * <ul>
+     *     <li>when replacing a node, to get and assume its tokens</li>
+     *     <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+     * </ul>
+     *
+     * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+     * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+     * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+     * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+     * the same time.
+     *
+     * @return endpoint states gathered during shadow round or empty map
      */
-    public void doShadowRound()
+    public synchronized Map<InetAddress, EndpointState> doShadowRound()
     {
         buildSeedsList();
+        // it may be that the local address is the only entry in the seed
+        // list in which case, attempting a shadow round is pointless
+        if (seeds.isEmpty())
+            return endpointShadowStateMap;
+
+        endpointShadowStateMap.clear();
         // send a completely empty syn
         List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
         GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@ -1346,6 +1373,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         {
             throw new RuntimeException(wtf);
         }
+
+        return ImmutableMap.copyOf(endpointShadowStateMap);
     }
 
     private void buildSeedsList()
@@ -1466,10 +1495,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
     }
 
-    protected void finishShadowRound()
+    protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
     {
         if (inShadowRound)
+        {
+            endpointShadowStateMap.putAll(epStateMap);
             inShadowRound = false;
+        }
     }
 
     protected boolean isInShadowRound()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index df0b767..b065d90 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -48,6 +48,12 @@ class MigrationTask extends WrappedRunnable
 
     public void runMayThrow() throws Exception
     {
+        if (!FailureDetector.instance.isAlive(endpoint))
+        {
+            logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+            return;
+        }
+
         // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
         // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
         // a higher major.
@@ -57,12 +63,6 @@ class MigrationTask extends WrappedRunnable
             return;
         }
 
-        if (!FailureDetector.instance.isAlive(endpoint))
-        {
-            logger.debug("Can't send schema pull request: node {} is down.", endpoint);
-            return;
-        }
-
         MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
 
         IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c2996d7..65f536b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -443,15 +443,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             MessagingService.instance().listen();
 
         // make magic happen
-        Gossiper.instance.doShadowRound();
+        Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 
         // now that we've gossiped at least once, we should be able to find the node we're replacing
-        if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+        if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
             throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
-        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
         try
         {
-            VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+            VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
             if (tokensVersionedValue == null)
                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
             Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
@@ -460,7 +460,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             {
                 SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
             }
-            Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
             return tokens;
         }
         catch (IOException e)
@@ -474,8 +473,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.debug("Starting shadow gossip round to check for endpoint collision");
         if (!MessagingService.instance().isListening())
             MessagingService.instance().listen();
-        Gossiper.instance.doShadowRound();
-        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress()))
+        Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
         {
             throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
                                                      "Use cassandra.replace_address if you want to replace this node.",
@@ -483,7 +482,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         if (useStrictConsistency && !allowSimultaneousMoves())
         {
-            for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+            for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
             {
                 // ignore local node or empty status
                 if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
@@ -495,7 +494,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true");
             }
         }
-        Gossiper.instance.resetEndpointStateMap();
     }
 
     private boolean allowSimultaneousMoves()


[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by jk...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2836a644
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2836a644
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2836a644

Branch: refs/heads/cassandra-3.0
Commit: 2836a644a357c0992ba89622f04668422ce2761a
Parents: f4ba908 bf0906b
Author: Joel Knighton <jk...@apache.org>
Authored: Wed Mar 22 13:13:44 2017 -0500
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:18:59 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../gms/GossipDigestAckVerbHandler.java         | 26 ++++++---
 src/java/org/apache/cassandra/gms/Gossiper.java | 56 ++++++++++++++------
 .../apache/cassandra/service/MigrationTask.java | 12 ++---
 .../cassandra/service/StorageService.java       | 17 +++---
 5 files changed, 73 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6021315,df2421d..9140c73
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,9 +1,28 @@@
 -2.2.10
 +3.0.13
 + * Fix CONTAINS filtering for null collections (CASSANDRA-13246)
 + * Applying: Use a unique metric reservoir per test run when using Cassandra-wide metrics residing in MBeans (CASSANDRA-13216)
 + * Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320)
 + * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
 + * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
 +Merged from 2.2:
+  * Discard in-flight shadow round responses (CASSANDRA-12653)
   * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
   * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
 + * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
   * Fix queries updating multiple time the same list (CASSANDRA-13130)
   * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 +
 +
 +3.0.12
 + * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
 + * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
 + * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
 + * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
 + * Faster StreamingHistogram (CASSANDRA-13038)
 + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
 + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
 + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 +Merged from 2.2:
   * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
   * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
   * Coalescing strategy sleeps too much (CASSANDRA-13090)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index cbfa750,c2eccba..802ff9c
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -124,6 -128,9 +128,8 @@@ public class Gossiper implements IFailu
      private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
  
      private volatile boolean inShadowRound = false;
 -
+     // endpoint states as gathered during shadow round
+     private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
  
      private volatile long lastProcessedMessageAt = System.currentTimeMillis();
  
@@@ -818,28 -826,6 +827,20 @@@
          return endpointStateMap.get(ep);
      }
  
 +    public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
 +    {
 +        EndpointState state1 = getEndpointStateForEndpoint(ep1);
 +        EndpointState state2 = getEndpointStateForEndpoint(ep2);
 +
 +        if (state1 == null || state2 == null)
 +            return false;
 +
 +        VersionedValue value1 = state1.getApplicationState(as);
 +        VersionedValue value2 = state2.getApplicationState(as);
 +
 +        return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
 +    }
 +
-     // removes ALL endpoint states; should only be called after shadow gossip
-     public void resetEndpointStateMap()
-     {
-         endpointStateMap.clear();
-         unreachableEndpoints.clear();
-         liveEndpoints.clear();
-     }
- 
      public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
      {
          return endpointStateMap.entrySet();
@@@ -1321,12 -1312,32 +1327,27 @@@
      }
  
      /**
-      *  Do a single 'shadow' round of gossip, where we do not modify any state
-      *  Only used when replacing a node, to get and assume its states
+      * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+      * map return value, instead of endpointStateMap.
+      *
+      * Used when preparing to join the ring:
+      * <ul>
+      *     <li>when replacing a node, to get and assume its tokens</li>
+      *     <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+      * </ul>
+      *
+      * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+      * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+      * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+      * caller of {@link Gossiper#doShadowRound()}. Therefor only a single shadow round execution is permitted at
+      * the same time.
+      *
+      * @return endpoint states gathered during shadow round or empty map
       */
-     public void doShadowRound()
+     public synchronized Map<InetAddress, EndpointState> doShadowRound()
      {
          buildSeedsList();
 -        // it may be that the local address is the only entry in the seed
 -        // list in which case, attempting a shadow round is pointless
 -        if (seeds.isEmpty())
 -            return endpointShadowStateMap;
 -
+         endpointShadowStateMap.clear();
          // send a completely empty syn
          List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
          GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationTask.java
index 39a5a11,b065d90..6b04756
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@@ -56,13 -46,14 +56,19 @@@ class MigrationTask extends WrappedRunn
          this.endpoint = endpoint;
      }
  
 +    public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
 +    {
 +        return inflightTasks;
 +    }
 +
      public void runMayThrow() throws Exception
      {
+         if (!FailureDetector.instance.isAlive(endpoint))
+         {
+             logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+             return;
+         }
+ 
          // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
          // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
          // a higher major.
@@@ -72,16 -63,8 +78,10 @@@
              return;
          }
  
-         if (!FailureDetector.instance.isAlive(endpoint))
-         {
-             logger.debug("Can't send schema pull request: node {} is down.", endpoint);
-             return;
-         }
- 
          MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
  
 +        final CountDownLatch completionLatch = new CountDownLatch(1);
 +
          IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
          {
              @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 35b2423,65f536b..6760040
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -481,18 -443,18 +481,17 @@@ public class StorageService extends Not
              MessagingService.instance().listen();
  
          // make magic happen
-         Gossiper.instance.doShadowRound();
- 
+         Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 -
          // now that we've gossiped at least once, we should be able to find the node we're replacing
-         if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+         if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
              throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
-         replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+         replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
          try
          {
-             VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+             VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
              if (tokensVersionedValue == null)
                  throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
 -            Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 +            Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
  
              if (isReplacingSameAddress())
              {


[10/10] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by jk...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b74ae4b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b74ae4b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b74ae4b

Branch: refs/heads/trunk
Commit: 8b74ae4b6490e1991603e9365b690da6f6900c10
Parents: f5e0a7c ec9ce3d
Author: Joel Knighton <jk...@apache.org>
Authored: Wed Mar 22 13:28:14 2017 -0500
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:29:09 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../gms/GossipDigestAckVerbHandler.java         |  27 +++--
 src/java/org/apache/cassandra/gms/Gossiper.java |  66 +++++++----
 .../apache/cassandra/schema/MigrationTask.java  |  12 +-
 .../cassandra/service/StorageService.java       |  17 ++-
 test/conf/cassandra-seeds.yaml                  |  43 +++++++
 .../apache/cassandra/gms/ShadowRoundTest.java   | 116 +++++++++++++++++++
 .../apache/cassandra/net/MatcherResponse.java   |  24 ++--
 8 files changed, 253 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 50710eb,177d7dc..e5992af
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -1337,12 -1352,23 +1346,24 @@@ public class Gossiper implements IFailu
      }
  
      /**
-      *  Do a single 'shadow' round of gossip, where we do not modify any state
-      *  Used when preparing to join the ring:
-      *      * when replacing a node, to get and assume its tokens
-      *      * when joining, to check that the local host id matches any previous id for the endpoint address
+      * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+      * map return value, instead of endpointStateMap.
+      *
++     * Used when preparing to join the ring:
+      * <ul>
+      *     <li>when replacing a node, to get and assume its tokens</li>
+      *     <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+      * </ul>
+      *
+      * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+      * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddress, boolean, Map)}. This will update
+      * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+      * caller of {@link Gossiper#doShadowRound()}. Therefor only a single shadow round execution is permitted at
+      * the same time.
+      *
+      * @return endpoint states gathered during shadow round or empty map
       */
-     public void doShadowRound()
+     public synchronized Map<InetAddress, EndpointState> doShadowRound()
      {
          buildSeedsList();
          // it may be that the local address is the only entry in the seed

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/src/java/org/apache/cassandra/schema/MigrationTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/MigrationTask.java
index a785e17,0000000..73e396d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/MigrationTask.java
+++ b/src/java/org/apache/cassandra/schema/MigrationTask.java
@@@ -1,113 -1,0 +1,113 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.net.InetAddress;
 +import java.util.Collection;
 +import java.util.EnumSet;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentLinkedQueue;
 +import java.util.concurrent.CountDownLatch;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.net.IAsyncCallback;
 +import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.utils.WrappedRunnable;
 +
 +final class MigrationTask extends WrappedRunnable
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class);
 +
 +    private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>();
 +
 +    private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);
 +
 +    private final InetAddress endpoint;
 +
 +    MigrationTask(InetAddress endpoint)
 +    {
 +        this.endpoint = endpoint;
 +    }
 +
 +    static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
 +    {
 +        return inflightTasks;
 +    }
 +
 +    public void runMayThrow() throws Exception
 +    {
++        if (!FailureDetector.instance.isAlive(endpoint))
++        {
++            logger.warn("Can't send schema pull request: node {} is down.", endpoint);
++            return;
++        }
++
 +        // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
 +        // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
 +        // a higher major.
 +        if (!MigrationManager.shouldPullSchemaFrom(endpoint))
 +        {
 +            logger.info("Skipped sending a migration request: node {} has a higher major version now.", endpoint);
 +            return;
 +        }
 +
-         if (!FailureDetector.instance.isAlive(endpoint))
-         {
-             logger.debug("Can't send schema pull request: node {} is down.", endpoint);
-             return;
-         }
- 
 +        MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
 +
 +        final CountDownLatch completionLatch = new CountDownLatch(1);
 +
 +        IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
 +        {
 +            @Override
 +            public void response(MessageIn<Collection<Mutation>> message)
 +            {
 +                try
 +                {
 +                    Schema.instance.mergeAndAnnounceVersion(message.payload);
 +                }
 +                catch (ConfigurationException e)
 +                {
 +                    logger.error("Configuration exception merging remote schema", e);
 +                }
 +                finally
 +                {
 +                    completionLatch.countDown();
 +                }
 +            }
 +
 +            public boolean isLatencyForSnitch()
 +            {
 +                return false;
 +            }
 +        };
 +
 +        // Only save the latches if we need bootstrap or are bootstrapping
 +        if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()))
 +            inflightTasks.offer(completionLatch);
 +
 +        MessagingService.instance().sendRR(message, endpoint, cb);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b74ae4b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------


[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by jk...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec9ce3df
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec9ce3df
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec9ce3df

Branch: refs/heads/trunk
Commit: ec9ce3dfba0030015c5dd846b8b5b526614cf5f7
Parents: 5484bd1 2836a64
Author: Joel Knighton <jk...@apache.org>
Authored: Wed Mar 22 13:20:24 2017 -0500
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:22:43 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../gms/GossipDigestAckVerbHandler.java         |  27 +++--
 src/java/org/apache/cassandra/gms/Gossiper.java |  65 +++++++----
 .../apache/cassandra/service/MigrationTask.java |  12 +-
 .../cassandra/service/StorageService.java       |  17 ++-
 test/conf/cassandra-seeds.yaml                  |  43 +++++++
 .../apache/cassandra/gms/ShadowRoundTest.java   | 116 +++++++++++++++++++
 .../apache/cassandra/net/MatcherResponse.java   |  24 ++--
 8 files changed, 252 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ce8535d,9140c73..8386c20
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -37,143 -49,6 +37,144 @@@ Merged from 3.0
     live rows in sstabledump (CASSANDRA-13177)
   * Provide user workaround when system_schema.columns does not contain entries
     for a table that's in system_schema.tables (CASSANDRA-13180)
 +Merged from 2.2:
++ * Discard in-flight shadow round responses (CASSANDRA-12653)
 + * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
 + * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
 + * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
 + * Fix queries updating multiple time the same list (CASSANDRA-13130)
 + * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 + * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
 + * Fix failing COPY TO STDOUT (CASSANDRA-12497)
 + * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
 + * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
 + * Fix negative mean latency metric (CASSANDRA-12876)
 + * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
 +Merged from 2.1:
 + * Remove unused repositories (CASSANDRA-13278)
 + * Log stacktrace of uncaught exceptions (CASSANDRA-13108)
 + * Use portable stderr for java error in startup (CASSANDRA-13211)
 + * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
 + * Coalescing strategy can enter infinite loop (CASSANDRA-13159)
 +
 +
 +3.10
 + * Fix secondary index queries regression (CASSANDRA-13013)
 + * Add duration type to the protocol V5 (CASSANDRA-12850)
 + * Fix duration type validation (CASSANDRA-13143)
 + * Fix flaky GcCompactionTest (CASSANDRA-12664)
 + * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
 + * Fixed query monitoring for range queries (CASSANDRA-13050)
 + * Remove outboundBindAny configuration property (CASSANDRA-12673)
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
   * Dump threads when unit tests time out (CASSANDRA-13117)
   * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
   * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 15662b1,59060f8..d6d9dfb
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@@ -54,16 -54,8 +54,10 @@@ public class GossipDigestAckVerbHandle
          if (Gossiper.instance.isInShadowRound())
          {
              if (logger.isDebugEnabled())
 -                logger.debug("Finishing shadow round with {}", from);
 -            Gossiper.instance.finishShadowRound(epStateMap);
 +                logger.debug("Received an ack from {}, which may trigger exit from shadow round", from);
++
 +            // if the ack is completely empty, then we can infer that the respondent is also in a shadow round
-             Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty());
++            Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap);
              return; // don't bother doing anything else, we have what we came for
          }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index ebfd66d,802ff9c..177d7dc
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -30,9 -30,9 +30,10 @@@ import javax.management.ObjectName
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.ImmutableMap;
  import com.google.common.util.concurrent.Uninterruptibles;
  
 +import org.apache.cassandra.utils.CassandraVersion;
  import org.apache.cassandra.utils.Pair;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -112,7 -114,7 +116,8 @@@ public class Gossiper implements IFailu
      private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress, Long>();
  
      /* initial seeds for joining the cluster */
--    private final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
++    @VisibleForTesting
++    final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
  
      /* map where key is the endpoint and value is the state associated with the endpoint */
      final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>();
@@@ -126,7 -128,8 +131,10 @@@
      private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
  
      private volatile boolean inShadowRound = false;
++    // seeds gathered during shadow round that indicated to be in the shadow round phase as well
 +    private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator);
+     // endpoint states as gathered during shadow round
+     private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
  
      private volatile long lastProcessedMessageAt = System.currentTimeMillis();
  
@@@ -715,22 -720,16 +725,24 @@@
      }
  
      /**
 -     * Check if this endpoint can safely bootstrap into the cluster.
 +     * Check if this node can safely be started and join the ring.
 +     * If the node is bootstrapping, examines gossip state for any previous status to decide whether
 +     * it's safe to allow this node to start and bootstrap. If not bootstrapping, compares the host ID
 +     * that the node itself has (obtained by reading from system.local or generated if not present)
 +     * with the host ID obtained from gossip for the endpoint address (if any). This latter case
 +     * prevents a non-bootstrapping, new node from being started with the same address of a
 +     * previously started, but currently down predecessor.
       *
       * @param endpoint - the endpoint to check
 +     * @param localHostUUID - the host id to check
 +     * @param isBootstrapping - whether the node intends to bootstrap when joining
+      * @param epStates - endpoint states in the cluster
 -     * @return true if the endpoint can join the cluster
 +     * @return true if it is safe to start the node, false otherwise
       */
-     public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping)
 -    public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
++    public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping,
++                                    Map<InetAddress, EndpointState> epStates)
      {
-         EndpointState epState = endpointStateMap.get(endpoint);
+         EndpointState epState = epStates.get(endpoint);
 -
          // if there's no previous state, or the node was previously removed from the cluster, we're good
          if (epState == null || isDeadState(epState))
              return true;
@@@ -1343,20 -1327,27 +1352,32 @@@
      }
  
      /**
-      *  Do a single 'shadow' round of gossip, where we do not modify any state
-      *  Used when preparing to join the ring:
-      *      * when replacing a node, to get and assume its tokens
-      *      * when joining, to check that the local host id matches any previous id for the endpoint address
+      * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+      * map return value, instead of endpointStateMap.
+      *
 -     * Used when preparing to join the ring:
+      * <ul>
+      *     <li>when replacing a node, to get and assume its tokens</li>
+      *     <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+      * </ul>
+      *
+      * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
 -     * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
++     * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddress, boolean, Map)}. This will update
+      * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+      * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+      * the same time.
+      *
+      * @return endpoint states gathered during shadow round or empty map
       */
-     public void doShadowRound()
+     public synchronized Map<InetAddress, EndpointState> doShadowRound()
      {
          buildSeedsList();
 +        // it may be that the local address is the only entry in the seed
 +        // list in which case, attempting a shadow round is pointless
 +        if (seeds.isEmpty())
-             return;
++            return endpointShadowStateMap;
 +
 +        seedsInShadowRound.clear();
+         endpointShadowStateMap.clear();
          // send a completely empty syn
          List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
          GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@@ -1401,9 -1383,11 +1422,12 @@@
          {
              throw new RuntimeException(wtf);
          }
+ 
+         return ImmutableMap.copyOf(endpointShadowStateMap);
      }
  
--    private void buildSeedsList()
++    @VisibleForTesting
++    void buildSeedsList()
      {
          for (InetAddress seed : DatabaseDescriptor.getSeeds())
          {
@@@ -1521,32 -1505,12 +1545,33 @@@
          return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
      }
  
-     protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound)
 -    protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
++    protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
      {
          if (inShadowRound)
          {
 -            endpointShadowStateMap.putAll(epStateMap);
 -            inShadowRound = false;
 +            if (!isInShadowRound)
 +            {
 +                logger.debug("Received a regular ack from {}, can now exit shadow round", respondent);
 +                // respondent sent back a full ack, so we can exit our shadow round
++                endpointShadowStateMap.putAll(epStateMap);
 +                inShadowRound = false;
 +                seedsInShadowRound.clear();
 +            }
 +            else
 +            {
 +                // respondent indicates it too is in a shadow round, if all seeds
 +                // are in this state then we can exit our shadow round. Otherwise,
 +                // we keep retrying the SR until one responds with a full ACK or
 +                // we learn that all seeds are in SR.
 +                logger.debug("Received an ack from {} indicating it is also in shadow round", respondent);
 +                seedsInShadowRound.add(respondent);
 +                if (seedsInShadowRound.containsAll(seeds))
 +                {
 +                    logger.debug("All seeds are in a shadow round, clearing this node to exit its own");
 +                    inShadowRound = false;
 +                    seedsInShadowRound.clear();
 +                }
 +            }
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index b64cf13,6760040..3c0bc1a
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -492,76 -474,52 +492,75 @@@ public class StorageService extends Not
          daemon.deactivate();
      }
  
 -    public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException
 +    private synchronized UUID prepareForReplacement() throws ConfigurationException
      {
 -        logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress());
 -        if (!MessagingService.instance().isListening())
 -            MessagingService.instance().listen();
 +        if (SystemKeyspace.bootstrapComplete())
 +            throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
 +
 +        if (!joinRing)
 +            throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
  
 -        // make magic happen
 +        if (!DatabaseDescriptor.isAutoBootstrap() && !Boolean.getBoolean("cassandra.allow_unsafe_replace"))
 +            throw new RuntimeException("Replacing a node without bootstrapping risks invalidating consistency " +
 +                                       "guarantees as the expected data may not be present until repair is run. " +
 +                                       "To perform this operation, please restart with " +
 +                                       "-Dcassandra.allow_unsafe_replace=true");
 +
 +        InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
 +        logger.info("Gathering node replacement information for {}", replaceAddress);
-         Gossiper.instance.doShadowRound();
+         Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 -        // now that we've gossiped at least once, we should be able to find the node we're replacing
 -        if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
 -            throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
 -        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
 +        // as we've completed the shadow round of gossip, we should be able to find the node we're replacing
-         if (Gossiper.instance.getEndpointStateForEndpoint(replaceAddress) == null)
++        if (epStates.get(replaceAddress) == null)
 +            throw new RuntimeException(String.format("Cannot replace_address %s because it doesn't exist in gossip", replaceAddress));
 +
          try
          {
-             VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(replaceAddress).getApplicationState(ApplicationState.TOKENS);
 -            VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
++            VersionedValue tokensVersionedValue = epStates.get(replaceAddress).getApplicationState(ApplicationState.TOKENS);
              if (tokensVersionedValue == null)
 -                throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
 -            Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 +                throw new RuntimeException(String.format("Could not find tokens for %s to replace", replaceAddress));
  
 -            if (isReplacingSameAddress())
 -            {
 -                SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
 -            }
 -            return tokens;
 +            bootstrapTokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
          }
          catch (IOException e)
          {
              throw new RuntimeException(e);
          }
 +
 +        UUID localHostId = SystemKeyspace.getLocalHostId();
 +
 +        if (isReplacingSameAddress())
 +        {
-             localHostId = Gossiper.instance.getHostId(replaceAddress);
++            localHostId = Gossiper.instance.getHostId(replaceAddress, epStates);
 +            SystemKeyspace.setLocalHostId(localHostId); // use the replacee's host Id as our own so we receive hints, etc
 +        }
 +
-         Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
 +        return localHostId;
      }
  
 -    public synchronized void checkForEndpointCollision() throws ConfigurationException
 +    private synchronized void checkForEndpointCollision(UUID localHostId) throws ConfigurationException
      {
 +        if (Boolean.getBoolean("cassandra.allow_unsafe_join"))
 +        {
 +            logger.warn("Skipping endpoint collision check as cassandra.allow_unsafe_join=true");
 +            return;
 +        }
 +
          logger.debug("Starting shadow gossip round to check for endpoint collision");
-         Gossiper.instance.doShadowRound();
 -        if (!MessagingService.instance().isListening())
 -            MessagingService.instance().listen();
+         Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 -        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
 +        // If bootstrapping, check whether any previously known status for the endpoint makes it unsafe to do so.
 +        // If not bootstrapping, compare the host id for this endpoint learned from gossip (if any) with the local
 +        // one, which was either read from system.local or generated at startup. If a learned id is present &
 +        // doesn't match the local, then the node needs replacing
-         if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap()))
++        if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap(), epStates))
          {
              throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
                                                       "Use cassandra.replace_address if you want to replace this node.",
                                                       FBUtilities.getBroadcastAddress()));
          }
 -        if (useStrictConsistency && !allowSimultaneousMoves())
 +
 +        if (shouldBootstrap() && useStrictConsistency && !allowSimultaneousMoves())
          {
-             for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+             for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
              {
                  // ignore local node or empty status
                  if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/conf/cassandra-seeds.yaml
----------------------------------------------------------------------
diff --cc test/conf/cassandra-seeds.yaml
index 0000000,0000000..02d25d2
new file mode 100644
--- /dev/null
+++ b/test/conf/cassandra-seeds.yaml
@@@ -1,0 -1,0 +1,43 @@@
++#
++# Warning!
++# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
++#
++cluster_name: Test Cluster
++# memtable_allocation_type: heap_buffers
++memtable_allocation_type: offheap_objects
++commitlog_sync: batch
++commitlog_sync_batch_window_in_ms: 1.0
++commitlog_segment_size_in_mb: 5
++commitlog_directory: build/test/cassandra/commitlog
++cdc_raw_directory: build/test/cassandra/cdc_raw
++cdc_enabled: false
++hints_directory: build/test/cassandra/hints
++partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
++listen_address: 127.0.0.1
++storage_port: 7010
++start_native_transport: true
++native_transport_port: 9042
++column_index_size_in_kb: 4
++saved_caches_directory: build/test/cassandra/saved_caches
++data_file_directories:
++    - build/test/cassandra/data
++disk_access_mode: mmap
++seed_provider:
++    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
++      parameters:
++          - seeds: "127.0.0.10,127.0.1.10,127.0.2.10"
++endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
++dynamic_snitch: true
++server_encryption_options:
++    internode_encryption: none
++    keystore: conf/.keystore
++    keystore_password: cassandra
++    truststore: conf/.truststore
++    truststore_password: cassandra
++incremental_backups: true
++concurrent_compactors: 4
++compaction_throughput_mb_per_sec: 0
++row_cache_class_name: org.apache.cassandra.cache.OHCProvider
++row_cache_size_in_mb: 16
++enable_user_defined_functions: true
++enable_scripted_user_defined_functions: true

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
index 0000000,0000000..f8cc49c
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
@@@ -1,0 -1,0 +1,116 @@@
++/*
++* Licensed to the Apache Software Foundation (ASF) under one
++* or more contributor license agreements.  See the NOTICE file
++* distributed with this work for additional information
++* regarding copyright ownership.  The ASF licenses this file
++* to you under the Apache License, Version 2.0 (the
++* "License"); you may not use this file except in compliance
++* with the License.  You may obtain a copy of the License at
++*
++*    http://www.apache.org/licenses/LICENSE-2.0
++*
++* Unless required by applicable law or agreed to in writing,
++* software distributed under the License is distributed on an
++* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++* KIND, either express or implied.  See the License for the
++* specific language governing permissions and limitations
++* under the License.
++*/
++
++package org.apache.cassandra.gms;
++
++import java.util.Collections;
++import java.util.concurrent.atomic.AtomicBoolean;
++
++import org.junit.After;
++import org.junit.BeforeClass;
++import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.locator.IEndpointSnitch;
++import org.apache.cassandra.locator.PropertyFileSnitch;
++import org.apache.cassandra.net.MessageIn;
++import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.MockMessagingService;
++import org.apache.cassandra.net.MockMessagingSpy;
++import org.apache.cassandra.service.StorageService;
++
++import static org.apache.cassandra.net.MockMessagingService.verb;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++public class ShadowRoundTest
++{
++    private static final Logger logger = LoggerFactory.getLogger(ShadowRoundTest.class);
++
++    @BeforeClass
++    public static void setUp() throws ConfigurationException
++    {
++        System.setProperty("cassandra.config", "cassandra-seeds.yaml");
++
++        DatabaseDescriptor.daemonInitialization();
++        IEndpointSnitch snitch = new PropertyFileSnitch();
++        DatabaseDescriptor.setEndpointSnitch(snitch);
++        Keyspace.setInitialized();
++    }
++
++    @After
++    public void cleanup()
++    {
++        MockMessagingService.cleanup();
++    }
++
++    @Test
++    public void testDelayedResponse()
++    {
++        Gossiper.instance.buildSeedsList();
++        int noOfSeeds = Gossiper.instance.seeds.size();
++
++        final AtomicBoolean ackSend = new AtomicBoolean(false);
++        MockMessagingSpy spySyn = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_SYN))
++                .respondN((msgOut, to) ->
++                {
++                    // ACK once to finish shadow round, then busy-spin until gossiper has been enabled
++                    // and then reply with remaining ACKs from other seeds
++                    if (!ackSend.compareAndSet(false, true))
++                    {
++                        while (!Gossiper.instance.isEnabled()) ;
++                    }
++
++                    HeartBeatState hb = new HeartBeatState(123, 456);
++                    EndpointState state = new EndpointState(hb);
++                    GossipDigestAck payload = new GossipDigestAck(
++                            Collections.singletonList(new GossipDigest(to, hb.getGeneration(), hb.getHeartBeatVersion())),
++                            Collections.singletonMap(to, state));
++
++                    logger.debug("Simulating digest ACK reply");
++                    return MessageIn.create(to, payload, Collections.emptyMap(), MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.current_version);
++                }, noOfSeeds);
++
++        // GossipDigestAckVerbHandler will send ack2 for each ack received (after the shadow round)
++        MockMessagingSpy spyAck2 = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_ACK2)).dontReply();
++
++        // Migration request messages should not be emitted during shadow round
++        MockMessagingSpy spyMigrationReq = MockMessagingService.when(verb(MessagingService.Verb.MIGRATION_REQUEST)).dontReply();
++
++        try
++        {
++            StorageService.instance.initServer();
++        }
++        catch (Exception e)
++        {
++            assertEquals("Unable to contact any seeds!", e.getMessage());
++        }
++
++        // we expect one SYN for each seed during shadow round + additional SYNs after gossiper has been enabled
++        assertTrue(spySyn.messagesIntercepted > noOfSeeds);
++
++        // we don't expect to emit any GOSSIP_DIGEST_ACK2 or MIGRATION_REQUEST messages
++        assertEquals(0, spyAck2.messagesIntercepted);
++        assertEquals(0, spyMigrationReq.messagesIntercepted);
++    }
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/unit/org/apache/cassandra/net/MatcherResponse.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/net/MatcherResponse.java
index 21a75c9,0000000..6cd8085
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/net/MatcherResponse.java
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@@ -1,208 -1,0 +1,214 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.net.InetAddress;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.Queue;
 +import java.util.Set;
 +import java.util.concurrent.BlockingQueue;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.BiFunction;
 +import java.util.function.Function;
 +
 +/**
 + * Sends a response for an incoming message with a matching {@link Matcher}.
 + * The actual behavior by any instance of this class can be inspected by
 + * interacting with the returned {@link MockMessagingSpy}.
 + */
 +public class MatcherResponse
 +{
 +    private final Matcher<?> matcher;
 +    private final Set<Integer> sendResponses = new HashSet<>();
 +    private final MockMessagingSpy spy = new MockMessagingSpy();
 +    private final AtomicInteger limitCounter = new AtomicInteger(Integer.MAX_VALUE);
 +    private IMessageSink sink;
 +
 +    MatcherResponse(Matcher<?> matcher)
 +    {
 +        this.matcher = matcher;
 +    }
 +
 +    /**
 +     * Do not create any responses for intercepted outbound messages.
 +     */
 +    public MockMessagingSpy dontReply()
 +    {
 +        return respond((MessageIn<?>)null);
 +    }
 +
 +    /**
 +     * Respond with provided message in reply to each intercepted outbound message.
 +     * @param message   the message to use as mock reply from the cluster
 +     */
 +    public MockMessagingSpy respond(MessageIn<?> message)
 +    {
 +        return respondN(message, Integer.MAX_VALUE);
 +    }
 +
 +    /**
 +     * Respond a limited number of times with the provided message in reply to each intercepted outbound message.
 +     * @param response  the message to use as mock reply from the cluster
 +     * @param limit     number of times to respond with message
 +     */
 +    public MockMessagingSpy respondN(final MessageIn<?> response, int limit)
 +    {
 +        return respondN((in, to) -> response, limit);
 +    }
 +
 +    /**
 +     * Respond with the message created by the provided function that will be called with each intercepted outbound message.
 +     * @param fnResponse    function to call for creating reply based on intercepted message and target address
 +     */
 +    public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse)
 +    {
 +        return respondN(fnResponse, Integer.MAX_VALUE);
 +    }
 +
 +    /**
 +     * Respond with message wrapping the payload object created by provided function called for each intercepted outbound message.
 +     * The target address from the intercepted message will automatically be used as the created message's sender address.
 +     * @param fnResponse    function to call for creating payload object based on intercepted message and target address
 +     * @param verb          verb to use for reply message
 +     */
 +    public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb)
 +    {
 +        return respondNWithPayloadForEachReceiver(fnResponse, verb, Integer.MAX_VALUE);
 +    }
 +
 +    /**
 +     * Respond a limited number of times with message wrapping the payload object created by provided function called for
 +     * each intercepted outbound message. The target address from the intercepted message will automatically be used as the
 +     * created message's sender address.
 +     * @param fnResponse    function to call for creating payload object based on intercepted message and target address
 +     * @param verb          verb to use for reply message
 +     */
 +    public <T, S> MockMessagingSpy respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb, int limit)
 +    {
 +        return respondN((MessageOut<T> msg, InetAddress to) -> {
 +                    S payload = fnResponse.apply(msg);
 +                    if (payload == null)
 +                        return null;
 +                    else
 +                        return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version);
 +                },
 +                limit);
 +    }
 +
 +    /**
 +     * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed
 +     * from the provided queue. No reply will be sent when the queue has been exhausted.
 +     * @param cannedResponses   prepared payload messages to use for responses
 +     * @param verb              verb to use for reply message
 +     */
 +    public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Queue<S> cannedResponses, MessagingService.Verb verb)
 +    {
 +        return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> cannedResponses.poll(), verb);
 +    }
 +
 +    /**
 +     * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed
 +     * from the provided queue. This method will block until queue elements are available.
 +     * @param cannedResponses   prepared payload messages to use for responses
 +     * @param verb              verb to use for reply message
 +     */
 +    public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(BlockingQueue<S> cannedResponses, MessagingService.Verb verb)
 +    {
 +        return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> {
 +            try
 +            {
 +                return cannedResponses.take();
 +            }
 +            catch (InterruptedException e)
 +            {
 +                return null;
 +            }
 +        }, verb);
 +    }
 +
 +    /**
 +     * Respond a limited number of times with the message created by the provided function that will be called with
 +     * each intercepted outbound message.
 +     * @param fnResponse    function to call for creating reply based on intercepted message and target address
 +     */
 +    public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse, int limit)
 +    {
 +        limitCounter.set(limit);
 +
 +        assert sink == null: "destroy() must be called first to register new response";
 +
 +        sink = new IMessageSink()
 +        {
 +            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
 +            {
 +                // prevent outgoing message from being sent in case matcher indicates a match
 +                // and instead send the mocked response
 +                if (matcher.matches(message, to))
 +                {
 +                    spy.matchingMessage(message);
 +
 +                    if (limitCounter.decrementAndGet() < 0)
 +                        return false;
 +
 +                    synchronized (sendResponses)
 +                    {
 +                        // I'm not sure about retry semantics regarding message/ID relationships, but I assume
 +                        // sending a message multiple times using the same ID shouldn't happen..
 +                        assert !sendResponses.contains(id) : "ID re-use for outgoing message";
 +                        sendResponses.add(id);
 +                    }
-                     MessageIn<?> response = fnResponse.apply(message, to);
-                     if (response != null)
++
++                    // create response asynchronously to match request/response communication execution behavior
++                    new Thread(() ->
 +                    {
-                         CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id);
-                         if (cb != null)
-                             cb.callback.response(response);
-                         else
-                             MessagingService.instance().receive(response, id);
-                         spy.matchingResponse(response);
-                     }
++                        MessageIn<?> response = fnResponse.apply(message, to);
++                        if (response != null)
++                        {
++                            CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id);
++                            if (cb != null)
++                                cb.callback.response(response);
++                            else
++                                MessagingService.instance().receive(response, id);
++                            spy.matchingResponse(response);
++                        }
++                    }).start();
++
 +                    return false;
 +                }
 +                return true;
 +            }
 +
 +            public boolean allowIncomingMessage(MessageIn message, int id)
 +            {
 +                return true;
 +            }
 +        };
 +        MessagingService.instance().addMessageSink(sink);
 +
 +        return spy;
 +    }
 +
 +    /**
 +     * Stops currently registered response from being sent.
 +     */
 +    public void destroy()
 +    {
 +        MessagingService.instance().removeMessageSink(sink);
 +    }
 +}


[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by jk...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2836a644
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2836a644
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2836a644

Branch: refs/heads/trunk
Commit: 2836a644a357c0992ba89622f04668422ce2761a
Parents: f4ba908 bf0906b
Author: Joel Knighton <jk...@apache.org>
Authored: Wed Mar 22 13:13:44 2017 -0500
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:18:59 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../gms/GossipDigestAckVerbHandler.java         | 26 ++++++---
 src/java/org/apache/cassandra/gms/Gossiper.java | 56 ++++++++++++++------
 .../apache/cassandra/service/MigrationTask.java | 12 ++---
 .../cassandra/service/StorageService.java       | 17 +++---
 5 files changed, 73 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6021315,df2421d..9140c73
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,9 +1,28 @@@
 -2.2.10
 +3.0.13
 + * Fix CONTAINS filtering for null collections (CASSANDRA-13246)
 + * Applying: Use a unique metric reservoir per test run when using Cassandra-wide metrics residing in MBeans (CASSANDRA-13216)
 + * Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320)
 + * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
 + * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
 +Merged from 2.2:
+  * Discard in-flight shadow round responses (CASSANDRA-12653)
   * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
   * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
 + * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
   * Fix queries updating multiple time the same list (CASSANDRA-13130)
   * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 +
 +
 +3.0.12
 + * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
 + * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
 + * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
 + * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
 + * Faster StreamingHistogram (CASSANDRA-13038)
 + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
 + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
 + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 +Merged from 2.2:
   * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
   * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
   * Coalescing strategy sleeps too much (CASSANDRA-13090)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index cbfa750,c2eccba..802ff9c
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -124,6 -128,9 +128,8 @@@ public class Gossiper implements IFailu
      private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
  
      private volatile boolean inShadowRound = false;
 -
+     // endpoint states as gathered during shadow round
+     private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
  
      private volatile long lastProcessedMessageAt = System.currentTimeMillis();
  
@@@ -818,28 -826,6 +827,20 @@@
          return endpointStateMap.get(ep);
      }
  
 +    public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
 +    {
 +        EndpointState state1 = getEndpointStateForEndpoint(ep1);
 +        EndpointState state2 = getEndpointStateForEndpoint(ep2);
 +
 +        if (state1 == null || state2 == null)
 +            return false;
 +
 +        VersionedValue value1 = state1.getApplicationState(as);
 +        VersionedValue value2 = state2.getApplicationState(as);
 +
 +        return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
 +    }
 +
-     // removes ALL endpoint states; should only be called after shadow gossip
-     public void resetEndpointStateMap()
-     {
-         endpointStateMap.clear();
-         unreachableEndpoints.clear();
-         liveEndpoints.clear();
-     }
- 
      public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
      {
          return endpointStateMap.entrySet();
@@@ -1321,12 -1312,32 +1327,27 @@@
      }
  
      /**
-      *  Do a single 'shadow' round of gossip, where we do not modify any state
-      *  Only used when replacing a node, to get and assume its states
+      * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+      * map return value, instead of endpointStateMap.
+      *
+      * Used when preparing to join the ring:
+      * <ul>
+      *     <li>when replacing a node, to get and assume its tokens</li>
+      *     <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+      * </ul>
+      *
+      * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+      * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+      * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+      * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+      * the same time.
+      *
+      * @return endpoint states gathered during shadow round or empty map
       */
-     public void doShadowRound()
+     public synchronized Map<InetAddress, EndpointState> doShadowRound()
      {
          buildSeedsList();
 -        // it may be that the local address is the only entry in the seed
 -        // list in which case, attempting a shadow round is pointless
 -        if (seeds.isEmpty())
 -            return endpointShadowStateMap;
 -
+         endpointShadowStateMap.clear();
          // send a completely empty syn
          List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
          GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationTask.java
index 39a5a11,b065d90..6b04756
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@@ -56,13 -46,14 +56,19 @@@ class MigrationTask extends WrappedRunn
          this.endpoint = endpoint;
      }
  
 +    public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
 +    {
 +        return inflightTasks;
 +    }
 +
      public void runMayThrow() throws Exception
      {
+         if (!FailureDetector.instance.isAlive(endpoint))
+         {
+             logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+             return;
+         }
+ 
          // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
          // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
          // a higher major.
@@@ -72,16 -63,8 +78,10 @@@
              return;
          }
  
-         if (!FailureDetector.instance.isAlive(endpoint))
-         {
-             logger.debug("Can't send schema pull request: node {} is down.", endpoint);
-             return;
-         }
- 
          MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
  
 +        final CountDownLatch completionLatch = new CountDownLatch(1);
 +
          IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
          {
              @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 35b2423,65f536b..6760040
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -481,18 -443,18 +481,17 @@@ public class StorageService extends Not
              MessagingService.instance().listen();
  
          // make magic happen
-         Gossiper.instance.doShadowRound();
- 
+         Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 -
          // now that we've gossiped at least once, we should be able to find the node we're replacing
-         if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+         if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
              throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
-         replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+         replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
          try
          {
-             VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+             VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
              if (tokensVersionedValue == null)
                  throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
 -            Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 +            Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
  
              if (isReplacingSameAddress())
              {


[03/10] cassandra git commit: Discard in-flight shadow round responses

Posted by jk...@apache.org.
Discard in-flight shadow round responses

patch by Stefan Podkowinski; reviewed by Joel Knighton and Jason Brown for CASSANDRA-12653


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf0906b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf0906b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf0906b9

Branch: refs/heads/cassandra-3.11
Commit: bf0906b92cf65161d828e31bc46436d427bbb4b8
Parents: 06316df
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Mon Sep 19 13:56:54 2016 +0200
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:08:28 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../gms/GossipDigestAckVerbHandler.java         | 26 +++++---
 src/java/org/apache/cassandra/gms/Gossiper.java | 62 +++++++++++++++-----
 .../apache/cassandra/service/MigrationTask.java | 12 ++--
 .../cassandra/service/StorageService.java       | 16 +++--
 5 files changed, 79 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27dd343..df2421d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.10
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
  * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
  * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
  * Fix queries updating multiple time the same list (CASSANDRA-13130)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 9f69a94..59060f8 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -51,21 +51,31 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
         Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
         logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());
 
-        if (epStateMap.size() > 0)
-        {
-            /* Notify the Failure Detector */
-            Gossiper.instance.notifyFailureDetector(epStateMap);
-            Gossiper.instance.applyStateLocally(epStateMap);
-        }
-
         if (Gossiper.instance.isInShadowRound())
         {
             if (logger.isDebugEnabled())
                 logger.debug("Finishing shadow round with {}", from);
-            Gossiper.instance.finishShadowRound();
+            Gossiper.instance.finishShadowRound(epStateMap);
             return; // don't bother doing anything else, we have what we came for
         }
 
+        if (epStateMap.size() > 0)
+        {
+            // Ignore any GossipDigestAck messages that we handle before a regular GossipDigestSyn has been sent.
+            // This will prevent Acks from leaking over from the shadow round that are not actually part of
+            // the regular gossip conversation.
+            if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0)
+            {
+                if (logger.isTraceEnabled())
+                    logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
+                return;
+            }
+
+            /* Notify the Failure Detector */
+            Gossiper.instance.notifyFailureDetector(epStateMap);
+            Gossiper.instance.applyStateLocally(epStateMap);
+        }
+
         /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
         Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
         for (GossipDigest gDigest : gDigestList)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 06b14c4..c2eccba 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.utils.Pair;
@@ -86,6 +87,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
     public static final Gossiper instance = new Gossiper();
 
+    // Timestamp used to avoid processing in-flight messages before we have sent any SYN, see CASSANDRA-12653.
+    volatile long firstSynSendAt = 0L;
+
     public static final long aVeryLongTime = 259200 * 1000; // 3 days
 
     // Maximimum difference between generation value and local time we are willing to accept about a peer
@@ -125,6 +129,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     private volatile boolean inShadowRound = false;
 
+    // endpoint states as gathered during shadow round
+    private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
+
     private volatile long lastProcessedMessageAt = System.currentTimeMillis();
 
     private class GossipTask implements Runnable
@@ -645,6 +652,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         InetAddress to = liveEndpoints.get(index);
         if (logger.isTraceEnabled())
             logger.trace("Sending a GossipDigestSyn to {} ...", to);
+        if (firstSynSendAt == 0)
+            firstSynSendAt = System.nanoTime();
         MessagingService.instance().sendOneWay(message, to);
         return seeds.contains(to);
     }
@@ -713,11 +722,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
      * Check if this endpoint can safely bootstrap into the cluster.
      *
      * @param endpoint - the endpoint to check
+     * @param epStates - endpoint states in the cluster
      * @return true if the endpoint can join the cluster
      */
-    public boolean isSafeForBootstrap(InetAddress endpoint)
+    public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
     {
-        EndpointState epState = endpointStateMap.get(endpoint);
+        EndpointState epState = epStates.get(endpoint);
 
         // if there's no previous state, or the node was previously removed from the cluster, we're good
         if (epState == null || isDeadState(epState))
@@ -816,14 +826,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return endpointStateMap.get(ep);
     }
 
-    // removes ALL endpoint states; should only be called after shadow gossip
-    public void resetEndpointStateMap()
-    {
-        endpointStateMap.clear();
-        unreachableEndpoints.clear();
-        liveEndpoints.clear();
-    }
-
     public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
     {
         return endpointStateMap.entrySet();
@@ -831,7 +833,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     public UUID getHostId(InetAddress endpoint)
     {
-        return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
+        return getHostId(endpoint, endpointStateMap);
+    }
+
+    public UUID getHostId(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
+    {
+        return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
     }
 
     EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
@@ -1305,12 +1312,32 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     }
 
     /**
-     *  Do a single 'shadow' round of gossip, where we do not modify any state
-     *  Only used when replacing a node, to get and assume its states
+     * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+     * returned map, instead of endpointStateMap.
+     *
+     * Used when preparing to join the ring:
+     * <ul>
+     *     <li>when replacing a node, to get and assume its tokens</li>
+     *     <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+     * </ul>
+     *
+     * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+     * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+     * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+     * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+     * the same time.
+     *
+     * @return endpoint states gathered during shadow round or empty map
      */
-    public void doShadowRound()
+    public synchronized Map<InetAddress, EndpointState> doShadowRound()
     {
         buildSeedsList();
+        // it may be that the local address is the only entry in the seed
+        // list in which case, attempting a shadow round is pointless
+        if (seeds.isEmpty())
+            return endpointShadowStateMap;
+
+        endpointShadowStateMap.clear();
         // send a completely empty syn
         List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
         GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@ -1346,6 +1373,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         {
             throw new RuntimeException(wtf);
         }
+
+        return ImmutableMap.copyOf(endpointShadowStateMap);
     }
 
     private void buildSeedsList()
@@ -1466,10 +1495,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
     }
 
-    protected void finishShadowRound()
+    protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
     {
         if (inShadowRound)
+        {
+            endpointShadowStateMap.putAll(epStateMap);
             inShadowRound = false;
+        }
     }
 
     protected boolean isInShadowRound()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index df0b767..b065d90 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -48,6 +48,12 @@ class MigrationTask extends WrappedRunnable
 
     public void runMayThrow() throws Exception
     {
+        if (!FailureDetector.instance.isAlive(endpoint))
+        {
+            logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+            return;
+        }
+
         // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
         // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
         // a higher major.
@@ -57,12 +63,6 @@ class MigrationTask extends WrappedRunnable
             return;
         }
 
-        if (!FailureDetector.instance.isAlive(endpoint))
-        {
-            logger.debug("Can't send schema pull request: node {} is down.", endpoint);
-            return;
-        }
-
         MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
 
         IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c2996d7..65f536b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -443,15 +443,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             MessagingService.instance().listen();
 
         // make magic happen
-        Gossiper.instance.doShadowRound();
+        Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 
         // now that we've gossiped at least once, we should be able to find the node we're replacing
-        if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+        if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
             throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
-        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
         try
         {
-            VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+            VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
             if (tokensVersionedValue == null)
                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
             Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
@@ -460,7 +460,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             {
                 SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
             }
-            Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
             return tokens;
         }
         catch (IOException e)
@@ -474,8 +473,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.debug("Starting shadow gossip round to check for endpoint collision");
         if (!MessagingService.instance().isListening())
             MessagingService.instance().listen();
-        Gossiper.instance.doShadowRound();
-        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress()))
+        Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
         {
             throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
                                                      "Use cassandra.replace_address if you want to replace this node.",
@@ -483,7 +482,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         if (useStrictConsistency && !allowSimultaneousMoves())
         {
-            for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+            for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
             {
                 // ignore local node or empty status
                 if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
@@ -495,7 +494,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true");
             }
         }
-        Gossiper.instance.resetEndpointStateMap();
     }
 
     private boolean allowSimultaneousMoves()


[04/10] cassandra git commit: Discard in-flight shadow round responses

Posted by jk...@apache.org.
Discard in-flight shadow round responses

patch by Stefan Podkowinski; reviewed by Joel Knighton and Jason Brown for CASSANDRA-12653


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf0906b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf0906b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf0906b9

Branch: refs/heads/trunk
Commit: bf0906b92cf65161d828e31bc46436d427bbb4b8
Parents: 06316df
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Mon Sep 19 13:56:54 2016 +0200
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:08:28 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../gms/GossipDigestAckVerbHandler.java         | 26 +++++---
 src/java/org/apache/cassandra/gms/Gossiper.java | 62 +++++++++++++++-----
 .../apache/cassandra/service/MigrationTask.java | 12 ++--
 .../cassandra/service/StorageService.java       | 16 +++--
 5 files changed, 79 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27dd343..df2421d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.10
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
  * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
  * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
  * Fix queries updating multiple time the same list (CASSANDRA-13130)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 9f69a94..59060f8 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -51,21 +51,31 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
         Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
         logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());
 
-        if (epStateMap.size() > 0)
-        {
-            /* Notify the Failure Detector */
-            Gossiper.instance.notifyFailureDetector(epStateMap);
-            Gossiper.instance.applyStateLocally(epStateMap);
-        }
-
         if (Gossiper.instance.isInShadowRound())
         {
             if (logger.isDebugEnabled())
                 logger.debug("Finishing shadow round with {}", from);
-            Gossiper.instance.finishShadowRound();
+            Gossiper.instance.finishShadowRound(epStateMap);
             return; // don't bother doing anything else, we have what we came for
         }
 
+        if (epStateMap.size() > 0)
+        {
+            // Ignore any GossipDigestAck messages that we handle before a regular GossipDigestSyn has been sent.
+            // This will prevent Acks from leaking over from the shadow round that are not actually part of
+            // the regular gossip conversation.
+            if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0)
+            {
+                if (logger.isTraceEnabled())
+                    logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
+                return;
+            }
+
+            /* Notify the Failure Detector */
+            Gossiper.instance.notifyFailureDetector(epStateMap);
+            Gossiper.instance.applyStateLocally(epStateMap);
+        }
+
         /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
         Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
         for (GossipDigest gDigest : gDigestList)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 06b14c4..c2eccba 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.utils.Pair;
@@ -86,6 +87,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
     public static final Gossiper instance = new Gossiper();
 
+    // Timestamp used to avoid processing in-flight messages before we have sent any SYN, see CASSANDRA-12653.
+    volatile long firstSynSendAt = 0L;
+
     public static final long aVeryLongTime = 259200 * 1000; // 3 days
 
     // Maximimum difference between generation value and local time we are willing to accept about a peer
@@ -125,6 +129,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     private volatile boolean inShadowRound = false;
 
+    // endpoint states as gathered during shadow round
+    private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
+
     private volatile long lastProcessedMessageAt = System.currentTimeMillis();
 
     private class GossipTask implements Runnable
@@ -645,6 +652,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         InetAddress to = liveEndpoints.get(index);
         if (logger.isTraceEnabled())
             logger.trace("Sending a GossipDigestSyn to {} ...", to);
+        if (firstSynSendAt == 0)
+            firstSynSendAt = System.nanoTime();
         MessagingService.instance().sendOneWay(message, to);
         return seeds.contains(to);
     }
@@ -713,11 +722,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
      * Check if this endpoint can safely bootstrap into the cluster.
      *
      * @param endpoint - the endpoint to check
+     * @param epStates - endpoint states in the cluster
      * @return true if the endpoint can join the cluster
      */
-    public boolean isSafeForBootstrap(InetAddress endpoint)
+    public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
     {
-        EndpointState epState = endpointStateMap.get(endpoint);
+        EndpointState epState = epStates.get(endpoint);
 
         // if there's no previous state, or the node was previously removed from the cluster, we're good
         if (epState == null || isDeadState(epState))
@@ -816,14 +826,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return endpointStateMap.get(ep);
     }
 
-    // removes ALL endpoint states; should only be called after shadow gossip
-    public void resetEndpointStateMap()
-    {
-        endpointStateMap.clear();
-        unreachableEndpoints.clear();
-        liveEndpoints.clear();
-    }
-
     public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
     {
         return endpointStateMap.entrySet();
@@ -831,7 +833,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     public UUID getHostId(InetAddress endpoint)
     {
-        return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
+        return getHostId(endpoint, endpointStateMap);
+    }
+
+    public UUID getHostId(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
+    {
+        return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
     }
 
     EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
@@ -1305,12 +1312,32 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     }
 
     /**
-     *  Do a single 'shadow' round of gossip, where we do not modify any state
-     *  Only used when replacing a node, to get and assume its states
+     * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+     * returned map, instead of endpointStateMap.
+     *
+     * Used when preparing to join the ring:
+     * <ul>
+     *     <li>when replacing a node, to get and assume its tokens</li>
+     *     <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+     * </ul>
+     *
+     * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+     * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+     * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+     * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+     * the same time.
+     *
+     * @return endpoint states gathered during shadow round or empty map
      */
-    public void doShadowRound()
+    public synchronized Map<InetAddress, EndpointState> doShadowRound()
     {
         buildSeedsList();
+        // it may be that the local address is the only entry in the seed
+        // list in which case, attempting a shadow round is pointless
+        if (seeds.isEmpty())
+            return endpointShadowStateMap;
+
+        endpointShadowStateMap.clear();
         // send a completely empty syn
         List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
         GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@ -1346,6 +1373,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         {
             throw new RuntimeException(wtf);
         }
+
+        return ImmutableMap.copyOf(endpointShadowStateMap);
     }
 
     private void buildSeedsList()
@@ -1466,10 +1495,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
     }
 
-    protected void finishShadowRound()
+    protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
     {
         if (inShadowRound)
+        {
+            endpointShadowStateMap.putAll(epStateMap);
             inShadowRound = false;
+        }
     }
 
     protected boolean isInShadowRound()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index df0b767..b065d90 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -48,6 +48,12 @@ class MigrationTask extends WrappedRunnable
 
     public void runMayThrow() throws Exception
     {
+        if (!FailureDetector.instance.isAlive(endpoint))
+        {
+            logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+            return;
+        }
+
         // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
         // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
         // a higher major.
@@ -57,12 +63,6 @@ class MigrationTask extends WrappedRunnable
             return;
         }
 
-        if (!FailureDetector.instance.isAlive(endpoint))
-        {
-            logger.debug("Can't send schema pull request: node {} is down.", endpoint);
-            return;
-        }
-
         MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
 
         IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c2996d7..65f536b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -443,15 +443,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             MessagingService.instance().listen();
 
         // make magic happen
-        Gossiper.instance.doShadowRound();
+        Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 
         // now that we've gossiped at least once, we should be able to find the node we're replacing
-        if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+        if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
             throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
-        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
         try
         {
-            VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+            VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
             if (tokensVersionedValue == null)
                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
             Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
@@ -460,7 +460,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             {
                 SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
             }
-            Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
             return tokens;
         }
         catch (IOException e)
@@ -474,8 +473,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.debug("Starting shadow gossip round to check for endpoint collision");
         if (!MessagingService.instance().isListening())
             MessagingService.instance().listen();
-        Gossiper.instance.doShadowRound();
-        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress()))
+        Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
         {
             throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
                                                      "Use cassandra.replace_address if you want to replace this node.",
@@ -483,7 +482,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         if (useStrictConsistency && !allowSimultaneousMoves())
         {
-            for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+            for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
             {
                 // ignore local node or empty status
                 if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
@@ -495,7 +494,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true");
             }
         }
-        Gossiper.instance.resetEndpointStateMap();
     }
 
     private boolean allowSimultaneousMoves()


[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by jk...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2836a644
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2836a644
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2836a644

Branch: refs/heads/cassandra-3.11
Commit: 2836a644a357c0992ba89622f04668422ce2761a
Parents: f4ba908 bf0906b
Author: Joel Knighton <jk...@apache.org>
Authored: Wed Mar 22 13:13:44 2017 -0500
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:18:59 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../gms/GossipDigestAckVerbHandler.java         | 26 ++++++---
 src/java/org/apache/cassandra/gms/Gossiper.java | 56 ++++++++++++++------
 .../apache/cassandra/service/MigrationTask.java | 12 ++---
 .../cassandra/service/StorageService.java       | 17 +++---
 5 files changed, 73 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6021315,df2421d..9140c73
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,9 +1,28 @@@
 -2.2.10
 +3.0.13
 + * Fix CONTAINS filtering for null collections (CASSANDRA-13246)
 + * Applying: Use a unique metric reservoir per test run when using Cassandra-wide metrics residing in MBeans (CASSANDRA-13216)
 + * Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320)
 + * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
 + * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
 +Merged from 2.2:
+  * Discard in-flight shadow round responses (CASSANDRA-12653)
   * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
   * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
 + * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
   * Fix queries updating multiple time the same list (CASSANDRA-13130)
   * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 +
 +
 +3.0.12
 + * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
 + * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
 + * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
 + * Update c.yaml doc for offheap memtables (CASSANDRA-13179)
 + * Faster StreamingHistogram (CASSANDRA-13038)
 + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
 + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
 + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 +Merged from 2.2:
   * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886)
   * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
   * Coalescing strategy sleeps too much (CASSANDRA-13090)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index cbfa750,c2eccba..802ff9c
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -124,6 -128,9 +128,8 @@@ public class Gossiper implements IFailu
      private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
  
      private volatile boolean inShadowRound = false;
 -
+     // endpoint states as gathered during shadow round
+     private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
  
      private volatile long lastProcessedMessageAt = System.currentTimeMillis();
  
@@@ -818,28 -826,6 +827,20 @@@
          return endpointStateMap.get(ep);
      }
  
 +    public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
 +    {
 +        EndpointState state1 = getEndpointStateForEndpoint(ep1);
 +        EndpointState state2 = getEndpointStateForEndpoint(ep2);
 +
 +        if (state1 == null || state2 == null)
 +            return false;
 +
 +        VersionedValue value1 = state1.getApplicationState(as);
 +        VersionedValue value2 = state2.getApplicationState(as);
 +
 +        return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
 +    }
 +
-     // removes ALL endpoint states; should only be called after shadow gossip
-     public void resetEndpointStateMap()
-     {
-         endpointStateMap.clear();
-         unreachableEndpoints.clear();
-         liveEndpoints.clear();
-     }
- 
      public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
      {
          return endpointStateMap.entrySet();
@@@ -1321,12 -1312,32 +1327,27 @@@
      }
  
      /**
-      *  Do a single 'shadow' round of gossip, where we do not modify any state
-      *  Only used when replacing a node, to get and assume its states
+      * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+      * map return value, instead of endpointStateMap.
+      *
+      * Used when preparing to join the ring:
+      * <ul>
+      *     <li>when replacing a node, to get and assume its tokens</li>
+      *     <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+      * </ul>
+      *
+      * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+      * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+      * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+      * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+      * the same time.
+      *
+      * @return endpoint states gathered during shadow round or empty map
       */
-     public void doShadowRound()
+     public synchronized Map<InetAddress, EndpointState> doShadowRound()
      {
          buildSeedsList();
 -        // it may be that the local address is the only entry in the seed
 -        // list in which case, attempting a shadow round is pointless
 -        if (seeds.isEmpty())
 -            return endpointShadowStateMap;
 -
+         endpointShadowStateMap.clear();
          // send a completely empty syn
          List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
          GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationTask.java
index 39a5a11,b065d90..6b04756
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@@ -56,13 -46,14 +56,19 @@@ class MigrationTask extends WrappedRunn
          this.endpoint = endpoint;
      }
  
 +    public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
 +    {
 +        return inflightTasks;
 +    }
 +
      public void runMayThrow() throws Exception
      {
+         if (!FailureDetector.instance.isAlive(endpoint))
+         {
+             logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+             return;
+         }
+ 
          // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
          // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
          // a higher major.
@@@ -72,16 -63,8 +78,10 @@@
              return;
          }
  
-         if (!FailureDetector.instance.isAlive(endpoint))
-         {
-             logger.debug("Can't send schema pull request: node {} is down.", endpoint);
-             return;
-         }
- 
          MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
  
 +        final CountDownLatch completionLatch = new CountDownLatch(1);
 +
          IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
          {
              @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2836a644/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 35b2423,65f536b..6760040
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -481,18 -443,18 +481,17 @@@ public class StorageService extends Not
              MessagingService.instance().listen();
  
          // make magic happen
-         Gossiper.instance.doShadowRound();
- 
+         Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 -
          // now that we've gossiped at least once, we should be able to find the node we're replacing
-         if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+         if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
              throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
-         replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+         replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
          try
          {
-             VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+             VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
              if (tokensVersionedValue == null)
                  throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
 -            Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 +            Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
  
              if (isReplacingSameAddress())
              {


[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by jk...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec9ce3df
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec9ce3df
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec9ce3df

Branch: refs/heads/cassandra-3.11
Commit: ec9ce3dfba0030015c5dd846b8b5b526614cf5f7
Parents: 5484bd1 2836a64
Author: Joel Knighton <jk...@apache.org>
Authored: Wed Mar 22 13:20:24 2017 -0500
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:22:43 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../gms/GossipDigestAckVerbHandler.java         |  27 +++--
 src/java/org/apache/cassandra/gms/Gossiper.java |  65 +++++++----
 .../apache/cassandra/service/MigrationTask.java |  12 +-
 .../cassandra/service/StorageService.java       |  17 ++-
 test/conf/cassandra-seeds.yaml                  |  43 +++++++
 .../apache/cassandra/gms/ShadowRoundTest.java   | 116 +++++++++++++++++++
 .../apache/cassandra/net/MatcherResponse.java   |  24 ++--
 8 files changed, 252 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ce8535d,9140c73..8386c20
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -37,143 -49,6 +37,144 @@@ Merged from 3.0
     live rows in sstabledump (CASSANDRA-13177)
   * Provide user workaround when system_schema.columns does not contain entries
     for a table that's in system_schema.tables (CASSANDRA-13180)
 +Merged from 2.2:
++ * Discard in-flight shadow round responses (CASSANDRA-12653)
 + * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
 + * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
 + * Commitlog replay may fail if last mutation is within 4 bytes of end of segment (CASSANDRA-13282)
 + * Fix queries updating multiple time the same list (CASSANDRA-13130)
 + * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053)
 + * Fix flaky LongLeveledCompactionStrategyTest (CASSANDRA-12202)
 + * Fix failing COPY TO STDOUT (CASSANDRA-12497)
 + * Fix ColumnCounter::countAll behaviour for reverse queries (CASSANDRA-13222)
 + * Exceptions encountered calling getSeeds() breaks OTC thread (CASSANDRA-13018)
 + * Fix negative mean latency metric (CASSANDRA-12876)
 + * Use only one file pointer when creating commitlog segments (CASSANDRA-12539)
 +Merged from 2.1:
 + * Remove unused repositories (CASSANDRA-13278)
 + * Log stacktrace of uncaught exceptions (CASSANDRA-13108)
 + * Use portable stderr for java error in startup (CASSANDRA-13211)
 + * Fix Thread Leak in OutboundTcpConnection (CASSANDRA-13204)
 + * Coalescing strategy can enter infinite loop (CASSANDRA-13159)
 +
 +
 +3.10
 + * Fix secondary index queries regression (CASSANDRA-13013)
 + * Add duration type to the protocol V5 (CASSANDRA-12850)
 + * Fix duration type validation (CASSANDRA-13143)
 + * Fix flaky GcCompactionTest (CASSANDRA-12664)
 + * Fix TestHintedHandoff.hintedhandoff_decom_test (CASSANDRA-13058)
 + * Fixed query monitoring for range queries (CASSANDRA-13050)
 + * Remove outboundBindAny configuration property (CASSANDRA-12673)
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
   * Dump threads when unit tests time out (CASSANDRA-13117)
   * Better error when modifying function permissions without explicit keyspace (CASSANDRA-12925)
   * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 15662b1,59060f8..d6d9dfb
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@@ -54,16 -54,8 +54,10 @@@ public class GossipDigestAckVerbHandle
          if (Gossiper.instance.isInShadowRound())
          {
              if (logger.isDebugEnabled())
 -                logger.debug("Finishing shadow round with {}", from);
 -            Gossiper.instance.finishShadowRound(epStateMap);
 +                logger.debug("Received an ack from {}, which may trigger exit from shadow round", from);
++
 +            // if the ack is completely empty, then we can infer that the respondent is also in a shadow round
-             Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty());
++            Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() && epStateMap.isEmpty(), epStateMap);
              return; // don't bother doing anything else, we have what we came for
          }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index ebfd66d,802ff9c..177d7dc
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -30,9 -30,9 +30,10 @@@ import javax.management.ObjectName
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.collect.ImmutableList;
+ import com.google.common.collect.ImmutableMap;
  import com.google.common.util.concurrent.Uninterruptibles;
  
 +import org.apache.cassandra.utils.CassandraVersion;
  import org.apache.cassandra.utils.Pair;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -112,7 -114,7 +116,8 @@@ public class Gossiper implements IFailu
      private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress, Long>();
  
      /* initial seeds for joining the cluster */
--    private final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
++    @VisibleForTesting
++    final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
  
      /* map where key is the endpoint and value is the state associated with the endpoint */
      final ConcurrentMap<InetAddress, EndpointState> endpointStateMap = new ConcurrentHashMap<InetAddress, EndpointState>();
@@@ -126,7 -128,8 +131,10 @@@
      private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
  
      private volatile boolean inShadowRound = false;
++    // seeds gathered during shadow round that indicated to be in the shadow round phase as well
 +    private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator);
+     // endpoint states as gathered during shadow round
+     private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
  
      private volatile long lastProcessedMessageAt = System.currentTimeMillis();
  
@@@ -715,22 -720,16 +725,24 @@@
      }
  
      /**
 -     * Check if this endpoint can safely bootstrap into the cluster.
 +     * Check if this node can safely be started and join the ring.
 +     * If the node is bootstrapping, examines gossip state for any previous status to decide whether
 +     * it's safe to allow this node to start and bootstrap. If not bootstrapping, compares the host ID
 +     * that the node itself has (obtained by reading from system.local or generated if not present)
 +     * with the host ID obtained from gossip for the endpoint address (if any). This latter case
 +     * prevents a non-bootstrapping, new node from being started with the same address of a
 +     * previously started, but currently down predecessor.
       *
       * @param endpoint - the endpoint to check
 +     * @param localHostUUID - the host id to check
 +     * @param isBootstrapping - whether the node intends to bootstrap when joining
+      * @param epStates - endpoint states in the cluster
 -     * @return true if the endpoint can join the cluster
 +     * @return true if it is safe to start the node, false otherwise
       */
-     public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping)
 -    public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
++    public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping,
++                                    Map<InetAddress, EndpointState> epStates)
      {
-         EndpointState epState = endpointStateMap.get(endpoint);
+         EndpointState epState = epStates.get(endpoint);
 -
          // if there's no previous state, or the node was previously removed from the cluster, we're good
          if (epState == null || isDeadState(epState))
              return true;
@@@ -1343,20 -1327,27 +1352,32 @@@
      }
  
      /**
-      *  Do a single 'shadow' round of gossip, where we do not modify any state
-      *  Used when preparing to join the ring:
-      *      * when replacing a node, to get and assume its tokens
-      *      * when joining, to check that the local host id matches any previous id for the endpoint address
+      * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+      * map return value, instead of endpointStateMap.
+      *
 -     * Used when preparing to join the ring:
+      * <ul>
+      *     <li>when replacing a node, to get and assume its tokens</li>
+      *     <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+      * </ul>
+      *
+      * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
 -     * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
++     * again by calling {@link Gossiper#maybeFinishShadowRound(InetAddress, boolean, Map)}. This will update
+      * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+      * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+      * the same time.
+      *
+      * @return endpoint states gathered during shadow round or empty map
       */
-     public void doShadowRound()
+     public synchronized Map<InetAddress, EndpointState> doShadowRound()
      {
          buildSeedsList();
 +        // it may be that the local address is the only entry in the seed
 +        // list in which case, attempting a shadow round is pointless
 +        if (seeds.isEmpty())
-             return;
++            return endpointShadowStateMap;
 +
 +        seedsInShadowRound.clear();
+         endpointShadowStateMap.clear();
          // send a completely empty syn
          List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
          GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@@ -1401,9 -1383,11 +1422,12 @@@
          {
              throw new RuntimeException(wtf);
          }
+ 
+         return ImmutableMap.copyOf(endpointShadowStateMap);
      }
  
--    private void buildSeedsList()
++    @VisibleForTesting
++    void buildSeedsList()
      {
          for (InetAddress seed : DatabaseDescriptor.getSeeds())
          {
@@@ -1521,32 -1505,12 +1545,33 @@@
          return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
      }
  
-     protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound)
 -    protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
++    protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
      {
          if (inShadowRound)
          {
 -            endpointShadowStateMap.putAll(epStateMap);
 -            inShadowRound = false;
 +            if (!isInShadowRound)
 +            {
 +                logger.debug("Received a regular ack from {}, can now exit shadow round", respondent);
 +                // respondent sent back a full ack, so we can exit our shadow round
++                endpointShadowStateMap.putAll(epStateMap);
 +                inShadowRound = false;
 +                seedsInShadowRound.clear();
 +            }
 +            else
 +            {
 +                // respondent indicates it too is in a shadow round, if all seeds
 +                // are in this state then we can exit our shadow round. Otherwise,
 +                // we keep retrying the SR until one responds with a full ACK or
 +                // we learn that all seeds are in SR.
 +                logger.debug("Received an ack from {} indicating it is also in shadow round", respondent);
 +                seedsInShadowRound.add(respondent);
 +                if (seedsInShadowRound.containsAll(seeds))
 +                {
 +                    logger.debug("All seeds are in a shadow round, clearing this node to exit its own");
 +                    inShadowRound = false;
 +                    seedsInShadowRound.clear();
 +                }
 +            }
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index b64cf13,6760040..3c0bc1a
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -492,76 -474,52 +492,75 @@@ public class StorageService extends Not
          daemon.deactivate();
      }
  
 -    public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException
++    /**
++     * Prepares this node to take over the tokens of the node configured via replace_address.
++     *
++     * Runs a gossip shadow round and reads the replaced node's TOKENS application state from the
++     * endpoint states returned by that round (not from the gossiper's own state), storing them in
++     * {@code bootstrapTokens}. When replacing the same address, the replaced node's host id is
++     * adopted and persisted as the local host id.
++     *
++     * @return the host id this node should use: the replaced node's host id when replacing the same
++     *         address, otherwise the value of SystemKeyspace.getLocalHostId()
++     * @throws ConfigurationException if join_ring=false is combined with a replace request
++     * @throws RuntimeException if this node already completed bootstrap, if auto bootstrap is disabled
++     *         without cassandra.allow_unsafe_replace=true, or if the replaced node or its tokens cannot
++     *         be found in the shadow round results
++     */
 +    private synchronized UUID prepareForReplacement() throws ConfigurationException
      {
 -        logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress());
 -        if (!MessagingService.instance().isListening())
 -            MessagingService.instance().listen();
 +        if (SystemKeyspace.bootstrapComplete())
 +            throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
 +
 +        if (!joinRing)
 +            throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
  
 -        // make magic happen
 +        if (!DatabaseDescriptor.isAutoBootstrap() && !Boolean.getBoolean("cassandra.allow_unsafe_replace"))
 +            throw new RuntimeException("Replacing a node without bootstrapping risks invalidating consistency " +
 +                                       "guarantees as the expected data may not be present until repair is run. " +
 +                                       "To perform this operation, please restart with " +
 +                                       "-Dcassandra.allow_unsafe_replace=true");
 +
 +        InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
 +        logger.info("Gathering node replacement information for {}", replaceAddress);
-         Gossiper.instance.doShadowRound();
+         Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 -        // now that we've gossiped at least once, we should be able to find the node we're replacing
 -        if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
 -            throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
 -        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
 +        // as we've completed the shadow round of gossip, we should be able to find the node we're replacing
-         if (Gossiper.instance.getEndpointStateForEndpoint(replaceAddress) == null)
++        if (epStates.get(replaceAddress) == null)
 +            throw new RuntimeException(String.format("Cannot replace_address %s because it doesn't exist in gossip", replaceAddress));
 +
          try
          {
-             VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(replaceAddress).getApplicationState(ApplicationState.TOKENS);
 -            VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
++            VersionedValue tokensVersionedValue = epStates.get(replaceAddress).getApplicationState(ApplicationState.TOKENS);
              if (tokensVersionedValue == null)
 -                throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
 -            Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 +                throw new RuntimeException(String.format("Could not find tokens for %s to replace", replaceAddress));
  
 -            if (isReplacingSameAddress())
 -            {
 -                SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
 -            }
 -            return tokens;
 +            bootstrapTokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
          }
          catch (IOException e)
          {
              throw new RuntimeException(e);
          }
 +
 +        UUID localHostId = SystemKeyspace.getLocalHostId();
 +
 +        if (isReplacingSameAddress())
 +        {
-             localHostId = Gossiper.instance.getHostId(replaceAddress);
++            localHostId = Gossiper.instance.getHostId(replaceAddress, epStates);
 +            SystemKeyspace.setLocalHostId(localHostId); // use the replacee's host Id as our own so we receive hints, etc
 +        }
 +
-         Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
 +        return localHostId;
      }
  
 -    public synchronized void checkForEndpointCollision() throws ConfigurationException
 +    private synchronized void checkForEndpointCollision(UUID localHostId) throws ConfigurationException
      {
 +        if (Boolean.getBoolean("cassandra.allow_unsafe_join"))
 +        {
 +            logger.warn("Skipping endpoint collision check as cassandra.allow_unsafe_join=true");
 +            return;
 +        }
 +
          logger.debug("Starting shadow gossip round to check for endpoint collision");
-         Gossiper.instance.doShadowRound();
 -        if (!MessagingService.instance().isListening())
 -            MessagingService.instance().listen();
+         Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 -        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
 +        // If bootstrapping, check whether any previously known status for the endpoint makes it unsafe to do so.
 +        // If not bootstrapping, compare the host id for this endpoint learned from gossip (if any) with the local
 +        // one, which was either read from system.local or generated at startup. If a learned id is present &
 +        // doesn't match the local, then the node needs replacing
-         if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap()))
++        if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap(), epStates))
          {
              throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
                                                       "Use cassandra.replace_address if you want to replace this node.",
                                                       FBUtilities.getBroadcastAddress()));
          }
 -        if (useStrictConsistency && !allowSimultaneousMoves())
 +
 +        if (shouldBootstrap() && useStrictConsistency && !allowSimultaneousMoves())
          {
-             for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+             for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
              {
                  // ignore local node or empty status
                  if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/conf/cassandra-seeds.yaml
----------------------------------------------------------------------
diff --cc test/conf/cassandra-seeds.yaml
index 0000000,0000000..02d25d2
new file mode 100644
--- /dev/null
+++ b/test/conf/cassandra-seeds.yaml
@@@ -1,0 -1,0 +1,43 @@@
++#
++# Test configuration loaded by ShadowRoundTest (it sets the cassandra.config system property to
++# cassandra-seeds.yaml). It declares three seed addresses so the test can simulate gossip ACKs
++# from more than one seed.
++#
++# Warning!
++# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
++# NOTE(review): the warning above appears to be copied from another test config; this file defines
++# no schemas and is not referenced by LegacySSTableTest here - confirm whether it still applies.
++#
++cluster_name: Test Cluster
++# memtable_allocation_type: heap_buffers
++memtable_allocation_type: offheap_objects
++commitlog_sync: batch
++commitlog_sync_batch_window_in_ms: 1.0
++commitlog_segment_size_in_mb: 5
++commitlog_directory: build/test/cassandra/commitlog
++cdc_raw_directory: build/test/cassandra/cdc_raw
++cdc_enabled: false
++hints_directory: build/test/cassandra/hints
++partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
++listen_address: 127.0.0.1
++storage_port: 7010
++start_native_transport: true
++native_transport_port: 9042
++column_index_size_in_kb: 4
++saved_caches_directory: build/test/cassandra/saved_caches
++data_file_directories:
++    - build/test/cassandra/data
++disk_access_mode: mmap
++seed_provider:
++    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
++      parameters:
++          # three seeds: ShadowRoundTest responds to one GOSSIP_DIGEST_SYN per seed
++          - seeds: "127.0.0.10,127.0.1.10,127.0.2.10"
++endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
++dynamic_snitch: true
++server_encryption_options:
++    internode_encryption: none
++    keystore: conf/.keystore
++    keystore_password: cassandra
++    truststore: conf/.truststore
++    truststore_password: cassandra
++incremental_backups: true
++concurrent_compactors: 4
++compaction_throughput_mb_per_sec: 0
++row_cache_class_name: org.apache.cassandra.cache.OHCProvider
++row_cache_size_in_mb: 16
++enable_user_defined_functions: true
++enable_scripted_user_defined_functions: true

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
index 0000000,0000000..f8cc49c
new file mode 100644
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
@@@ -1,0 -1,0 +1,116 @@@
++/*
++* Licensed to the Apache Software Foundation (ASF) under one
++* or more contributor license agreements.  See the NOTICE file
++* distributed with this work for additional information
++* regarding copyright ownership.  The ASF licenses this file
++* to you under the Apache License, Version 2.0 (the
++* "License"); you may not use this file except in compliance
++* with the License.  You may obtain a copy of the License at
++*
++*    http://www.apache.org/licenses/LICENSE-2.0
++*
++* Unless required by applicable law or agreed to in writing,
++* software distributed under the License is distributed on an
++* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++* KIND, either express or implied.  See the License for the
++* specific language governing permissions and limitations
++* under the License.
++*/
++
++package org.apache.cassandra.gms;
++
++import java.util.Collections;
++import java.util.concurrent.atomic.AtomicBoolean;
++
++import org.junit.After;
++import org.junit.BeforeClass;
++import org.junit.Test;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.locator.IEndpointSnitch;
++import org.apache.cassandra.locator.PropertyFileSnitch;
++import org.apache.cassandra.net.MessageIn;
++import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.net.MockMessagingService;
++import org.apache.cassandra.net.MockMessagingSpy;
++import org.apache.cassandra.service.StorageService;
++
++import static org.apache.cassandra.net.MockMessagingService.verb;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
++
++public class ShadowRoundTest
++{
++    private static final Logger logger = LoggerFactory.getLogger(ShadowRoundTest.class);
++
++    // Upper bound (60 seconds) for holding back simulated ACKs while waiting for regular gossip to start.
++    // Without it, responder threads would spin forever if the gossiper never gets enabled.
++    private static final long GOSSIP_ENABLED_TIMEOUT_NANOS = 60L * 1000 * 1000 * 1000;
++
++    @BeforeClass
++    public static void setUp() throws ConfigurationException
++    {
++        System.setProperty("cassandra.config", "cassandra-seeds.yaml");
++
++        DatabaseDescriptor.daemonInitialization();
++        IEndpointSnitch snitch = new PropertyFileSnitch();
++        DatabaseDescriptor.setEndpointSnitch(snitch);
++        Keyspace.setInitialized();
++    }
++
++    @After
++    public void cleanup()
++    {
++        MockMessagingService.cleanup();
++    }
++
++    /**
++     * Answers the first shadow round SYN immediately and delays the ACKs for the remaining seeds until
++     * regular gossip has been enabled. Those late ACKs must not be processed as regular gossip, so no
++     * GOSSIP_DIGEST_ACK2 or MIGRATION_REQUEST messages may be emitted.
++     */
++    @Test
++    public void testDelayedResponse()
++    {
++        Gossiper.instance.buildSeedsList();
++        int noOfSeeds = Gossiper.instance.seeds.size();
++
++        final AtomicBoolean ackSent = new AtomicBoolean(false);
++        MockMessagingSpy spySyn = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_SYN))
++                .respondN((msgOut, to) ->
++                {
++                    // ACK once to finish shadow round, then wait until gossiper has been enabled
++                    // and only then reply with the remaining ACKs from other seeds
++                    if (!ackSent.compareAndSet(false, true))
++                    {
++                        long deadline = System.nanoTime() + GOSSIP_ENABLED_TIMEOUT_NANOS;
++                        while (!Gossiper.instance.isEnabled())
++                        {
++                            // give up (no reply) instead of spinning forever if regular gossip never starts
++                            if (System.nanoTime() - deadline > 0)
++                            {
++                                logger.warn("Gossiper was not enabled in time, not replying to SYN sent to {}", to);
++                                return null;
++                            }
++                            Thread.yield();
++                        }
++                    }
++
++                    HeartBeatState hb = new HeartBeatState(123, 456);
++                    EndpointState state = new EndpointState(hb);
++                    GossipDigestAck payload = new GossipDigestAck(
++                            Collections.singletonList(new GossipDigest(to, hb.getGeneration(), hb.getHeartBeatVersion())),
++                            Collections.singletonMap(to, state));
++
++                    logger.debug("Simulating digest ACK reply");
++                    return MessageIn.create(to, payload, Collections.emptyMap(), MessagingService.Verb.GOSSIP_DIGEST_ACK, MessagingService.current_version);
++                }, noOfSeeds);
++
++        // GossipDigestAckVerbHandler will send ack2 for each ack received (after the shadow round)
++        MockMessagingSpy spyAck2 = MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_ACK2)).dontReply();
++
++        // Migration request messages should not be emitted during shadow round
++        MockMessagingSpy spyMigrationReq = MockMessagingService.when(verb(MessagingService.Verb.MIGRATION_REQUEST)).dontReply();
++
++        try
++        {
++            StorageService.instance.initServer();
++        }
++        catch (Exception e)
++        {
++            assertEquals("Unable to contact any seeds!", e.getMessage());
++        }
++
++        // we expect one SYN for each seed during shadow round + additional SYNs after gossiper has been enabled
++        assertTrue(spySyn.messagesIntercepted > noOfSeeds);
++
++        // we don't expect to emit any GOSSIP_DIGEST_ACK2 or MIGRATION_REQUEST messages
++        assertEquals(0, spyAck2.messagesIntercepted);
++        assertEquals(0, spyMigrationReq.messagesIntercepted);
++    }
++}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec9ce3df/test/unit/org/apache/cassandra/net/MatcherResponse.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/net/MatcherResponse.java
index 21a75c9,0000000..6cd8085
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/net/MatcherResponse.java
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@@ -1,208 -1,0 +1,214 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.net.InetAddress;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.Queue;
 +import java.util.Set;
 +import java.util.concurrent.BlockingQueue;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.BiFunction;
 +import java.util.function.Function;
 +
 +/**
++ * Sends a mocked response for an intercepted outgoing message with a matching {@link Matcher}.
 + * The actual behavior by any instance of this class can be inspected by
 + * interacting with the returned {@link MockMessagingSpy}.
++ * <p>
++ * Each response is created and delivered on its own daemon thread, so a response function
++ * may block without stalling the sender or keeping the JVM alive after a test has finished.
 + */
 +public class MatcherResponse
 +{
 +    private final Matcher<?> matcher;
 +    private final Set<Integer> sendResponses = new HashSet<>();
 +    private final MockMessagingSpy spy = new MockMessagingSpy();
 +    private final AtomicInteger limitCounter = new AtomicInteger(Integer.MAX_VALUE);
 +    private IMessageSink sink;
 +
 +    MatcherResponse(Matcher<?> matcher)
 +    {
 +        this.matcher = matcher;
 +    }
 +
 +    /**
 +     * Do not create any responses for intercepted outbound messages.
 +     */
 +    public MockMessagingSpy dontReply()
 +    {
 +        return respond((MessageIn<?>)null);
 +    }
 +
 +    /**
 +     * Respond with provided message in reply to each intercepted outbound message.
 +     * @param message   the message to use as mock reply from the cluster
 +     */
 +    public MockMessagingSpy respond(MessageIn<?> message)
 +    {
 +        return respondN(message, Integer.MAX_VALUE);
 +    }
 +
 +    /**
 +     * Respond a limited number of times with the provided message in reply to each intercepted outbound message.
 +     * @param response  the message to use as mock reply from the cluster
 +     * @param limit     number of times to respond with message
 +     */
 +    public MockMessagingSpy respondN(final MessageIn<?> response, int limit)
 +    {
 +        return respondN((in, to) -> response, limit);
 +    }
 +
 +    /**
 +     * Respond with the message created by the provided function that will be called with each intercepted outbound message.
 +     * @param fnResponse    function to call for creating reply based on intercepted message and target address
 +     */
 +    public <T, S> MockMessagingSpy respond(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse)
 +    {
 +        return respondN(fnResponse, Integer.MAX_VALUE);
 +    }
 +
 +    /**
 +     * Respond with message wrapping the payload object created by provided function called for each intercepted outbound message.
 +     * The target address from the intercepted message will automatically be used as the created message's sender address.
 +     * @param fnResponse    function to call for creating payload object based on intercepted message and target address
 +     * @param verb          verb to use for reply message
 +     */
 +    public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb)
 +    {
 +        return respondNWithPayloadForEachReceiver(fnResponse, verb, Integer.MAX_VALUE);
 +    }
 +
 +    /**
 +     * Respond a limited number of times with message wrapping the payload object created by provided function called for
 +     * each intercepted outbound message. The target address from the intercepted message will automatically be used as the
 +     * created message's sender address.
 +     * @param fnResponse    function to call for creating payload object based on intercepted message and target address
 +     * @param verb          verb to use for reply message
++     * @param limit         number of times to respond
 +     */
 +    public <T, S> MockMessagingSpy respondNWithPayloadForEachReceiver(Function<MessageOut<T>, S> fnResponse, MessagingService.Verb verb, int limit)
 +    {
 +        return respondN((MessageOut<T> msg, InetAddress to) -> {
 +                    S payload = fnResponse.apply(msg);
 +                    if (payload == null)
 +                        return null;
 +                    else
 +                        return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version);
 +                },
 +                limit);
 +    }
 +
 +    /**
 +     * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed
++     * from the provided queue. No reply will be sent when the queue has been exhausted.
 +     * @param cannedResponses   prepared payload messages to use for responses
 +     * @param verb              verb to use for reply message
 +     */
 +    public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(Queue<S> cannedResponses, MessagingService.Verb verb)
 +    {
 +        return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> cannedResponses.poll(), verb);
 +    }
 +
 +    /**
 +     * Responds to each intercepted outbound message by creating a response message wrapping the next element consumed
 +     * from the provided queue. This method will block until queue elements are available.
 +     * @param cannedResponses   prepared payload messages to use for responses
 +     * @param verb              verb to use for reply message
 +     */
 +    public <T, S> MockMessagingSpy respondWithPayloadForEachReceiver(BlockingQueue<S> cannedResponses, MessagingService.Verb verb)
 +    {
 +        return respondWithPayloadForEachReceiver((MessageOut<T> msg) -> {
 +            try
 +            {
 +                return cannedResponses.take();
 +            }
 +            catch (InterruptedException e)
 +            {
 +                return null;
 +            }
 +        }, verb);
 +    }
 +
 +    /**
 +     * Respond a limited number of times with the message created by the provided function that will be called with
 +     * each intercepted outbound message.
 +     * @param fnResponse    function to call for creating reply based on intercepted message and target address
++     * @param limit         number of times to respond; matching messages beyond the limit are dropped without reply
 +     */
 +    public <T, S> MockMessagingSpy respondN(BiFunction<MessageOut<T>, InetAddress, MessageIn<S>> fnResponse, int limit)
 +    {
++        // check before touching any state, so a rejected registration leaves the active one untouched
 +        assert sink == null: "destroy() must be called first to register new response";
 +
++        limitCounter.set(limit);
++
 +        sink = new IMessageSink()
 +        {
 +            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
 +            {
++                // prevent outgoing message from being sent in case matcher indicates a match
 +                // and instead send the mocked response
 +                if (matcher.matches(message, to))
 +                {
 +                    spy.matchingMessage(message);
 +
 +                    if (limitCounter.decrementAndGet() < 0)
 +                        return false;
 +
 +                    synchronized (sendResponses)
 +                    {
 +                        // I'm not sure about retry semantics regarding message/ID relationships, but I assume
 +                        // sending a message multiple times using the same ID shouldn't happen..
 +                        assert !sendResponses.contains(id) : "ID re-use for outgoing message";
 +                        sendResponses.add(id);
 +                    }
-                     MessageIn<?> response = fnResponse.apply(message, to);
-                     if (response != null)
++
++                    // create response asynchronously to match request/response communication execution behavior;
++                    // daemon thread, so a response function that blocks (e.g. on an empty BlockingQueue or while
++                    // waiting for some condition) cannot keep the JVM alive once the test is done
++                    Thread responder = new Thread(() ->
 +                    {
-                         CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id);
-                         if (cb != null)
-                             cb.callback.response(response);
-                         else
-                             MessagingService.instance().receive(response, id);
-                         spy.matchingResponse(response);
-                     }
++                        MessageIn<?> response = fnResponse.apply(message, to);
++                        if (response != null)
++                        {
++                            CallbackInfo cb = MessagingService.instance().getRegisteredCallback(id);
++                            if (cb != null)
++                                cb.callback.response(response);
++                            else
++                                MessagingService.instance().receive(response, id);
++                            spy.matchingResponse(response);
++                        }
++                    });
++                    responder.setDaemon(true);
++                    responder.start();
++
 +                    return false;
 +                }
 +                return true;
 +            }
 +
 +            public boolean allowIncomingMessage(MessageIn message, int id)
 +            {
 +                return true;
 +            }
 +        };
 +        MessagingService.instance().addMessageSink(sink);
 +
 +        return spy;
 +    }
 +
 +    /**
++     * Stops the currently registered response from being sent. Afterwards a new response
++     * may be registered again through one of the respond methods.
 +     */
 +    public void destroy()
 +    {
++        if (sink != null)
++        {
++            MessagingService.instance().removeMessageSink(sink);
++            // reset, otherwise the "destroy() must be called first" assertion in respondN can never pass
++            sink = null;
++        }
 +    }
 +}


[02/10] cassandra git commit: Discard in-flight shadow round responses

Posted by jk...@apache.org.
Discard in-flight shadow round responses

patch by Stefan Podkowinski; reviewed by Joel Knighton and Jason Brown for CASSANDRA-12653


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf0906b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf0906b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf0906b9

Branch: refs/heads/cassandra-3.0
Commit: bf0906b92cf65161d828e31bc46436d427bbb4b8
Parents: 06316df
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Mon Sep 19 13:56:54 2016 +0200
Committer: Joel Knighton <jk...@apache.org>
Committed: Wed Mar 22 13:08:28 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../gms/GossipDigestAckVerbHandler.java         | 26 +++++---
 src/java/org/apache/cassandra/gms/Gossiper.java | 62 +++++++++++++++-----
 .../apache/cassandra/service/MigrationTask.java | 12 ++--
 .../cassandra/service/StorageService.java       | 16 +++--
 5 files changed, 79 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27dd343..df2421d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.10
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
  * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
  * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
  * Fix queries updating multiple time the same list (CASSANDRA-13130)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 9f69a94..59060f8 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -51,21 +51,31 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
         Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
         logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());
 
-        if (epStateMap.size() > 0)
-        {
-            /* Notify the Failure Detector */
-            Gossiper.instance.notifyFailureDetector(epStateMap);
-            Gossiper.instance.applyStateLocally(epStateMap);
-        }
-
         if (Gossiper.instance.isInShadowRound())
         {
             if (logger.isDebugEnabled())
                 logger.debug("Finishing shadow round with {}", from);
-            Gossiper.instance.finishShadowRound();
+            Gossiper.instance.finishShadowRound(epStateMap);
             return; // don't bother doing anything else, we have what we came for
         }
 
+        if (epStateMap.size() > 0)
+        {
+            // Ignore any GossipDigestAck messages that we handle before a regular GossipDigestSyn has been sent.
+            // This will prevent Acks from leaking over from the shadow round that are not actually part of
+            // the regular gossip conversation.
+            if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || Gossiper.instance.firstSynSendAt == 0)
+            {
+                if (logger.isTraceEnabled())
+                    logger.trace("Ignoring unrequested GossipDigestAck from {}", from);
+                return;
+            }
+
+            /* Notify the Failure Detector */
+            Gossiper.instance.notifyFailureDetector(epStateMap);
+            Gossiper.instance.applyStateLocally(epStateMap);
+        }
+
         /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
         Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
         for (GossipDigest gDigest : gDigestList)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 06b14c4..c2eccba 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.utils.Pair;
@@ -86,6 +87,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     private static final Logger logger = LoggerFactory.getLogger(Gossiper.class);
     public static final Gossiper instance = new Gossiper();
 
+    // Timestamp to prevent processing any in-flight messages for which we've not sent any SYN yet, see CASSANDRA-12653.
+    volatile long firstSynSendAt = 0L;
+
     public static final long aVeryLongTime = 259200 * 1000; // 3 days
 
     // Maximimum difference between generation value and local time we are willing to accept about a peer
@@ -125,6 +129,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     private volatile boolean inShadowRound = false;
 
+    // endpoint states as gathered during shadow round
+    private final Map<InetAddress, EndpointState> endpointShadowStateMap = new ConcurrentHashMap<>();
+
     private volatile long lastProcessedMessageAt = System.currentTimeMillis();
 
     private class GossipTask implements Runnable
@@ -645,6 +652,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         InetAddress to = liveEndpoints.get(index);
         if (logger.isTraceEnabled())
             logger.trace("Sending a GossipDigestSyn to {} ...", to);
+        if (firstSynSendAt == 0)
+            firstSynSendAt = System.nanoTime();
         MessagingService.instance().sendOneWay(message, to);
         return seeds.contains(to);
     }
@@ -713,11 +722,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
      * Check if this endpoint can safely bootstrap into the cluster.
      *
      * @param endpoint - the endpoint to check
+     * @param epStates - endpoint states in the cluster
      * @return true if the endpoint can join the cluster
      */
-    public boolean isSafeForBootstrap(InetAddress endpoint)
+    public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
     {
-        EndpointState epState = endpointStateMap.get(endpoint);
+        EndpointState epState = epStates.get(endpoint);
 
         // if there's no previous state, or the node was previously removed from the cluster, we're good
         if (epState == null || isDeadState(epState))
@@ -816,14 +826,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return endpointStateMap.get(ep);
     }
 
-    // removes ALL endpoint states; should only be called after shadow gossip
-    public void resetEndpointStateMap()
-    {
-        endpointStateMap.clear();
-        unreachableEndpoints.clear();
-        liveEndpoints.clear();
-    }
-
     public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
     {
         return endpointStateMap.entrySet();
@@ -831,7 +833,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     public UUID getHostId(InetAddress endpoint)
     {
-        return UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
+        return getHostId(endpoint, endpointStateMap);
+    }
+
+    public UUID getHostId(InetAddress endpoint, Map<InetAddress, EndpointState> epStates)
+    {
+        return UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
     }
 
     EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int version)
@@ -1305,12 +1312,32 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     }
 
     /**
-     *  Do a single 'shadow' round of gossip, where we do not modify any state
-     *  Only used when replacing a node, to get and assume its states
+     * Do a single 'shadow' round of gossip by retrieving endpoint states that will be stored exclusively in the
+     * returned map, instead of endpointStateMap.
+     *
+     * Used when preparing to join the ring:
+     * <ul>
+     *     <li>when replacing a node, to get and assume its tokens</li>
+     *     <li>when joining, to check that the local host id matches any previous id for the endpoint address</li>
+     * </ul>
+     *
+     * Method is synchronized, as we use an in-progress flag to indicate that shadow round must be cleared
+     * again by calling {@link Gossiper#finishShadowRound(Map)}. This will update
+     * {@link Gossiper#endpointShadowStateMap} with received values, in order to return an immutable copy to the
+     * caller of {@link Gossiper#doShadowRound()}. Therefore only a single shadow round execution is permitted at
+     * the same time.
+     *
+     * @return endpoint states gathered during shadow round or empty map
      */
-    public void doShadowRound()
+    public synchronized Map<InetAddress, EndpointState> doShadowRound()
     {
         buildSeedsList();
+        // it may be that the local address is the only entry in the seed
+        // list in which case, attempting a shadow round is pointless
+        if (seeds.isEmpty())
+            return endpointShadowStateMap;
+
+        endpointShadowStateMap.clear();
         // send a completely empty syn
         List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
         GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@ -1346,6 +1373,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         {
             throw new RuntimeException(wtf);
         }
+
+        return ImmutableMap.copyOf(endpointShadowStateMap);
     }
 
     private void buildSeedsList()
@@ -1466,10 +1495,13 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
     }
 
-    protected void finishShadowRound()
+    protected void finishShadowRound(Map<InetAddress, EndpointState> epStateMap)
     {
         if (inShadowRound)
+        {
+            endpointShadowStateMap.putAll(epStateMap);
             inShadowRound = false;
+        }
     }
 
     protected boolean isInShadowRound()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index df0b767..b065d90 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -48,6 +48,12 @@ class MigrationTask extends WrappedRunnable
 
     public void runMayThrow() throws Exception
     {
+        if (!FailureDetector.instance.isAlive(endpoint))
+        {
+            logger.warn("Can't send schema pull request: node {} is down.", endpoint);
+            return;
+        }
+
         // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
         // potentially enough for the endpoint node to restart - which is an issue if it does restart upgraded, with
         // a higher major.
@@ -57,12 +63,6 @@ class MigrationTask extends WrappedRunnable
             return;
         }
 
-        if (!FailureDetector.instance.isAlive(endpoint))
-        {
-            logger.debug("Can't send schema pull request: node {} is down.", endpoint);
-            return;
-        }
-
         MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
 
         IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c2996d7..65f536b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -443,15 +443,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             MessagingService.instance().listen();
 
         // make magic happen
-        Gossiper.instance.doShadowRound();
+        Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
 
         // now that we've gossiped at least once, we should be able to find the node we're replacing
-        if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
+        if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
             throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
-        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
         try
         {
-            VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+            VersionedValue tokensVersionedValue = epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
             if (tokensVersionedValue == null)
                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
             Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
@@ -460,7 +460,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             {
                 SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
             }
-            Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
             return tokens;
         }
         catch (IOException e)
@@ -474,8 +473,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.debug("Starting shadow gossip round to check for endpoint collision");
         if (!MessagingService.instance().isListening())
             MessagingService.instance().listen();
-        Gossiper.instance.doShadowRound();
-        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress()))
+        Map<InetAddress, EndpointState> epStates = Gossiper.instance.doShadowRound();
+        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), epStates))
         {
             throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
                                                      "Use cassandra.replace_address if you want to replace this node.",
@@ -483,7 +482,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         if (useStrictConsistency && !allowSimultaneousMoves())
         {
-            for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
+            for (Map.Entry<InetAddress, EndpointState> entry : epStates.entrySet())
             {
                 // ignore local node or empty status
                 if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
@@ -495,7 +494,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true");
             }
         }
-        Gossiper.instance.resetEndpointStateMap();
     }
 
     private boolean allowSimultaneousMoves()