You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/08/31 19:29:09 UTC

[1/6] cassandra git commit: Forward writes to replacement node when replace_address != broadcast_address

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 6eff0829d -> b39d984f7
  refs/heads/cassandra-3.0 ab98b1151 -> e4a53f4d3
  refs/heads/trunk 8a3f0e11f -> 0cd48f76d


Forward writes to replacement node when replace_address != broadcast_address

Patch by Paulo Motta; reviewed by Richard Low for CASSANDRA-8523


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b39d984f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b39d984f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b39d984f

Branch: refs/heads/cassandra-2.2
Commit: b39d984f7bd682c7638415d65dcc4ac9bcb74e5f
Parents: 6eff082
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Jun 17 21:09:31 2016 -0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 31 20:21:30 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java |   5 +-
 .../apache/cassandra/gms/VersionedValue.java    |   6 +
 .../apache/cassandra/locator/TokenMetadata.java |  54 ++++++-
 .../cassandra/service/LoadBroadcaster.java      |   2 +-
 .../cassandra/service/StorageService.java       | 139 +++++++++++++++----
 6 files changed, 177 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0f7cf0e..d7e9394 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
  * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522)
  * Fail repair on non-existing table (CASSANDRA-12279)
  * cqlsh copy: fix missing counter values (CASSANDRA-12476)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 00e3da8..a8f9524 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -76,6 +76,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     static {
         SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES);
         SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING);
+        SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE);
     }
 
     private volatile ScheduledFuture<?> scheduledGossipTask;
@@ -333,10 +334,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         if (epState == null)
             return;
 
-        logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(epState), epState.isAlive());
         if (!epState.isAlive())
             return;
 
+        logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(epState), epState.isAlive());
+
+
         if (isShutdown(endpoint))
         {
             markAsShutdown(endpoint);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 3ea7bb4..661d3ba 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -65,6 +65,7 @@ public class VersionedValue implements Comparable<VersionedValue>
 
     // values for ApplicationState.STATUS
     public final static String STATUS_BOOTSTRAPPING = "BOOT";
+    public final static String STATUS_BOOTSTRAPPING_REPLACE = "BOOT_REPLACE";
     public final static String STATUS_NORMAL = "NORMAL";
     public final static String STATUS_LEAVING = "LEAVING";
     public final static String STATUS_LEFT = "LEFT";
@@ -133,6 +134,11 @@ public class VersionedValue implements Comparable<VersionedValue>
             return new VersionedValue(value.value);
         }
 
+        public VersionedValue bootReplacing(InetAddress oldNode)
+        {
+            return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddress()));
+        }
+
         public VersionedValue bootstrapping(Collection<Token> tokens)
         {
             return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index de16fda..b06c9c8 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.*;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -79,6 +80,9 @@ public class TokenMetadata
     // means we can detect and reject the addition of multiple nodes at the same token
     // before one becomes part of the ring.
     private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
+
+    private final BiMap<InetAddress, InetAddress> replacementToOriginal = HashBiMap.create();
+
     // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
     private final Set<InetAddress> leavingEndpoints = new HashSet<>();
     // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
@@ -185,6 +189,7 @@ public class TokenMetadata
                 tokenToEndpointMap.removeValue(endpoint);
                 topology.addEndpoint(endpoint);
                 leavingEndpoints.remove(endpoint);
+                replacementToOriginal.remove(endpoint);
                 removeFromMoving(endpoint); // also removing this endpoint from moving
 
                 for (Token token : tokens)
@@ -297,13 +302,17 @@ public class TokenMetadata
 
     public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint)
     {
+        addBootstrapTokens(tokens, endpoint, null);
+    }
+
+    private void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint, InetAddress original)
+    {
         assert tokens != null && !tokens.isEmpty();
         assert endpoint != null;
 
         lock.writeLock().lock();
         try
         {
-
             InetAddress oldEndpoint;
 
             for (Token token : tokens)
@@ -313,7 +322,7 @@ public class TokenMetadata
                     throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
 
                 oldEndpoint = tokenToEndpointMap.get(token);
-                if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
+                if (oldEndpoint != null && !oldEndpoint.equals(endpoint) && !oldEndpoint.equals(original))
                     throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
             }
 
@@ -328,6 +337,43 @@ public class TokenMetadata
         }
     }
 
+    public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode)
+    {
+        assert replacingTokens != null && !replacingTokens.isEmpty();
+        assert newNode != null && oldNode != null;
+
+        lock.writeLock().lock();
+        try
+        {
+            Collection<Token> oldNodeTokens = tokenToEndpointMap.inverse().get(oldNode);
+            if (!replacingTokens.containsAll(oldNodeTokens) || !oldNodeTokens.containsAll(replacingTokens))
+            {
+                throw new RuntimeException(String.format("Node %s is trying to replace node %s with tokens %s with a " +
+                                                         "different set of tokens %s.", newNode, oldNode, oldNodeTokens,
+                                                         replacingTokens));
+            }
+
+            logger.debug("Replacing {} with {}", newNode, oldNode);
+            replacementToOriginal.put(newNode, oldNode);
+
+            addBootstrapTokens(replacingTokens, newNode, oldNode);
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
+    {
+        return Optional.fromNullable(replacementToOriginal.inverse().get(endpoint));
+    }
+
+    public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
+    {
+        return Optional.fromNullable((replacementToOriginal.get(endpoint)));
+    }
+
     public void removeBootstrapTokens(Collection<Token> tokens)
     {
         assert tokens != null && !tokens.isEmpty();
@@ -391,6 +437,10 @@ public class TokenMetadata
             tokenToEndpointMap.removeValue(endpoint);
             topology.removeEndpoint(endpoint);
             leavingEndpoints.remove(endpoint);
+            if (replacementToOriginal.remove(endpoint) != null)
+            {
+                logger.debug("Node {} failed during replace.", endpoint);
+            }
             endpointToHostIdMap.remove(endpoint);
             sortedTokens = sortTokens();
             invalidateCachedRings();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/service/LoadBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/LoadBroadcaster.java b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
index 69fa93d..945dd2f 100644
--- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.gms.*;
 
 public class LoadBroadcaster implements IEndpointStateChangeSubscriber
 {
-    static final int BROADCAST_INTERVAL = 60 * 1000;
+    static final int BROADCAST_INTERVAL = Integer.getInteger("cassandra.broadcast_interval_ms", 60 * 1000);
 
     public static final LoadBroadcaster instance = new LoadBroadcaster();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 48a291b..9197ab1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -32,6 +32,7 @@ import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
@@ -185,6 +186,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
     private static final boolean allowSimultaneousMoves = Boolean.valueOf(System.getProperty("cassandra.consistent.simultaneousmoves.allow","false"));
     private boolean replacing;
+    private UUID replacingId;
 
     private final StreamStateStore streamStateStore = new StreamStateStore();
 
@@ -194,9 +196,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (logger.isDebugEnabled())
             logger.debug("Setting tokens to {}", tokens);
         SystemKeyspace.updateTokens(tokens);
-        tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
         Collection<Token> localTokens = getLocalTokens();
         setGossipTokens(localTokens);
+        tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
         setMode(Mode.NORMAL, false);
     }
 
@@ -431,11 +433,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         // make magic happen
         Gossiper.instance.doShadowRound();
 
-        UUID hostId = null;
         // now that we've gossiped at least once, we should be able to find the node we're replacing
         if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
             throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
-        hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
         try
         {
             VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
@@ -443,7 +444,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
             Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 
-            SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
+            if (isReplacingSameAddress())
+            {
+                SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
+            }
             Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
             return tokens;
         }
@@ -472,7 +476,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 // ignore local node or empty status
                 if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
                     continue;
-                String[] pieces = entry.getValue().getApplicationState(ApplicationState.STATUS).value.split(VersionedValue.DELIMITER_STR, -1);
+                String[] pieces = splitValue(entry.getValue().getApplicationState(ApplicationState.STATUS));
                 assert (pieces.length > 0);
                 String state = pieces[0];
                 if (state.equals(VersionedValue.STATUS_BOOTSTRAPPING) || state.equals(VersionedValue.STATUS_LEAVING) || state.equals(VersionedValue.STATUS_MOVING))
@@ -681,8 +685,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 if (!DatabaseDescriptor.isAutoBootstrap())
                     throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
                 bootstrapTokens = prepareReplacementInfo();
-                appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
-                appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+                if (isReplacingSameAddress())
+                {
+                    logger.warn("Writes will not be forwarded to this node during replacement because it has the same address as " +
+                                "the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " +
+                                "repair must be run after the replacement process in order to make this node consistent.",
+                                DatabaseDescriptor.getReplaceAddress());
+                    appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
+                    appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+                }
             }
             else if (shouldBootstrap())
             {
@@ -799,7 +810,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             }
             else
             {
-                if (!DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress()))
+                if (!isReplacingSameAddress())
                 {
                     try
                     {
@@ -885,17 +896,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             if (dataAvailable)
             {
-                // start participating in the ring.
-                SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
-                setTokens(bootstrapTokens);
+                finishJoiningRing();
+
                 // remove the existing info about the replaced node.
                 if (!current.isEmpty())
                 {
                     for (InetAddress existing : current)
                         Gossiper.instance.replacedEndpoint(existing);
                 }
-                assert tokenMetadata.sortedTokens().size() > 0;
-                doAuthSetup();
             }
             else
             {
@@ -908,6 +916,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    public static boolean isReplacingSameAddress()
+    {
+        return DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress());
+    }
+
     public void gossipSnitchInfo()
     {
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
@@ -933,16 +946,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         else if (isSurveyMode)
         {
-            setTokens(SystemKeyspace.getSavedTokens());
-            SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
             isSurveyMode = false;
             logger.info("Leaving write survey mode and joining ring at operator request");
-            assert tokenMetadata.sortedTokens().size() > 0;
-
-            doAuthSetup();
+            finishJoiningRing();
         }
     }
 
+    private void finishJoiningRing()
+    {
+        // start participating in the ring.
+        SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
+        setTokens(bootstrapTokens);
+
+        assert tokenMetadata.sortedTokens().size() > 0;
+        doAuthSetup();
+    }
+
     private void doAuthSetup()
     {
         maybeAddOrUpdateKeyspace(AuthKeyspace.definition());
@@ -1000,7 +1019,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public boolean isJoined()
     {
-        return tokenMetadata.isMember(FBUtilities.getBroadcastAddress());
+        return tokenMetadata.isMember(FBUtilities.getBroadcastAddress()) && !isSurveyMode;
     }
 
     public void rebuild(String sourceDc)
@@ -1122,12 +1141,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         isBootstrapMode = true;
         SystemKeyspace.updateTokens(tokens); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
-        if (!replacing)
+
+        if (!replacing || !isReplacingSameAddress())
         {
             // if not an existing token then bootstrap
             List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<>();
             states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
-            states.add(Pair.create(ApplicationState.STATUS, valueFactory.bootstrapping(tokens)));
+            states.add(Pair.create(ApplicationState.STATUS, replacing?
+                                                            valueFactory.bootReplacing(DatabaseDescriptor.getReplaceAddress()) :
+                                                            valueFactory.bootstrapping(tokens)));
             Gossiper.instance.addLocalApplicationStates(states);
             setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
             Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS);
@@ -1138,6 +1160,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
             SystemKeyspace.removeEndpoint(DatabaseDescriptor.getReplaceAddress());
         }
+
         if (!Gossiper.instance.seenAnySeed())
             throw new IllegalStateException("Unable to contact any seeds!");
 
@@ -1575,14 +1598,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         if (state == ApplicationState.STATUS)
         {
-            String apStateValue = value.value;
-            String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR, -1);
+            String[] pieces = splitValue(value);
             assert (pieces.length > 0);
 
             String moveName = pieces[0];
 
             switch (moveName)
             {
+                case VersionedValue.STATUS_BOOTSTRAPPING_REPLACE:
+                    handleStateBootreplacing(endpoint, pieces);
+                    break;
                 case VersionedValue.STATUS_BOOTSTRAPPING:
                     handleStateBootstrap(endpoint);
                     break;
@@ -1656,6 +1681,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    private static String[] splitValue(VersionedValue value)
+    {
+        return value.value.split(VersionedValue.DELIMITER_STR, -1);
+    }
+
     public void updateTopology(InetAddress endpoint)
     {
         if (getTokenMetadata().isMember(endpoint))
@@ -1820,6 +1850,43 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
     }
 
+
+    private void handleStateBootreplacing(InetAddress newNode, String[] pieces)
+    {
+        InetAddress oldNode;
+        try
+        {
+            oldNode = InetAddress.getByName(pieces[1]);
+        }
+        catch (Exception e)
+        {
+            logger.error("Node {} tried to replace malformed endpoint {}.", newNode, pieces[1], e);
+            return;
+        }
+
+        if (FailureDetector.instance.isAlive(oldNode))
+        {
+            throw new RuntimeException(String.format("Node %s is trying to replace alive node %s.", newNode, oldNode));
+        }
+
+        Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(newNode);
+        if (replacingNode.isPresent() && !replacingNode.get().equals(oldNode))
+        {
+            throw new RuntimeException(String.format("Node %s is already replacing %s but is trying to replace %s.",
+                                                     newNode, replacingNode.get(), oldNode));
+        }
+
+        Collection<Token> tokens = getTokensFor(newNode);
+
+        if (logger.isDebugEnabled())
+            logger.debug("Node {} is replacing {}, tokens {}", newNode, oldNode, tokens);
+
+        tokenMetadata.addReplaceTokens(tokens, newNode, oldNode);
+        PendingRangeCalculatorService.instance.update();
+
+        tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode);
+    }
+
     /**
      * Handle node move to normal state. That is, node is entering token ring and participating
      * in reads.
@@ -1844,11 +1911,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                          endpoint,
                          Gossiper.instance.getEndpointStateForEndpoint(endpoint));
 
+        Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(endpoint);
+        if (replacingNode.isPresent())
+        {
+            assert !endpoint.equals(replacingNode.get()) : "Pending replacement endpoint with same address is not supported";
+            logger.info("Node {} will complete replacement of {} for tokens {}", endpoint, replacingNode.get(), tokens);
+            if (FailureDetector.instance.isAlive(replacingNode.get()))
+            {
+                logger.error("Node {} cannot complete replacement of alive node {}.", endpoint, replacingNode.get());
+                return;
+            }
+            endpointsToRemove.add(replacingNode.get());
+        }
+
+        Optional<InetAddress> replacementNode = tokenMetadata.getReplacementNode(endpoint);
+        if (replacementNode.isPresent())
+        {
+            logger.warn("Node {} is currently being replaced by node {}.", endpoint, replacementNode.get());
+        }
+
         updatePeerInfo(endpoint);
         // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
         UUID hostId = Gossiper.instance.getHostId(endpoint);
         InetAddress existing = tokenMetadata.getEndpointForHostId(hostId);
-        if (replacing && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
+        if (replacing && isReplacingSameAddress() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null
+            && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
             logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
         else
         {
@@ -1933,7 +2020,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
         }
         if (!tokensToUpdateInSystemKeyspace.isEmpty())
-            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);;
+            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);
 
         if (isMoving || operationMode == Mode.MOVING)
         {
@@ -2058,7 +2145,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 PendingRangeCalculatorService.instance.update();
 
                 // find the endpoint coordinating this removal that we need to notify when we're done
-                String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
+                String[] coordinator = splitValue(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR));
                 UUID hostId = UUID.fromString(coordinator[1]);
                 // grab any data we are now responsible for and notify responsible node
                 restoreReplicaCount(endpoint, tokenMetadata.getEndpointForHostId(hostId));


[3/6] cassandra git commit: Forward writes to replacement node when replace_address != broadcast_address

Posted by al...@apache.org.
Forward writes to replacement node when replace_address != broadcast_address

Patch by Paulo Motta; reviewed by Richard Low for CASSANDRA-8523


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b39d984f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b39d984f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b39d984f

Branch: refs/heads/trunk
Commit: b39d984f7bd682c7638415d65dcc4ac9bcb74e5f
Parents: 6eff082
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Jun 17 21:09:31 2016 -0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 31 20:21:30 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java |   5 +-
 .../apache/cassandra/gms/VersionedValue.java    |   6 +
 .../apache/cassandra/locator/TokenMetadata.java |  54 ++++++-
 .../cassandra/service/LoadBroadcaster.java      |   2 +-
 .../cassandra/service/StorageService.java       | 139 +++++++++++++++----
 6 files changed, 177 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0f7cf0e..d7e9394 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
  * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522)
  * Fail repair on non-existing table (CASSANDRA-12279)
  * cqlsh copy: fix missing counter values (CASSANDRA-12476)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 00e3da8..a8f9524 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -76,6 +76,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     static {
         SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES);
         SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING);
+        SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE);
     }
 
     private volatile ScheduledFuture<?> scheduledGossipTask;
@@ -333,10 +334,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         if (epState == null)
             return;
 
-        logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(epState), epState.isAlive());
         if (!epState.isAlive())
             return;
 
+        logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(epState), epState.isAlive());
+
+
         if (isShutdown(endpoint))
         {
             markAsShutdown(endpoint);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 3ea7bb4..661d3ba 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -65,6 +65,7 @@ public class VersionedValue implements Comparable<VersionedValue>
 
     // values for ApplicationState.STATUS
     public final static String STATUS_BOOTSTRAPPING = "BOOT";
+    public final static String STATUS_BOOTSTRAPPING_REPLACE = "BOOT_REPLACE";
     public final static String STATUS_NORMAL = "NORMAL";
     public final static String STATUS_LEAVING = "LEAVING";
     public final static String STATUS_LEFT = "LEFT";
@@ -133,6 +134,11 @@ public class VersionedValue implements Comparable<VersionedValue>
             return new VersionedValue(value.value);
         }
 
+        public VersionedValue bootReplacing(InetAddress oldNode)
+        {
+            return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddress()));
+        }
+
         public VersionedValue bootstrapping(Collection<Token> tokens)
         {
             return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index de16fda..b06c9c8 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.*;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -79,6 +80,9 @@ public class TokenMetadata
     // means we can detect and reject the addition of multiple nodes at the same token
     // before one becomes part of the ring.
     private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
+
+    private final BiMap<InetAddress, InetAddress> replacementToOriginal = HashBiMap.create();
+
     // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
     private final Set<InetAddress> leavingEndpoints = new HashSet<>();
     // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
@@ -185,6 +189,7 @@ public class TokenMetadata
                 tokenToEndpointMap.removeValue(endpoint);
                 topology.addEndpoint(endpoint);
                 leavingEndpoints.remove(endpoint);
+                replacementToOriginal.remove(endpoint);
                 removeFromMoving(endpoint); // also removing this endpoint from moving
 
                 for (Token token : tokens)
@@ -297,13 +302,17 @@ public class TokenMetadata
 
     public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint)
     {
+        addBootstrapTokens(tokens, endpoint, null);
+    }
+
+    private void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint, InetAddress original)
+    {
         assert tokens != null && !tokens.isEmpty();
         assert endpoint != null;
 
         lock.writeLock().lock();
         try
         {
-
             InetAddress oldEndpoint;
 
             for (Token token : tokens)
@@ -313,7 +322,7 @@ public class TokenMetadata
                     throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
 
                 oldEndpoint = tokenToEndpointMap.get(token);
-                if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
+                if (oldEndpoint != null && !oldEndpoint.equals(endpoint) && !oldEndpoint.equals(original))
                     throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
             }
 
@@ -328,6 +337,43 @@ public class TokenMetadata
         }
     }
 
+    public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode)
+    {
+        assert replacingTokens != null && !replacingTokens.isEmpty();
+        assert newNode != null && oldNode != null;
+
+        lock.writeLock().lock();
+        try
+        {
+            Collection<Token> oldNodeTokens = tokenToEndpointMap.inverse().get(oldNode);
+            if (!replacingTokens.containsAll(oldNodeTokens) || !oldNodeTokens.containsAll(replacingTokens))
+            {
+                throw new RuntimeException(String.format("Node %s is trying to replace node %s with tokens %s with a " +
+                                                         "different set of tokens %s.", newNode, oldNode, oldNodeTokens,
+                                                         replacingTokens));
+            }
+
+            logger.debug("Replacing {} with {}", newNode, oldNode);
+            replacementToOriginal.put(newNode, oldNode);
+
+            addBootstrapTokens(replacingTokens, newNode, oldNode);
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
+    {
+        return Optional.fromNullable(replacementToOriginal.inverse().get(endpoint));
+    }
+
+    public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
+    {
+        return Optional.fromNullable((replacementToOriginal.get(endpoint)));
+    }
+
     public void removeBootstrapTokens(Collection<Token> tokens)
     {
         assert tokens != null && !tokens.isEmpty();
@@ -391,6 +437,10 @@ public class TokenMetadata
             tokenToEndpointMap.removeValue(endpoint);
             topology.removeEndpoint(endpoint);
             leavingEndpoints.remove(endpoint);
+            if (replacementToOriginal.remove(endpoint) != null)
+            {
+                logger.debug("Node {} failed during replace.", endpoint);
+            }
             endpointToHostIdMap.remove(endpoint);
             sortedTokens = sortTokens();
             invalidateCachedRings();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/service/LoadBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/LoadBroadcaster.java b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
index 69fa93d..945dd2f 100644
--- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.gms.*;
 
 public class LoadBroadcaster implements IEndpointStateChangeSubscriber
 {
-    static final int BROADCAST_INTERVAL = 60 * 1000;
+    static final int BROADCAST_INTERVAL = Integer.getInteger("cassandra.broadcast_interval_ms", 60 * 1000);
 
     public static final LoadBroadcaster instance = new LoadBroadcaster();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 48a291b..9197ab1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -32,6 +32,7 @@ import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
@@ -185,6 +186,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
     private static final boolean allowSimultaneousMoves = Boolean.valueOf(System.getProperty("cassandra.consistent.simultaneousmoves.allow","false"));
     private boolean replacing;
+    private UUID replacingId;
 
     private final StreamStateStore streamStateStore = new StreamStateStore();
 
@@ -194,9 +196,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (logger.isDebugEnabled())
             logger.debug("Setting tokens to {}", tokens);
         SystemKeyspace.updateTokens(tokens);
-        tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
         Collection<Token> localTokens = getLocalTokens();
         setGossipTokens(localTokens);
+        tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
         setMode(Mode.NORMAL, false);
     }
 
@@ -431,11 +433,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         // make magic happen
         Gossiper.instance.doShadowRound();
 
-        UUID hostId = null;
         // now that we've gossiped at least once, we should be able to find the node we're replacing
         if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
             throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
-        hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
         try
         {
             VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
@@ -443,7 +444,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
             Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 
-            SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
+            if (isReplacingSameAddress())
+            {
+                SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
+            }
             Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
             return tokens;
         }
@@ -472,7 +476,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 // ignore local node or empty status
                 if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
                     continue;
-                String[] pieces = entry.getValue().getApplicationState(ApplicationState.STATUS).value.split(VersionedValue.DELIMITER_STR, -1);
+                String[] pieces = splitValue(entry.getValue().getApplicationState(ApplicationState.STATUS));
                 assert (pieces.length > 0);
                 String state = pieces[0];
                 if (state.equals(VersionedValue.STATUS_BOOTSTRAPPING) || state.equals(VersionedValue.STATUS_LEAVING) || state.equals(VersionedValue.STATUS_MOVING))
@@ -681,8 +685,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 if (!DatabaseDescriptor.isAutoBootstrap())
                     throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
                 bootstrapTokens = prepareReplacementInfo();
-                appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
-                appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+                if (isReplacingSameAddress())
+                {
+                    logger.warn("Writes will not be forwarded to this node during replacement because it has the same address as " +
+                                "the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " +
+                                "repair must be run after the replacement process in order to make this node consistent.",
+                                DatabaseDescriptor.getReplaceAddress());
+                    appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
+                    appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+                }
             }
             else if (shouldBootstrap())
             {
@@ -799,7 +810,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             }
             else
             {
-                if (!DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress()))
+                if (!isReplacingSameAddress())
                 {
                     try
                     {
@@ -885,17 +896,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             if (dataAvailable)
             {
-                // start participating in the ring.
-                SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
-                setTokens(bootstrapTokens);
+                finishJoiningRing();
+
                 // remove the existing info about the replaced node.
                 if (!current.isEmpty())
                 {
                     for (InetAddress existing : current)
                         Gossiper.instance.replacedEndpoint(existing);
                 }
-                assert tokenMetadata.sortedTokens().size() > 0;
-                doAuthSetup();
             }
             else
             {
@@ -908,6 +916,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    public static boolean isReplacingSameAddress()
+    {
+        return DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress());
+    }
+
     public void gossipSnitchInfo()
     {
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
@@ -933,16 +946,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         else if (isSurveyMode)
         {
-            setTokens(SystemKeyspace.getSavedTokens());
-            SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
             isSurveyMode = false;
             logger.info("Leaving write survey mode and joining ring at operator request");
-            assert tokenMetadata.sortedTokens().size() > 0;
-
-            doAuthSetup();
+            finishJoiningRing();
         }
     }
 
+    private void finishJoiningRing()
+    {
+        // start participating in the ring.
+        SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
+        setTokens(bootstrapTokens);
+
+        assert tokenMetadata.sortedTokens().size() > 0;
+        doAuthSetup();
+    }
+
     private void doAuthSetup()
     {
         maybeAddOrUpdateKeyspace(AuthKeyspace.definition());
@@ -1000,7 +1019,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public boolean isJoined()
     {
-        return tokenMetadata.isMember(FBUtilities.getBroadcastAddress());
+        return tokenMetadata.isMember(FBUtilities.getBroadcastAddress()) && !isSurveyMode;
     }
 
     public void rebuild(String sourceDc)
@@ -1122,12 +1141,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         isBootstrapMode = true;
         SystemKeyspace.updateTokens(tokens); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
-        if (!replacing)
+
+        if (!replacing || !isReplacingSameAddress())
         {
             // if not an existing token then bootstrap
             List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<>();
             states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
-            states.add(Pair.create(ApplicationState.STATUS, valueFactory.bootstrapping(tokens)));
+            states.add(Pair.create(ApplicationState.STATUS, replacing?
+                                                            valueFactory.bootReplacing(DatabaseDescriptor.getReplaceAddress()) :
+                                                            valueFactory.bootstrapping(tokens)));
             Gossiper.instance.addLocalApplicationStates(states);
             setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
             Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS);
@@ -1138,6 +1160,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
             SystemKeyspace.removeEndpoint(DatabaseDescriptor.getReplaceAddress());
         }
+
         if (!Gossiper.instance.seenAnySeed())
             throw new IllegalStateException("Unable to contact any seeds!");
 
@@ -1575,14 +1598,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         if (state == ApplicationState.STATUS)
         {
-            String apStateValue = value.value;
-            String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR, -1);
+            String[] pieces = splitValue(value);
             assert (pieces.length > 0);
 
             String moveName = pieces[0];
 
             switch (moveName)
             {
+                case VersionedValue.STATUS_BOOTSTRAPPING_REPLACE:
+                    handleStateBootreplacing(endpoint, pieces);
+                    break;
                 case VersionedValue.STATUS_BOOTSTRAPPING:
                     handleStateBootstrap(endpoint);
                     break;
@@ -1656,6 +1681,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    private static String[] splitValue(VersionedValue value)
+    {
+        return value.value.split(VersionedValue.DELIMITER_STR, -1);
+    }
+
     public void updateTopology(InetAddress endpoint)
     {
         if (getTokenMetadata().isMember(endpoint))
@@ -1820,6 +1850,43 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
     }
 
+
+    private void handleStateBootreplacing(InetAddress newNode, String[] pieces)
+    {
+        InetAddress oldNode;
+        try
+        {
+            oldNode = InetAddress.getByName(pieces[1]);
+        }
+        catch (Exception e)
+        {
+            logger.error("Node {} tried to replace malformed endpoint {}.", newNode, pieces[1], e);
+            return;
+        }
+
+        if (FailureDetector.instance.isAlive(oldNode))
+        {
+            throw new RuntimeException(String.format("Node %s is trying to replace alive node %s.", newNode, oldNode));
+        }
+
+        Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(newNode);
+        if (replacingNode.isPresent() && !replacingNode.get().equals(oldNode))
+        {
+            throw new RuntimeException(String.format("Node %s is already replacing %s but is trying to replace %s.",
+                                                     newNode, replacingNode.get(), oldNode));
+        }
+
+        Collection<Token> tokens = getTokensFor(newNode);
+
+        if (logger.isDebugEnabled())
+            logger.debug("Node {} is replacing {}, tokens {}", newNode, oldNode, tokens);
+
+        tokenMetadata.addReplaceTokens(tokens, newNode, oldNode);
+        PendingRangeCalculatorService.instance.update();
+
+        tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode);
+    }
+
     /**
      * Handle node move to normal state. That is, node is entering token ring and participating
      * in reads.
@@ -1844,11 +1911,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                          endpoint,
                          Gossiper.instance.getEndpointStateForEndpoint(endpoint));
 
+        Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(endpoint);
+        if (replacingNode.isPresent())
+        {
+            assert !endpoint.equals(replacingNode.get()) : "Pending replacement endpoint with same address is not supported";
+            logger.info("Node {} will complete replacement of {} for tokens {}", endpoint, replacingNode.get(), tokens);
+            if (FailureDetector.instance.isAlive(replacingNode.get()))
+            {
+                logger.error("Node {} cannot complete replacement of alive node {}.", endpoint, replacingNode.get());
+                return;
+            }
+            endpointsToRemove.add(replacingNode.get());
+        }
+
+        Optional<InetAddress> replacementNode = tokenMetadata.getReplacementNode(endpoint);
+        if (replacementNode.isPresent())
+        {
+            logger.warn("Node {} is currently being replaced by node {}.", endpoint, replacementNode.get());
+        }
+
         updatePeerInfo(endpoint);
         // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
         UUID hostId = Gossiper.instance.getHostId(endpoint);
         InetAddress existing = tokenMetadata.getEndpointForHostId(hostId);
-        if (replacing && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
+        if (replacing && isReplacingSameAddress() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null
+            && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
             logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
         else
         {
@@ -1933,7 +2020,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
         }
         if (!tokensToUpdateInSystemKeyspace.isEmpty())
-            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);;
+            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);
 
         if (isMoving || operationMode == Mode.MOVING)
         {
@@ -2058,7 +2145,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 PendingRangeCalculatorService.instance.update();
 
                 // find the endpoint coordinating this removal that we need to notify when we're done
-                String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
+                String[] coordinator = splitValue(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR));
                 UUID hostId = UUID.fromString(coordinator[1]);
                 // grab any data we are now responsible for and notify responsible node
                 restoreReplicaCount(endpoint, tokenMetadata.getEndpointForHostId(hostId));


[6/6] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0cd48f76
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0cd48f76
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0cd48f76

Branch: refs/heads/trunk
Commit: 0cd48f76d8744d9bdabc65b6218bb82ff9014cb3
Parents: 8a3f0e1 e4a53f4
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 31 20:26:39 2016 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 31 20:27:53 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java |   5 +-
 .../apache/cassandra/gms/VersionedValue.java    |   6 +
 .../apache/cassandra/locator/TokenMetadata.java |  52 +++++-
 .../cassandra/service/LoadBroadcaster.java      |   2 +-
 .../cassandra/service/StorageService.java       | 182 ++++++++++++++-----
 6 files changed, 197 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cd48f76/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a0f6055,30931d3..0edfc76
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -64,50 -9,12 +64,51 @@@ Merged from 3.0
   * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
   * Calculate last compacted key on startup (CASSANDRA-6216)
   * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
 +Merged from 2.2:
++ * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
 + * Fail repair on non-existing table (CASSANDRA-12279)
 + * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522)
 +
 +
 +3.8, 3.9
 + * Fix value skipping with counter columns (CASSANDRA-11726)
 + * Fix nodetool tablestats miss SSTable count (CASSANDRA-12205)
 + * Fixed flacky SSTablesIteratedTest (CASSANDRA-12282)
 + * Fixed flacky SSTableRewriterTest: check file counts before calling validateCFS (CASSANDRA-12348)
 + * cqlsh: Fix handling of $$-escaped strings (CASSANDRA-12189)
 + * Fix SSL JMX requiring truststore containing server cert (CASSANDRA-12109)
 + * RTE from new CDC column breaks in flight queries (CASSANDRA-12236)
 + * Fix hdr logging for single operation workloads (CASSANDRA-12145)
 + * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
 + * Increase size of flushExecutor thread pool (CASSANDRA-12071)
 + * Partial revert of CASSANDRA-11971, cannot recycle buffer in SP.sendMessagesToNonlocalDC (CASSANDRA-11950)
 + * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034)
 + * Improve details in compaction log message (CASSANDRA-12080)
 + * Allow unset values in CQLSSTableWriter (CASSANDRA-11911)
 + * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993)
 + * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579)
 + * Move skip_stop_words filter before stemming (CASSANDRA-12078)
 + * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
 + * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
 + * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966)
 + * Add cross-DC latency metrics (CASSANDRA-11596)
 + * Allow terms in selection clause (CASSANDRA-10783)
 + * Add bind variables to trace (CASSANDRA-11719)
 + * Switch counter shards' clock to timestamps (CASSANDRA-9811)
 + * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
 + * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
 + * Support older ant versions (CASSANDRA-11807)
 + * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
 + * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
 + * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
 + * Faster streaming (CASSANDRA-9766)
 + * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
 + * Add repaired percentage metric (CASSANDRA-11503)
 + * Add Change-Data-Capture (CASSANDRA-8844)
 +Merged from 3.0:
   * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
   * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
 - * Backport CASSANDRA-12002 (CASSANDRA-12177)
   * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
 - * Fix potential bad messaging service message for paged range reads
 -   within mixed-version 3.x clusters (CASSANDRA-12249)
   * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
   * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
   * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cd48f76/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 13cc8ab,f575a34..18de598
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -73,10 -73,10 +73,11 @@@ public class Gossiper implements IFailu
      static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
                                                            VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
      static ArrayList<String> SILENT_SHUTDOWN_STATES = new ArrayList<>();
 -    static {
 +    static
 +    {
          SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES);
          SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING);
+         SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE);
      }
  
      private volatile ScheduledFuture<?> scheduledGossipTask;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cd48f76/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0cd48f76/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 93205fb,c06bed2..2799db2
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -445,49 -436,46 +445,67 @@@ public class StorageService extends Not
          daemon.deactivate();
      }
  
-     private synchronized UUID prepareReplacementInfo(InetAddress replaceAddress) throws ConfigurationException
 -    public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException
++    private synchronized UUID prepareForReplacement() throws ConfigurationException
      {
--        logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress());
 -        if (!MessagingService.instance().isListening())
 -            MessagingService.instance().listen();
++        if (SystemKeyspace.bootstrapComplete())
++            throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
+ 
 -        // make magic happen
++        if (!(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))))
++            throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
++
++        if (!DatabaseDescriptor.isAutoBootstrap() && !Boolean.getBoolean("cassandra.allow_unsafe_replace"))
++            throw new RuntimeException("Replacing a node without bootstrapping risks invalidating consistency " +
++                                       "guarantees as the expected data may not be present until repair is run. " +
++                                       "To perform this operation, please restart with " +
++                                       "-Dcassandra.allow_unsafe_replace=true");
++
++        InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
++        logger.info("Gathering node replacement information for {}", replaceAddress);
          Gossiper.instance.doShadowRound();
 +        // as we've completed the shadow round of gossip, we should be able to find the node we're replacing
 +        if (Gossiper.instance.getEndpointStateForEndpoint(replaceAddress) == null)
 +            throw new RuntimeException(String.format("Cannot replace_address %s because it doesn't exist in gossip", replaceAddress));
  
 -        // now that we've gossiped at least once, we should be able to find the node we're replacing
 -        if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
 -            throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
 -        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
          try
          {
 -            VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
 +            VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(replaceAddress).getApplicationState(ApplicationState.TOKENS);
              if (tokensVersionedValue == null)
 -                throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
 -            Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 +                throw new RuntimeException(String.format("Could not find tokens for %s to replace", replaceAddress));
  
 -            if (isReplacingSameAddress())
 -            {
 -                SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
 -            }
 -            Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
 -            return tokens;
 +            bootstrapTokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
          }
          catch (IOException e)
          {
              throw new RuntimeException(e);
          }
 +
-         // we'll use the replacee's host Id as our own so we receive hints, etc
-         UUID localHostId = Gossiper.instance.getHostId(replaceAddress);
-         SystemKeyspace.setLocalHostId(localHostId);
++        UUID localHostId = SystemKeyspace.getLocalHostId();
++
++        if (isReplacingSameAddress())
++        {
++            localHostId = Gossiper.instance.getHostId(replaceAddress);
++            SystemKeyspace.setLocalHostId(localHostId); // use the replacee's host Id as our own so we receive hints, etc
++        }
++
 +        Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
 +        return localHostId;
      }
  
 -    public synchronized void checkForEndpointCollision() throws ConfigurationException
 +    private synchronized void checkForEndpointCollision(UUID localHostId) throws ConfigurationException
      {
 +        if (Boolean.getBoolean("cassandra.allow_unsafe_join"))
 +        {
 +            logger.warn("Skipping endpoint collision check as cassandra.allow_unsafe_join=true");
 +            return;
 +        }
 +
          logger.debug("Starting shadow gossip round to check for endpoint collision");
 -        if (!MessagingService.instance().isListening())
 -            MessagingService.instance().listen();
          Gossiper.instance.doShadowRound();
 -        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress()))
 +        // If bootstrapping, check whether any previously known status for the endpoint makes it unsafe to do so.
 +        // If not bootstrapping, compare the host id for this endpoint learned from gossip (if any) with the local
 +        // one, which was either read from system.local or generated at startup. If a learned id is present &
 +        // doesn't match the local, then the node needs replacing
 +        if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId, shouldBootstrap()))
          {
              throw new RuntimeException(String.format("A node with address %s already exists, cancelling join. " +
                                                       "Use cassandra.replace_address if you want to replace this node.",
@@@ -748,45 -717,30 +766,39 @@@
                  else
                      throw new ConfigurationException("This node was decommissioned and will not rejoin the ring unless cassandra.override_decommission=true has been set, or all existing data is removed and the node is bootstrapped again");
              }
 -            if (replacing && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))))
 -                throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
 +
              if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null)
                  throw new RuntimeException("Replace method removed; use cassandra.replace_address instead");
 +
 +            if (!MessagingService.instance().isListening())
 +                MessagingService.instance().listen();
 +
 +            UUID localHostId = SystemKeyspace.getLocalHostId();
 +
              if (replacing)
              {
--                if (SystemKeyspace.bootstrapComplete())
--                    throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
- 
-                 if (!(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))))
-                     throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
- 
-                 if (!DatabaseDescriptor.isAutoBootstrap() && !Boolean.getBoolean("cassandra.allow_unsafe_replace"))
-                     throw new RuntimeException("Replacing a node without bootstrapping risks invalidating consistency " +
-                                                "guarantees as the expected data may not be present until repair is run. " +
-                                                "To perform this operation, please restart with " +
-                                                "-Dcassandra.allow_unsafe_replace=true");
- 
-                 InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
-                 localHostId = prepareReplacementInfo(replaceAddress);
++                localHostId = prepareForReplacement();
 +                appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
 +
-                 // if want to bootstrap the ranges of the node we're replacing,
-                 // go into hibernate mode while that happens. Otherwise, persist
-                 // the tokens we're taking over locally so that they don't get
-                 // clobbered with auto generated ones in joinTokenRing
-                 if (DatabaseDescriptor.isAutoBootstrap())
-                     appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
-                 else
+                 if (!DatabaseDescriptor.isAutoBootstrap())
 -                    throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
 -                bootstrapTokens = prepareReplacementInfo();
 -                if (isReplacingSameAddress())
+                 {
++                    // Will not do replace procedure, persist the tokens we're taking over locally
++                    // so that they don't get clobbered with auto generated ones in joinTokenRing
 +                    SystemKeyspace.updateTokens(bootstrapTokens);
++                }
++                else if (isReplacingSameAddress())
++                {
++                    //only go into hibernate state if replacing the same address (CASSANDRA-8523)
+                     logger.warn("Writes will not be forwarded to this node during replacement because it has the same address as " +
+                                 "the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " +
+                                 "repair must be run after the replacement process in order to make this node consistent.",
+                                 DatabaseDescriptor.getReplaceAddress());
 -                    appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
+                     appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+                 }
              }
 -            else if (shouldBootstrap())
 +            else
              {
 -                checkForEndpointCollision();
 +                checkForEndpointCollision(localHostId);
              }
  
              // have to start the gossip service before we can see any info on other nodes.  this is necessary
@@@ -1036,13 -990,9 +1050,9 @@@
          }
          else if (isSurveyMode)
          {
-             setTokens(SystemKeyspace.getSavedTokens());
-             SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
--            isSurveyMode = false;
              logger.info("Leaving write survey mode and joining ring at operator request");
-             assert tokenMetadata.sortedTokens().size() > 0;
- 
-             doAuthSetup();
+             finishJoiningRing();
++            isSurveyMode = false;
          }
      }
  


[2/6] cassandra git commit: Forward writes to replacement node when replace_address != broadcast_address

Posted by al...@apache.org.
Forward writes to replacement node when replace_address != broadcast_address

Patch by Paulo Motta; reviewed by Richard Low for CASSANDRA-8523


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b39d984f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b39d984f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b39d984f

Branch: refs/heads/cassandra-3.0
Commit: b39d984f7bd682c7638415d65dcc4ac9bcb74e5f
Parents: 6eff082
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Jun 17 21:09:31 2016 -0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 31 20:21:30 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java |   5 +-
 .../apache/cassandra/gms/VersionedValue.java    |   6 +
 .../apache/cassandra/locator/TokenMetadata.java |  54 ++++++-
 .../cassandra/service/LoadBroadcaster.java      |   2 +-
 .../cassandra/service/StorageService.java       | 139 +++++++++++++++----
 6 files changed, 177 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0f7cf0e..d7e9394 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
  * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522)
  * Fail repair on non-existing table (CASSANDRA-12279)
  * cqlsh copy: fix missing counter values (CASSANDRA-12476)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 00e3da8..a8f9524 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -76,6 +76,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     static {
         SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES);
         SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING);
+        SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE);
     }
 
     private volatile ScheduledFuture<?> scheduledGossipTask;
@@ -333,10 +334,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         if (epState == null)
             return;
 
-        logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(epState), epState.isAlive());
         if (!epState.isAlive())
             return;
 
+        logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(epState), epState.isAlive());
+
+
         if (isShutdown(endpoint))
         {
             markAsShutdown(endpoint);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 3ea7bb4..661d3ba 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -65,6 +65,7 @@ public class VersionedValue implements Comparable<VersionedValue>
 
     // values for ApplicationState.STATUS
     public final static String STATUS_BOOTSTRAPPING = "BOOT";
+    public final static String STATUS_BOOTSTRAPPING_REPLACE = "BOOT_REPLACE";
     public final static String STATUS_NORMAL = "NORMAL";
     public final static String STATUS_LEAVING = "LEAVING";
     public final static String STATUS_LEFT = "LEFT";
@@ -133,6 +134,11 @@ public class VersionedValue implements Comparable<VersionedValue>
             return new VersionedValue(value.value);
         }
 
+        public VersionedValue bootReplacing(InetAddress oldNode)
+        {
+            return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddress()));
+        }
+
         public VersionedValue bootstrapping(Collection<Token> tokens)
         {
             return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index de16fda..b06c9c8 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.*;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -79,6 +80,9 @@ public class TokenMetadata
     // means we can detect and reject the addition of multiple nodes at the same token
     // before one becomes part of the ring.
     private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
+
+    private final BiMap<InetAddress, InetAddress> replacementToOriginal = HashBiMap.create();
+
     // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
     private final Set<InetAddress> leavingEndpoints = new HashSet<>();
     // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
@@ -185,6 +189,7 @@ public class TokenMetadata
                 tokenToEndpointMap.removeValue(endpoint);
                 topology.addEndpoint(endpoint);
                 leavingEndpoints.remove(endpoint);
+                replacementToOriginal.remove(endpoint);
                 removeFromMoving(endpoint); // also removing this endpoint from moving
 
                 for (Token token : tokens)
@@ -297,13 +302,17 @@ public class TokenMetadata
 
     public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint)
     {
+        addBootstrapTokens(tokens, endpoint, null);
+    }
+
+    private void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint, InetAddress original)
+    {
         assert tokens != null && !tokens.isEmpty();
         assert endpoint != null;
 
         lock.writeLock().lock();
         try
         {
-
             InetAddress oldEndpoint;
 
             for (Token token : tokens)
@@ -313,7 +322,7 @@ public class TokenMetadata
                     throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
 
                 oldEndpoint = tokenToEndpointMap.get(token);
-                if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
+                if (oldEndpoint != null && !oldEndpoint.equals(endpoint) && !oldEndpoint.equals(original))
                     throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
             }
 
@@ -328,6 +337,43 @@ public class TokenMetadata
         }
     }
 
+    public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode)
+    {
+        assert replacingTokens != null && !replacingTokens.isEmpty();
+        assert newNode != null && oldNode != null;
+
+        lock.writeLock().lock();
+        try
+        {
+            Collection<Token> oldNodeTokens = tokenToEndpointMap.inverse().get(oldNode);
+            if (!replacingTokens.containsAll(oldNodeTokens) || !oldNodeTokens.containsAll(replacingTokens))
+            {
+                throw new RuntimeException(String.format("Node %s is trying to replace node %s with tokens %s with a " +
+                                                         "different set of tokens %s.", newNode, oldNode, oldNodeTokens,
+                                                         replacingTokens));
+            }
+
+            logger.debug("Replacing {} with {}", newNode, oldNode);
+            replacementToOriginal.put(newNode, oldNode);
+
+            addBootstrapTokens(replacingTokens, newNode, oldNode);
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
+    {
+        return Optional.fromNullable(replacementToOriginal.inverse().get(endpoint));
+    }
+
+    public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
+    {
+        return Optional.fromNullable((replacementToOriginal.get(endpoint)));
+    }
+
     public void removeBootstrapTokens(Collection<Token> tokens)
     {
         assert tokens != null && !tokens.isEmpty();
@@ -391,6 +437,10 @@ public class TokenMetadata
             tokenToEndpointMap.removeValue(endpoint);
             topology.removeEndpoint(endpoint);
             leavingEndpoints.remove(endpoint);
+            if (replacementToOriginal.remove(endpoint) != null)
+            {
+                logger.debug("Node {} failed during replace.", endpoint);
+            }
             endpointToHostIdMap.remove(endpoint);
             sortedTokens = sortTokens();
             invalidateCachedRings();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/service/LoadBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/LoadBroadcaster.java b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
index 69fa93d..945dd2f 100644
--- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.gms.*;
 
 public class LoadBroadcaster implements IEndpointStateChangeSubscriber
 {
-    static final int BROADCAST_INTERVAL = 60 * 1000;
+    static final int BROADCAST_INTERVAL = Integer.getInteger("cassandra.broadcast_interval_ms", 60 * 1000);
 
     public static final LoadBroadcaster instance = new LoadBroadcaster();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 48a291b..9197ab1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -32,6 +32,7 @@ import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
@@ -185,6 +186,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
     private static final boolean allowSimultaneousMoves = Boolean.valueOf(System.getProperty("cassandra.consistent.simultaneousmoves.allow","false"));
     private boolean replacing;
+    private UUID replacingId;
 
     private final StreamStateStore streamStateStore = new StreamStateStore();
 
@@ -194,9 +196,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (logger.isDebugEnabled())
             logger.debug("Setting tokens to {}", tokens);
         SystemKeyspace.updateTokens(tokens);
-        tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
         Collection<Token> localTokens = getLocalTokens();
         setGossipTokens(localTokens);
+        tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
         setMode(Mode.NORMAL, false);
     }
 
@@ -431,11 +433,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         // make magic happen
         Gossiper.instance.doShadowRound();
 
-        UUID hostId = null;
         // now that we've gossiped at least once, we should be able to find the node we're replacing
         if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
             throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
-        hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+        replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
         try
         {
             VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
@@ -443,7 +444,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
             Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 
-            SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
+            if (isReplacingSameAddress())
+            {
+                SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
+            }
             Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
             return tokens;
         }
@@ -472,7 +476,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 // ignore local node or empty status
                 if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
                     continue;
-                String[] pieces = entry.getValue().getApplicationState(ApplicationState.STATUS).value.split(VersionedValue.DELIMITER_STR, -1);
+                String[] pieces = splitValue(entry.getValue().getApplicationState(ApplicationState.STATUS));
                 assert (pieces.length > 0);
                 String state = pieces[0];
                 if (state.equals(VersionedValue.STATUS_BOOTSTRAPPING) || state.equals(VersionedValue.STATUS_LEAVING) || state.equals(VersionedValue.STATUS_MOVING))
@@ -681,8 +685,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 if (!DatabaseDescriptor.isAutoBootstrap())
                     throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
                 bootstrapTokens = prepareReplacementInfo();
-                appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
-                appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+                if (isReplacingSameAddress())
+                {
+                    logger.warn("Writes will not be forwarded to this node during replacement because it has the same address as " +
+                                "the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " +
+                                "repair must be run after the replacement process in order to make this node consistent.",
+                                DatabaseDescriptor.getReplaceAddress());
+                    appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
+                    appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+                }
             }
             else if (shouldBootstrap())
             {
@@ -799,7 +810,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             }
             else
             {
-                if (!DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress()))
+                if (!isReplacingSameAddress())
                 {
                     try
                     {
@@ -885,17 +896,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             if (dataAvailable)
             {
-                // start participating in the ring.
-                SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
-                setTokens(bootstrapTokens);
+                finishJoiningRing();
+
                 // remove the existing info about the replaced node.
                 if (!current.isEmpty())
                 {
                     for (InetAddress existing : current)
                         Gossiper.instance.replacedEndpoint(existing);
                 }
-                assert tokenMetadata.sortedTokens().size() > 0;
-                doAuthSetup();
             }
             else
             {
@@ -908,6 +916,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    public static boolean isReplacingSameAddress()
+    {
+        return DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress());
+    }
+
     public void gossipSnitchInfo()
     {
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
@@ -933,16 +946,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         else if (isSurveyMode)
         {
-            setTokens(SystemKeyspace.getSavedTokens());
-            SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
             isSurveyMode = false;
             logger.info("Leaving write survey mode and joining ring at operator request");
-            assert tokenMetadata.sortedTokens().size() > 0;
-
-            doAuthSetup();
+            finishJoiningRing();
         }
     }
 
+    private void finishJoiningRing()
+    {
+        // start participating in the ring.
+        SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
+        setTokens(bootstrapTokens);
+
+        assert tokenMetadata.sortedTokens().size() > 0;
+        doAuthSetup();
+    }
+
     private void doAuthSetup()
     {
         maybeAddOrUpdateKeyspace(AuthKeyspace.definition());
@@ -1000,7 +1019,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public boolean isJoined()
     {
-        return tokenMetadata.isMember(FBUtilities.getBroadcastAddress());
+        return tokenMetadata.isMember(FBUtilities.getBroadcastAddress()) && !isSurveyMode;
     }
 
     public void rebuild(String sourceDc)
@@ -1122,12 +1141,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         isBootstrapMode = true;
         SystemKeyspace.updateTokens(tokens); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
-        if (!replacing)
+
+        if (!replacing || !isReplacingSameAddress())
         {
             // if not an existing token then bootstrap
             List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<>();
             states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
-            states.add(Pair.create(ApplicationState.STATUS, valueFactory.bootstrapping(tokens)));
+            states.add(Pair.create(ApplicationState.STATUS, replacing?
+                                                            valueFactory.bootReplacing(DatabaseDescriptor.getReplaceAddress()) :
+                                                            valueFactory.bootstrapping(tokens)));
             Gossiper.instance.addLocalApplicationStates(states);
             setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
             Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS);
@@ -1138,6 +1160,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
             SystemKeyspace.removeEndpoint(DatabaseDescriptor.getReplaceAddress());
         }
+
         if (!Gossiper.instance.seenAnySeed())
             throw new IllegalStateException("Unable to contact any seeds!");
 
@@ -1575,14 +1598,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         if (state == ApplicationState.STATUS)
         {
-            String apStateValue = value.value;
-            String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR, -1);
+            String[] pieces = splitValue(value);
             assert (pieces.length > 0);
 
             String moveName = pieces[0];
 
             switch (moveName)
             {
+                case VersionedValue.STATUS_BOOTSTRAPPING_REPLACE:
+                    handleStateBootreplacing(endpoint, pieces);
+                    break;
                 case VersionedValue.STATUS_BOOTSTRAPPING:
                     handleStateBootstrap(endpoint);
                     break;
@@ -1656,6 +1681,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    private static String[] splitValue(VersionedValue value)
+    {
+        return value.value.split(VersionedValue.DELIMITER_STR, -1);
+    }
+
     public void updateTopology(InetAddress endpoint)
     {
         if (getTokenMetadata().isMember(endpoint))
@@ -1820,6 +1850,43 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
     }
 
+
+    private void handleStateBootreplacing(InetAddress newNode, String[] pieces) // pieces: split STATUS value, pieces[1] = address being replaced
+    {
+        InetAddress oldNode;
+        try
+        {
+            oldNode = InetAddress.getByName(pieces[1]);
+        }
+        catch (Exception e) // also reached when pieces has no second element
+        {
+            logger.error("Node {} tried to replace malformed endpoint {}.", newNode, pieces.length > 1 ? pieces[1] : "<missing>", e);
+            return;
+        }
+
+        if (FailureDetector.instance.isAlive(oldNode))
+        {
+            throw new RuntimeException(String.format("Node %s is trying to replace alive node %s.", newNode, oldNode));
+        }
+
+        Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(newNode);
+        if (replacingNode.isPresent() && !replacingNode.get().equals(oldNode)) // newNode is already recorded as replacing a different node
+        {
+            throw new RuntimeException(String.format("Node %s is already replacing %s but is trying to replace %s.",
+                                                     newNode, replacingNode.get(), oldNode));
+        }
+
+        Collection<Token> tokens = getTokensFor(newNode);
+
+        if (logger.isDebugEnabled())
+            logger.debug("Node {} is replacing {}, tokens {}", newNode, oldNode, tokens);
+
+        tokenMetadata.addReplaceTokens(tokens, newNode, oldNode);
+        PendingRangeCalculatorService.instance.update();
+
+        tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode);
+    }
+
     /**
      * Handle node move to normal state. That is, node is entering token ring and participating
      * in reads.
@@ -1844,11 +1911,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                          endpoint,
                          Gossiper.instance.getEndpointStateForEndpoint(endpoint));
 
+        Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(endpoint);
+        if (replacingNode.isPresent())
+        {
+            assert !endpoint.equals(replacingNode.get()) : "Pending replacement endpoint with same address is not supported";
+            logger.info("Node {} will complete replacement of {} for tokens {}", endpoint, replacingNode.get(), tokens);
+            if (FailureDetector.instance.isAlive(replacingNode.get()))
+            {
+                logger.error("Node {} cannot complete replacement of alive node {}.", endpoint, replacingNode.get());
+                return;
+            }
+            endpointsToRemove.add(replacingNode.get());
+        }
+
+        Optional<InetAddress> replacementNode = tokenMetadata.getReplacementNode(endpoint);
+        if (replacementNode.isPresent())
+        {
+            logger.warn("Node {} is currently being replaced by node {}.", endpoint, replacementNode.get());
+        }
+
         updatePeerInfo(endpoint);
         // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
         UUID hostId = Gossiper.instance.getHostId(endpoint);
         InetAddress existing = tokenMetadata.getEndpointForHostId(hostId);
-        if (replacing && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
+        if (replacing && isReplacingSameAddress() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null
+            && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
             logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
         else
         {
@@ -1933,7 +2020,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
         }
         if (!tokensToUpdateInSystemKeyspace.isEmpty())
-            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);;
+            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);
 
         if (isMoving || operationMode == Mode.MOVING)
         {
@@ -2058,7 +2145,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 PendingRangeCalculatorService.instance.update();
 
                 // find the endpoint coordinating this removal that we need to notify when we're done
-                String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
+                String[] coordinator = splitValue(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR));
                 UUID hostId = UUID.fromString(coordinator[1]);
                 // grab any data we are now responsible for and notify responsible node
                 restoreReplicaCount(endpoint, tokenMetadata.getEndpointForHostId(hostId));


[5/6] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by al...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e4a53f4d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e4a53f4d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e4a53f4d

Branch: refs/heads/cassandra-3.0
Commit: e4a53f4d3160833af3ea7917a35e7e35ae02786d
Parents: ab98b11 b39d984
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 31 20:24:03 2016 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 31 20:25:24 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java |   5 +-
 .../apache/cassandra/gms/VersionedValue.java    |   6 +
 .../apache/cassandra/locator/TokenMetadata.java |  52 ++++++-
 .../cassandra/service/LoadBroadcaster.java      |   2 +-
 .../cassandra/service/StorageService.java       | 136 +++++++++++++++----
 6 files changed, 173 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4b77e4d,d7e9394..30931d3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,50 -1,5 +1,51 @@@
 -2.2.8
 +3.0.9
 + * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208)
 + * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889)
 + * Fix file handle leaks due to simultaneous compaction/repair and
 +   listing snapshots, calculating snapshot sizes, or making schema
 +   changes (CASSANDRA-11594)
 + * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508)
 + * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504)
 + * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
 + * Calculate last compacted key on startup (CASSANDRA-6216)
 + * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
 + * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
 + * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
 + * Backport CASSANDRA-12002 (CASSANDRA-12177)
 + * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
 + * Fix potential bad messaging service message for paged range reads
 +   within mixed-version 3.x clusters (CASSANDRA-12249)
 + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
 + * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
+  * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
   * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522)
   * Fail repair on non-existing table (CASSANDRA-12279)
   * cqlsh copy: fix missing counter values (CASSANDRA-12476)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index 97c5f10,b06c9c8..b50db00
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -343,6 -337,43 +352,43 @@@ public class TokenMetadat
          }
      }
  
+     public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode)
+     {
+         assert replacingTokens != null && !replacingTokens.isEmpty();
+         assert newNode != null && oldNode != null;
+ 
+         lock.writeLock().lock();
+         try
+         {
+             Collection<Token> oldNodeTokens = tokenToEndpointMap.inverse().get(oldNode);
+             if (!replacingTokens.containsAll(oldNodeTokens) || !oldNodeTokens.containsAll(replacingTokens))
+             { // the two token collections must contain each other, i.e. hold the same tokens
+                 throw new RuntimeException(String.format("Node %s is trying to replace node %s with tokens %s with a " +
+                                                          "different set of tokens %s.", newNode, oldNode, oldNodeTokens,
+                                                          replacingTokens));
+             }
+ 
+             logger.debug("Replacing {} with {}", oldNode, newNode); // oldNode is the node being replaced
+             replacementToOriginal.put(newNode, oldNode); // keyed by the replacement node
+ 
+             addBootstrapTokens(replacingTokens, newNode, oldNode);
+         }
+         finally
+         {
+             lock.writeLock().unlock();
+         }
+     }
+ 
+     public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
+     {
 -        return Optional.fromNullable(replacementToOriginal.inverse().get(endpoint));
++        return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint));
+     }
+ 
+     public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
+     {
 -        return Optional.fromNullable((replacementToOriginal.get(endpoint)));
++        return Optional.ofNullable((replacementToOriginal.get(endpoint)));
+     }
+ 
      public void removeBootstrapTokens(Collection<Token> tokens)
      {
          assert tokens != null && !tokens.isEmpty();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 60cb86b,9197ab1..c06bed2
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -454,9 -442,12 +454,12 @@@ public class StorageService extends Not
              VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
              if (tokensVersionedValue == null)
                  throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
 -            Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 +            Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
  
-             SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
+             if (isReplacingSameAddress())
+             {
+                 SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
+             }
              Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
              return tokens;
          }
@@@ -988,9 -952,19 +996,19 @@@
          }
      }
  
+     private void finishJoiningRing()
+     {
+         // start participating in the ring.
+         SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
+         setTokens(bootstrapTokens);
+ 
+         assert tokenMetadata.sortedTokens().size() > 0;
+         doAuthSetup();
+     }
+ 
      private void doAuthSetup()
      {
 -        maybeAddOrUpdateKeyspace(AuthKeyspace.definition());
 +        maybeAddOrUpdateKeyspace(AuthKeyspace.metadata());
  
          DatabaseDescriptor.getRoleManager().setup();
          DatabaseDescriptor.getAuthenticator().setup();
@@@ -1709,18 -1681,11 +1732,23 @@@
          }
      }
  
+     private static String[] splitValue(VersionedValue value)
+     {
+         return value.value.split(VersionedValue.DELIMITER_STR, -1);
+     }
+ 
 +    private void updateNetVersion(InetAddress endpoint, VersionedValue value)
 +    {
 +        try
 +        {
 +            MessagingService.instance().setVersion(endpoint, Integer.valueOf(value.value));
 +        }
 +        catch (NumberFormatException e)
 +        {
 +            throw new AssertionError("Got invalid value for NET_VERSION application state: " + value.value);
 +        }
 +    }
 +
      public void updateTopology(InetAddress endpoint)
      {
          if (getTokenMetadata().isMember(endpoint))
@@@ -1885,6 -1850,43 +1913,42 @@@
          tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
      }
  
 -
+     private void handleStateBootreplacing(InetAddress newNode, String[] pieces) // pieces: split STATUS value, pieces[1] = address being replaced
+     {
+         InetAddress oldNode;
+         try
+         {
+             oldNode = InetAddress.getByName(pieces[1]);
+         }
+         catch (Exception e) // also reached when pieces has no second element
+         {
+             logger.error("Node {} tried to replace malformed endpoint {}.", newNode, pieces.length > 1 ? pieces[1] : "<missing>", e);
+             return;
+         }
+ 
+         if (FailureDetector.instance.isAlive(oldNode))
+         {
+             throw new RuntimeException(String.format("Node %s is trying to replace alive node %s.", newNode, oldNode));
+         }
+ 
+         Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(newNode);
+         if (replacingNode.isPresent() && !replacingNode.get().equals(oldNode)) // newNode is already recorded as replacing a different node
+         {
+             throw new RuntimeException(String.format("Node %s is already replacing %s but is trying to replace %s.",
+                                                      newNode, replacingNode.get(), oldNode));
+         }
+ 
+         Collection<Token> tokens = getTokensFor(newNode);
+ 
+         if (logger.isDebugEnabled())
+             logger.debug("Node {} is replacing {}, tokens {}", newNode, oldNode, tokens);
+ 
+         tokenMetadata.addReplaceTokens(tokens, newNode, oldNode);
+         PendingRangeCalculatorService.instance.update();
+ 
+         tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode);
+     }
+ 
      /**
       * Handle node move to normal state. That is, node is entering token ring and participating
       * in reads.


[4/6] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by al...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e4a53f4d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e4a53f4d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e4a53f4d

Branch: refs/heads/trunk
Commit: e4a53f4d3160833af3ea7917a35e7e35ae02786d
Parents: ab98b11 b39d984
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Aug 31 20:24:03 2016 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 31 20:25:24 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java |   5 +-
 .../apache/cassandra/gms/VersionedValue.java    |   6 +
 .../apache/cassandra/locator/TokenMetadata.java |  52 ++++++-
 .../cassandra/service/LoadBroadcaster.java      |   2 +-
 .../cassandra/service/StorageService.java       | 136 +++++++++++++++----
 6 files changed, 173 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4b77e4d,d7e9394..30931d3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,50 -1,5 +1,51 @@@
 -2.2.8
 +3.0.9
 + * Add option to state current gc_grace_seconds to tools/bin/sstablemetadata (CASSANDRA-12208)
 + * Fix file system race condition that may cause LogAwareFileLister to fail to classify files (CASSANDRA-11889)
 + * Fix file handle leaks due to simultaneous compaction/repair and
 +   listing snapshots, calculating snapshot sizes, or making schema
 +   changes (CASSANDRA-11594)
 + * Fix nodetool repair exits with 0 for some errors (CASSANDRA-12508)
 + * Do not shut down BatchlogManager twice during drain (CASSANDRA-12504)
 + * Disk failure policy should not be invoked on out of space (CASSANDRA-12385)
 + * Calculate last compacted key on startup (CASSANDRA-6216)
 + * Add schema to snapshot manifest, add USING TIMESTAMP clause to ALTER TABLE statements (CASSANDRA-7190)
 + * Fix clean interval not sent to commit log for empty memtable flush (CASSANDRA-12436)
 + * Fix potential resource leak in RMIServerSocketFactoryImpl (CASSANDRA-12331)
 + * Backport CASSANDRA-12002 (CASSANDRA-12177)
 + * Make sure compaction stats are updated when compaction is interrupted (CASSANDRA-12100)
 + * Fix potential bad messaging service message for paged range reads
 +   within mixed-version 3.x clusters (CASSANDRA-12249)
 + * Change commitlog and sstables to track dirty and clean intervals (CASSANDRA-11828)
 + * NullPointerException during compaction on table with static columns (CASSANDRA-12336)
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flacky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
+  * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
   * Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522)
   * Fail repair on non-existing table (CASSANDRA-12279)
   * cqlsh copy: fix missing counter values (CASSANDRA-12476)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index 97c5f10,b06c9c8..b50db00
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -343,6 -337,43 +352,43 @@@ public class TokenMetadat
          }
      }
  
+     public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode)
+     {
+         assert replacingTokens != null && !replacingTokens.isEmpty();
+         assert newNode != null && oldNode != null;
+ 
+         lock.writeLock().lock();
+         try
+         {
+             Collection<Token> oldNodeTokens = tokenToEndpointMap.inverse().get(oldNode);
+             if (!replacingTokens.containsAll(oldNodeTokens) || !oldNodeTokens.containsAll(replacingTokens))
+             { // the two token collections must contain each other, i.e. hold the same tokens
+                 throw new RuntimeException(String.format("Node %s is trying to replace node %s with tokens %s with a " +
+                                                          "different set of tokens %s.", newNode, oldNode, oldNodeTokens,
+                                                          replacingTokens));
+             }
+ 
+             logger.debug("Replacing {} with {}", oldNode, newNode); // oldNode is the node being replaced
+             replacementToOriginal.put(newNode, oldNode); // keyed by the replacement node
+ 
+             addBootstrapTokens(replacingTokens, newNode, oldNode);
+         }
+         finally
+         {
+             lock.writeLock().unlock();
+         }
+     }
+ 
+     public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
+     {
 -        return Optional.fromNullable(replacementToOriginal.inverse().get(endpoint));
++        return Optional.ofNullable(replacementToOriginal.inverse().get(endpoint));
+     }
+ 
+     public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
+     {
 -        return Optional.fromNullable((replacementToOriginal.get(endpoint)));
++        return Optional.ofNullable((replacementToOriginal.get(endpoint)));
+     }
+ 
      public void removeBootstrapTokens(Collection<Token> tokens)
      {
          assert tokens != null && !tokens.isEmpty();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4a53f4d/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 60cb86b,9197ab1..c06bed2
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -454,9 -442,12 +454,12 @@@ public class StorageService extends Not
              VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
              if (tokensVersionedValue == null)
                  throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
 -            Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
 +            Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
  
-             SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
+             if (isReplacingSameAddress())
+             {
+                 SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
+             }
              Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
              return tokens;
          }
@@@ -988,9 -952,19 +996,19 @@@
          }
      }
  
+     private void finishJoiningRing()
+     {
+         // start participating in the ring.
+         SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
+         setTokens(bootstrapTokens);
+ 
+         assert tokenMetadata.sortedTokens().size() > 0;
+         doAuthSetup();
+     }
+ 
      private void doAuthSetup()
      {
 -        maybeAddOrUpdateKeyspace(AuthKeyspace.definition());
 +        maybeAddOrUpdateKeyspace(AuthKeyspace.metadata());
  
          DatabaseDescriptor.getRoleManager().setup();
          DatabaseDescriptor.getAuthenticator().setup();
@@@ -1709,18 -1681,11 +1732,23 @@@
          }
      }
  
+     private static String[] splitValue(VersionedValue value)
+     {
+         return value.value.split(VersionedValue.DELIMITER_STR, -1);
+     }
+ 
 +    private void updateNetVersion(InetAddress endpoint, VersionedValue value)
 +    {
 +        try
 +        {
 +            MessagingService.instance().setVersion(endpoint, Integer.valueOf(value.value));
 +        }
 +        catch (NumberFormatException e)
 +        {
 +            throw new AssertionError("Got invalid value for NET_VERSION application state: " + value.value);
 +        }
 +    }
 +
      public void updateTopology(InetAddress endpoint)
      {
          if (getTokenMetadata().isMember(endpoint))
@@@ -1885,6 -1850,43 +1913,42 @@@
          tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
      }
  
 -
+     private void handleStateBootreplacing(InetAddress newNode, String[] pieces) // pieces: split STATUS value, pieces[1] = address being replaced
+     {
+         InetAddress oldNode;
+         try
+         {
+             oldNode = InetAddress.getByName(pieces[1]);
+         }
+         catch (Exception e) // also reached when pieces has no second element
+         {
+             logger.error("Node {} tried to replace malformed endpoint {}.", newNode, pieces.length > 1 ? pieces[1] : "<missing>", e);
+             return;
+         }
+ 
+         if (FailureDetector.instance.isAlive(oldNode))
+         {
+             throw new RuntimeException(String.format("Node %s is trying to replace alive node %s.", newNode, oldNode));
+         }
+ 
+         Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(newNode);
+         if (replacingNode.isPresent() && !replacingNode.get().equals(oldNode)) // newNode is already recorded as replacing a different node
+         {
+             throw new RuntimeException(String.format("Node %s is already replacing %s but is trying to replace %s.",
+                                                      newNode, replacingNode.get(), oldNode));
+         }
+ 
+         Collection<Token> tokens = getTokensFor(newNode);
+ 
+         if (logger.isDebugEnabled())
+             logger.debug("Node {} is replacing {}, tokens {}", newNode, oldNode, tokens);
+ 
+         tokenMetadata.addReplaceTokens(tokens, newNode, oldNode);
+         PendingRangeCalculatorService.instance.update();
+ 
+         tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode);
+     }
+ 
      /**
       * Handle node move to normal state. That is, node is entering token ring and participating
       * in reads.