You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/08/31 19:29:10 UTC
[2/6] cassandra git commit: Forward writes to replacement node when replace_address != broadcast_address
Forward writes to replacement node when replace_address != broadcast_address
Patch by Paulo Motta; reviewed by Richard Low for CASSANDRA-8523
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b39d984f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b39d984f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b39d984f
Branch: refs/heads/cassandra-3.0
Commit: b39d984f7bd682c7638415d65dcc4ac9bcb74e5f
Parents: 6eff082
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Jun 17 21:09:31 2016 -0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Aug 31 20:21:30 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/gms/Gossiper.java | 5 +-
.../apache/cassandra/gms/VersionedValue.java | 6 +
.../apache/cassandra/locator/TokenMetadata.java | 54 ++++++-
.../cassandra/service/LoadBroadcaster.java | 2 +-
.../cassandra/service/StorageService.java | 139 +++++++++++++++----
6 files changed, 177 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0f7cf0e..d7e9394 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.8
+ * Forward writes to replacement node when replace_address != broadcast_address (CASSANDRA-8523)
* Enable repair -pr and -local together (fix regression of CASSANDRA-7450) (CASSANDRA-12522)
* Fail repair on non-existing table (CASSANDRA-12279)
* cqlsh copy: fix missing counter values (CASSANDRA-12476)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 00e3da8..a8f9524 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -76,6 +76,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
static {
SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES);
SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING);
+ SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE);
}
private volatile ScheduledFuture<?> scheduledGossipTask;
@@ -333,10 +334,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
if (epState == null)
return;
- logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(epState), epState.isAlive());
if (!epState.isAlive())
return;
+ logger.debug("Convicting {} with status {} - alive {}", endpoint, getGossipStatus(epState), epState.isAlive());
+
+
if (isShutdown(endpoint))
{
markAsShutdown(endpoint);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 3ea7bb4..661d3ba 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -65,6 +65,7 @@ public class VersionedValue implements Comparable<VersionedValue>
// values for ApplicationState.STATUS
public final static String STATUS_BOOTSTRAPPING = "BOOT";
+ public final static String STATUS_BOOTSTRAPPING_REPLACE = "BOOT_REPLACE";
public final static String STATUS_NORMAL = "NORMAL";
public final static String STATUS_LEAVING = "LEAVING";
public final static String STATUS_LEFT = "LEFT";
@@ -133,6 +134,11 @@ public class VersionedValue implements Comparable<VersionedValue>
return new VersionedValue(value.value);
}
+ public VersionedValue bootReplacing(InetAddress oldNode)
+ {
+ return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING_REPLACE, oldNode.getHostAddress()));
+ }
+
public VersionedValue bootstrapping(Collection<Token> tokens)
{
return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index de16fda..b06c9c8 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.base.Optional;
import com.google.common.collect.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -79,6 +80,9 @@ public class TokenMetadata
// means we can detect and reject the addition of multiple nodes at the same token
// before one becomes part of the ring.
private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
+
+ private final BiMap<InetAddress, InetAddress> replacementToOriginal = HashBiMap.create();
+
// (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
private final Set<InetAddress> leavingEndpoints = new HashSet<>();
// this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
@@ -185,6 +189,7 @@ public class TokenMetadata
tokenToEndpointMap.removeValue(endpoint);
topology.addEndpoint(endpoint);
leavingEndpoints.remove(endpoint);
+ replacementToOriginal.remove(endpoint);
removeFromMoving(endpoint); // also removing this endpoint from moving
for (Token token : tokens)
@@ -297,13 +302,17 @@ public class TokenMetadata
public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint)
{
+ addBootstrapTokens(tokens, endpoint, null);
+ }
+
+ private void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint, InetAddress original)
+ {
assert tokens != null && !tokens.isEmpty();
assert endpoint != null;
lock.writeLock().lock();
try
{
-
InetAddress oldEndpoint;
for (Token token : tokens)
@@ -313,7 +322,7 @@ public class TokenMetadata
throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
oldEndpoint = tokenToEndpointMap.get(token);
- if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
+ if (oldEndpoint != null && !oldEndpoint.equals(endpoint) && !oldEndpoint.equals(original))
throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
}
@@ -328,6 +337,43 @@ public class TokenMetadata
}
}
+ public void addReplaceTokens(Collection<Token> replacingTokens, InetAddress newNode, InetAddress oldNode)
+ {
+ assert replacingTokens != null && !replacingTokens.isEmpty();
+ assert newNode != null && oldNode != null;
+
+ lock.writeLock().lock();
+ try
+ {
+ Collection<Token> oldNodeTokens = tokenToEndpointMap.inverse().get(oldNode);
+ if (!replacingTokens.containsAll(oldNodeTokens) || !oldNodeTokens.containsAll(replacingTokens))
+ {
+ throw new RuntimeException(String.format("Node %s is trying to replace node %s with tokens %s with a " +
+ "different set of tokens %s.", newNode, oldNode, oldNodeTokens,
+ replacingTokens));
+ }
+
+ logger.debug("Replacing {} with {}", newNode, oldNode);
+ replacementToOriginal.put(newNode, oldNode);
+
+ addBootstrapTokens(replacingTokens, newNode, oldNode);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public Optional<InetAddress> getReplacementNode(InetAddress endpoint)
+ {
+ return Optional.fromNullable(replacementToOriginal.inverse().get(endpoint));
+ }
+
+ public Optional<InetAddress> getReplacingNode(InetAddress endpoint)
+ {
+ return Optional.fromNullable((replacementToOriginal.get(endpoint)));
+ }
+
public void removeBootstrapTokens(Collection<Token> tokens)
{
assert tokens != null && !tokens.isEmpty();
@@ -391,6 +437,10 @@ public class TokenMetadata
tokenToEndpointMap.removeValue(endpoint);
topology.removeEndpoint(endpoint);
leavingEndpoints.remove(endpoint);
+ if (replacementToOriginal.remove(endpoint) != null)
+ {
+ logger.debug("Node {} failed during replace.", endpoint);
+ }
endpointToHostIdMap.remove(endpoint);
sortedTokens = sortTokens();
invalidateCachedRings();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/service/LoadBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/LoadBroadcaster.java b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
index 69fa93d..945dd2f 100644
--- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.gms.*;
public class LoadBroadcaster implements IEndpointStateChangeSubscriber
{
- static final int BROADCAST_INTERVAL = 60 * 1000;
+ static final int BROADCAST_INTERVAL = Integer.getInteger("cassandra.broadcast_interval_ms", 60 * 1000);
public static final LoadBroadcaster instance = new LoadBroadcaster();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b39d984f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 48a291b..9197ab1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -32,6 +32,7 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
@@ -185,6 +186,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true"));
private static final boolean allowSimultaneousMoves = Boolean.valueOf(System.getProperty("cassandra.consistent.simultaneousmoves.allow","false"));
private boolean replacing;
+ private UUID replacingId;
private final StreamStateStore streamStateStore = new StreamStateStore();
@@ -194,9 +196,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (logger.isDebugEnabled())
logger.debug("Setting tokens to {}", tokens);
SystemKeyspace.updateTokens(tokens);
- tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
Collection<Token> localTokens = getLocalTokens();
setGossipTokens(localTokens);
+ tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
setMode(Mode.NORMAL, false);
}
@@ -431,11 +433,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// make magic happen
Gossiper.instance.doShadowRound();
- UUID hostId = null;
// now that we've gossiped at least once, we should be able to find the node we're replacing
if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())== null)
throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
- hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+ replacingId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
try
{
VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
@@ -443,7 +444,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
- SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
+ if (isReplacingSameAddress())
+ {
+ SystemKeyspace.setLocalHostId(replacingId); // use the replacee's host Id as our own so we receive hints, etc
+ }
Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
return tokens;
}
@@ -472,7 +476,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// ignore local node or empty status
if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
continue;
- String[] pieces = entry.getValue().getApplicationState(ApplicationState.STATUS).value.split(VersionedValue.DELIMITER_STR, -1);
+ String[] pieces = splitValue(entry.getValue().getApplicationState(ApplicationState.STATUS));
assert (pieces.length > 0);
String state = pieces[0];
if (state.equals(VersionedValue.STATUS_BOOTSTRAPPING) || state.equals(VersionedValue.STATUS_LEAVING) || state.equals(VersionedValue.STATUS_MOVING))
@@ -681,8 +685,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
if (!DatabaseDescriptor.isAutoBootstrap())
throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
bootstrapTokens = prepareReplacementInfo();
- appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
- appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+ if (isReplacingSameAddress())
+ {
+ logger.warn("Writes will not be forwarded to this node during replacement because it has the same address as " +
+ "the node to be replaced ({}). If the previous node has been down for longer than max_hint_window_in_ms, " +
+ "repair must be run after the replacement process in order to make this node consistent.",
+ DatabaseDescriptor.getReplaceAddress());
+ appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
+ appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+ }
}
else if (shouldBootstrap())
{
@@ -799,7 +810,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
else
{
- if (!DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress()))
+ if (!isReplacingSameAddress())
{
try
{
@@ -885,17 +896,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
if (dataAvailable)
{
- // start participating in the ring.
- SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
- setTokens(bootstrapTokens);
+ finishJoiningRing();
+
// remove the existing info about the replaced node.
if (!current.isEmpty())
{
for (InetAddress existing : current)
Gossiper.instance.replacedEndpoint(existing);
}
- assert tokenMetadata.sortedTokens().size() > 0;
- doAuthSetup();
}
else
{
@@ -908,6 +916,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
+ public static boolean isReplacingSameAddress()
+ {
+ return DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress());
+ }
+
public void gossipSnitchInfo()
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
@@ -933,16 +946,22 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
else if (isSurveyMode)
{
- setTokens(SystemKeyspace.getSavedTokens());
- SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
isSurveyMode = false;
logger.info("Leaving write survey mode and joining ring at operator request");
- assert tokenMetadata.sortedTokens().size() > 0;
-
- doAuthSetup();
+ finishJoiningRing();
}
}
+ private void finishJoiningRing()
+ {
+ // start participating in the ring.
+ SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
+ setTokens(bootstrapTokens);
+
+ assert tokenMetadata.sortedTokens().size() > 0;
+ doAuthSetup();
+ }
+
private void doAuthSetup()
{
maybeAddOrUpdateKeyspace(AuthKeyspace.definition());
@@ -1000,7 +1019,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public boolean isJoined()
{
- return tokenMetadata.isMember(FBUtilities.getBroadcastAddress());
+ return tokenMetadata.isMember(FBUtilities.getBroadcastAddress()) && !isSurveyMode;
}
public void rebuild(String sourceDc)
@@ -1122,12 +1141,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
isBootstrapMode = true;
SystemKeyspace.updateTokens(tokens); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
- if (!replacing)
+
+ if (!replacing || !isReplacingSameAddress())
{
// if not an existing token then bootstrap
List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<>();
states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
- states.add(Pair.create(ApplicationState.STATUS, valueFactory.bootstrapping(tokens)));
+ states.add(Pair.create(ApplicationState.STATUS, replacing?
+ valueFactory.bootReplacing(DatabaseDescriptor.getReplaceAddress()) :
+ valueFactory.bootstrapping(tokens)));
Gossiper.instance.addLocalApplicationStates(states);
setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
Uninterruptibles.sleepUninterruptibly(RING_DELAY, TimeUnit.MILLISECONDS);
@@ -1138,6 +1160,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
SystemKeyspace.removeEndpoint(DatabaseDescriptor.getReplaceAddress());
}
+
if (!Gossiper.instance.seenAnySeed())
throw new IllegalStateException("Unable to contact any seeds!");
@@ -1575,14 +1598,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
if (state == ApplicationState.STATUS)
{
- String apStateValue = value.value;
- String[] pieces = apStateValue.split(VersionedValue.DELIMITER_STR, -1);
+ String[] pieces = splitValue(value);
assert (pieces.length > 0);
String moveName = pieces[0];
switch (moveName)
{
+ case VersionedValue.STATUS_BOOTSTRAPPING_REPLACE:
+ handleStateBootreplacing(endpoint, pieces);
+ break;
case VersionedValue.STATUS_BOOTSTRAPPING:
handleStateBootstrap(endpoint);
break;
@@ -1656,6 +1681,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
+ private static String[] splitValue(VersionedValue value)
+ {
+ return value.value.split(VersionedValue.DELIMITER_STR, -1);
+ }
+
public void updateTopology(InetAddress endpoint)
{
if (getTokenMetadata().isMember(endpoint))
@@ -1820,6 +1850,43 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
}
+
+ private void handleStateBootreplacing(InetAddress newNode, String[] pieces)
+ {
+ InetAddress oldNode;
+ try
+ {
+ oldNode = InetAddress.getByName(pieces[1]);
+ }
+ catch (Exception e)
+ {
+ logger.error("Node {} tried to replace malformed endpoint {}.", newNode, pieces[1], e);
+ return;
+ }
+
+ if (FailureDetector.instance.isAlive(oldNode))
+ {
+ throw new RuntimeException(String.format("Node %s is trying to replace alive node %s.", newNode, oldNode));
+ }
+
+ Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(newNode);
+ if (replacingNode.isPresent() && !replacingNode.get().equals(oldNode))
+ {
+ throw new RuntimeException(String.format("Node %s is already replacing %s but is trying to replace %s.",
+ newNode, replacingNode.get(), oldNode));
+ }
+
+ Collection<Token> tokens = getTokensFor(newNode);
+
+ if (logger.isDebugEnabled())
+ logger.debug("Node {} is replacing {}, tokens {}", newNode, oldNode, tokens);
+
+ tokenMetadata.addReplaceTokens(tokens, newNode, oldNode);
+ PendingRangeCalculatorService.instance.update();
+
+ tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode);
+ }
+
/**
* Handle node move to normal state. That is, node is entering token ring and participating
* in reads.
@@ -1844,11 +1911,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
endpoint,
Gossiper.instance.getEndpointStateForEndpoint(endpoint));
+ Optional<InetAddress> replacingNode = tokenMetadata.getReplacingNode(endpoint);
+ if (replacingNode.isPresent())
+ {
+ assert !endpoint.equals(replacingNode.get()) : "Pending replacement endpoint with same address is not supported";
+ logger.info("Node {} will complete replacement of {} for tokens {}", endpoint, replacingNode.get(), tokens);
+ if (FailureDetector.instance.isAlive(replacingNode.get()))
+ {
+ logger.error("Node {} cannot complete replacement of alive node {}.", endpoint, replacingNode.get());
+ return;
+ }
+ endpointsToRemove.add(replacingNode.get());
+ }
+
+ Optional<InetAddress> replacementNode = tokenMetadata.getReplacementNode(endpoint);
+ if (replacementNode.isPresent())
+ {
+ logger.warn("Node {} is currently being replaced by node {}.", endpoint, replacementNode.get());
+ }
+
updatePeerInfo(endpoint);
// Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
UUID hostId = Gossiper.instance.getHostId(endpoint);
InetAddress existing = tokenMetadata.getEndpointForHostId(hostId);
- if (replacing && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
+ if (replacing && isReplacingSameAddress() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null
+ && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
else
{
@@ -1933,7 +2020,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
}
if (!tokensToUpdateInSystemKeyspace.isEmpty())
- SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);;
+ SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);
if (isMoving || operationMode == Mode.MOVING)
{
@@ -2058,7 +2145,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
PendingRangeCalculatorService.instance.update();
// find the endpoint coordinating this removal that we need to notify when we're done
- String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
+ String[] coordinator = splitValue(Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR));
UUID hostId = UUID.fromString(coordinator[1]);
// grab any data we are now responsible for and notify responsible node
restoreReplicaCount(endpoint, tokenMetadata.getEndpointForHostId(hostId));