You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/10/18 04:06:22 UTC
[3/6] git commit: Rework node replacement. Patch by brandonwilliams,
reviewed by Tyler Hobbs for CASSANDRA-5916
Rework node replacement.
Patch by brandonwilliams, reviewed by Tyler Hobbs for CASSANDRA-5916
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/351d43ef
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/351d43ef
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/351d43ef
Branch: refs/heads/trunk
Commit: 351d43ef120a725cf83d29f80fd243ad9fc30fc2
Parents: c5368c7
Author: Brandon Williams <br...@apache.org>
Authored: Thu Oct 17 20:59:36 2013 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Thu Oct 17 20:59:36 2013 -0500
----------------------------------------------------------------------
NEWS.txt | 8 +-
.../cassandra/config/DatabaseDescriptor.java | 17 ++-
.../org/apache/cassandra/db/SystemTable.java | 9 +-
.../gms/GossipDigestAckVerbHandler.java | 10 +-
.../gms/GossipDigestSynVerbHandler.java | 2 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 69 ++++++++++-
.../cassandra/service/StorageService.java | 122 ++++++++++++-------
7 files changed, 182 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 000986f..a676aa1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -14,15 +14,15 @@ restore snapshots created with the previous major version using the
using the provided 'sstableupgrade' tool.
-1.2.12
+1.2.11
======
Features
--------
- Added a new consistency level, LOCAL_ONE, that forces all CL.ONE operations to
execute only in the local datacenter.
-
-1.2.11
-======
+ - New replace_address to supplant the (now removed) replace_token and
+ replace_node workflows to replace a dead node in place. Works like the
+ old options, but takes the IP address of the node to be replaced.
Upgrading
---------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 633ea9a..65a20cc 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -724,6 +724,21 @@ public class DatabaseDescriptor
return conf.num_tokens;
}
+ /**
+  * Returns the address of the node to be replaced, as given by the
+  * {@code cassandra.replace_address} system property.
+  *
+  * @return the resolved address, or {@code null} when the property is not set
+  * @throws RuntimeException if the property is set but cannot be resolved. Returning
+  *         {@code null} in that case would make {@code isReplacing()} report false,
+  *         so a mistyped address would silently skip the replacement instead of
+  *         failing at startup.
+  */
+ public static InetAddress getReplaceAddress()
+ {
+     // read the property once so the null check and the lookup see the same value
+     String replaceAddress = System.getProperty("cassandra.replace_address", null);
+     if (replaceAddress == null)
+         return null;
+     try
+     {
+         return InetAddress.getByName(replaceAddress);
+     }
+     catch (UnknownHostException e)
+     {
+         throw new RuntimeException("Replacement host name could not be resolved: " + replaceAddress, e);
+     }
+ }
+
public static Collection<String> getReplaceTokens()
{
return tokensFromString(System.getProperty("cassandra.replace_token", null));
@@ -742,7 +757,7 @@ public class DatabaseDescriptor
public static boolean isReplacing()
{
- return 0 != getReplaceTokens().size() || getReplaceNode() != null;
+ return getReplaceAddress() != null;
}
public static String getClusterName()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index a87ab50..432a434 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -654,8 +654,15 @@ public class SystemTable
// ID not found, generate a new one, persist, and then return it.
hostId = UUID.randomUUID();
logger.warn("No host ID found, created {} (Note: This should happen exactly once per node).", hostId);
+ return setLocalHostId(hostId);
+ }
- req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
+ /**
+ * Sets the local host ID explicitly. Should only be called outside of SystemTable when replacing a node.
+ */
+ public static UUID setLocalHostId(UUID hostId)
+ {
+ String req = "INSERT INTO system.%s (key, host_id) VALUES ('%s', %s)";
processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, hostId));
return hostId;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 2a03ff2..b2af3a2 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -39,7 +39,7 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
InetAddress from = message.from;
if (logger.isTraceEnabled())
logger.trace("Received a GossipDigestAckMessage from {}", from);
- if (!Gossiper.instance.isEnabled())
+ if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound())
{
if (logger.isTraceEnabled())
logger.trace("Ignoring GossipDigestAckMessage because gossip is disabled");
@@ -49,6 +49,7 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
GossipDigestAck gDigestAckMessage = message.payload;
List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
Map<InetAddress, EndpointState> epStateMap = gDigestAckMessage.getEndpointStateMap();
+ logger.trace("Received ack with {} digests and {} states", gDigestList.size(), epStateMap.size());
if ( epStateMap.size() > 0 )
{
@@ -58,6 +59,13 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
}
Gossiper.instance.checkSeedContact(from);
+ if (Gossiper.instance.isInShadowRound())
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Finishing shadow round with {}", from);
+ Gossiper.instance.finishShadowRound();
+ return; // don't bother doing anything else, we have what we came for
+ }
/* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index 61d21ed..476cb72 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -76,7 +76,7 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn>
List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
Map<InetAddress, EndpointState> deltaEpStateMap = new HashMap<InetAddress, EndpointState>();
Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
-
+ logger.trace("sending {} digests and {} deltas", deltaGossipDigestList.size(), deltaEpStateMap.size());
MessageOut<GossipDigestAck> gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap),
GossipDigestAck.serializer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index acf40f3..cbb7d80 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -107,6 +107,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
// have we ever in our lifetime reached a seed?
private boolean seedContacted = false;
+ // Set by the thread running doShadowRound(), which then polls it, and cleared through
+ // finishShadowRound() by the gossip ack handler; volatile so the polling thread is
+ // guaranteed to observe the handler's write.
+ private volatile boolean inShadowRound = false;
+
private class GossipTask implements Runnable
{
public void run()
@@ -662,6 +664,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return endpointStateMap.get(ep);
}
+ /**
+  * Removes ALL endpoint states from the endpoint state map.
+  * Should only be called after shadow gossip.
+  */
+ public void resetEndpointStateMap()
+ {
+ endpointStateMap.clear();
+ }
+
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@ -874,7 +882,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
{
InetAddress ep = entry.getKey();
- if ( ep.equals(FBUtilities.getBroadcastAddress()))
+ if ( ep.equals(FBUtilities.getBroadcastAddress()) && !isInShadowRound())
continue;
if (justRemovedEndpoints.containsKey(ep))
{
@@ -989,6 +997,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
*/
void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndpointState> deltaEpStateMap)
{
+ if (gDigestList.size() == 0)
+ {
+ /* we've been sent a *completely* empty syn, which should normally never happen since an endpoint will at least send a syn with itself.
+ If this is happening then the node is attempting shadow gossip, and we should reply with everything we know.
+ */
+ logger.debug("Shadow request received, adding all states");
+ for (Map.Entry<InetAddress, EndpointState> entry : endpointStateMap.entrySet())
+ {
+ gDigestList.add(new GossipDigest(entry.getKey(), 0, 0));
+ }
+ }
for ( GossipDigest gDigest : gDigestList )
{
int remoteGeneration = gDigest.getGeneration();
@@ -1074,6 +1093,43 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
TimeUnit.MILLISECONDS);
}
+ /**
+  * Do a single 'shadow' round of gossip, where we do not modify any state.
+  * Only used when replacing a node, to get and assume its states.
+  *
+  * Sends a completely empty syn to every seed, then checks once a second whether
+  * the shadow round has been marked finished.
+  *
+  * @throws RuntimeException if the round is still unfinished after more than
+  *         StorageService.RING_DELAY milliseconds of waiting, or if the waiting
+  *         thread is interrupted (the interrupt status is restored first)
+  */
+ public void doShadowRound()
+ {
+     buildSeedsList();
+     // send a completely empty syn
+     List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+     GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
+                                                            DatabaseDescriptor.getPartitionerName(),
+                                                            gDigests);
+     MessageOut<GossipDigestSyn> message = new MessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
+                                                                           digestSynMessage,
+                                                                           GossipDigestSyn.serializer);
+     inShadowRound = true;
+     try
+     {
+         for (InetAddress seed : seeds)
+             MessagingService.instance().sendOneWay(message, seed);
+         int slept = 0;
+         while (true)
+         {
+             Thread.sleep(1000);
+             if (!inShadowRound)
+                 break;
+             slept += 1000;
+             if (slept > StorageService.RING_DELAY)
+                 throw new RuntimeException("Unable to gossip with any seeds");
+         }
+     }
+     catch (InterruptedException e)
+     {
+         // keep the interrupt visible to callers further up the stack
+         Thread.currentThread().interrupt();
+         throw new RuntimeException(e);
+     }
+     finally
+     {
+         // never leave the flag set after a timeout or interrupt; a late ack must not
+         // be treated as part of a shadow round that has already been abandoned
+         inShadowRound = false;
+     }
+ }
+
private void buildSeedsList()
{
for (InetAddress seed : DatabaseDescriptor.getSeeds())
@@ -1154,6 +1210,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
}
+ /**
+  * Marks the shadow gossip round as finished. Has no effect when no shadow
+  * round is in progress.
+  */
+ protected void finishShadowRound()
+ {
+     // clearing an already-clear flag is harmless, so no guard is needed
+     inShadowRound = false;
+ }
+
+ /**
+  * @return the current value of the shadow-round flag: true while a shadow
+  *         gossip round is in progress
+  */
+ protected boolean isInShadowRound()
+ {
+     return this.inShadowRound;
+ }
+
@VisibleForTesting
public void initializeNodeUnsafe(InetAddress addr, UUID uuid, int generationNbr)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/351d43ef/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 50719fd..d2c69d0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -387,6 +387,36 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return initialized;
}
+ /**
+  * Gathers what is needed to take the place of the node named by
+  * cassandra.replace_address.
+  *
+  * Starts the messaging service listener, runs one shadow round of gossip, reads the
+  * replaced node's host ID and tokens from the gossip state, stores that host ID as
+  * the local host ID, then shuts messaging down and clears the endpoint state map.
+  *
+  * @return the tokens of the node being replaced
+  * @throws RuntimeException if the node is not present in gossip, has no TOKENS
+  *         state, or its tokens cannot be deserialized
+  */
+ public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException
+ {
+     // resolve once: each getReplaceAddress() call re-reads the property and performs a
+     // new lookup, so repeated calls are wasteful and could disagree with one another
+     InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
+     logger.info("Gathering node replacement information for {}", replaceAddress);
+     MessagingService.instance().listen(FBUtilities.getLocalAddress());
+
+     // make magic happen
+     Gossiper.instance.doShadowRound();
+
+     // now that we've gossiped at least once, we should be able to find the node we're replacing
+     if (Gossiper.instance.getEndpointStateForEndpoint(replaceAddress) == null)
+         throw new RuntimeException("Cannot replace_address " + replaceAddress + " because it doesn't exist in gossip");
+     UUID hostId = Gossiper.instance.getHostId(replaceAddress);
+     if (Gossiper.instance.getEndpointStateForEndpoint(replaceAddress).getApplicationState(ApplicationState.TOKENS) == null)
+         throw new RuntimeException("Could not find tokens for " + replaceAddress + " to replace");
+
+     Collection<Token> tokens;
+     try
+     {
+         tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(replaceAddress, ApplicationState.TOKENS))));
+     }
+     catch (IOException e)
+     {
+         throw new RuntimeException(e);
+     }
+     SystemTable.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
+     MessagingService.instance().shutdown();
+     Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
+     return tokens;
+ }
+
public synchronized void initClient() throws IOException, ConfigurationException
{
// We don't wait, because we're going to actually try to work on
@@ -561,22 +591,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private void joinTokenRing(int delay) throws ConfigurationException
{
- logger.info("Starting up server gossip");
joined = true;
- // Seed the host ID-to-endpoint map with our own ID.
- getTokenMetadata().updateHostId(SystemTable.getLocalHostId(), FBUtilities.getBroadcastAddress());
+ Collection<Token> tokens = null;
+ Map<ApplicationState, VersionedValue> appStates = new HashMap<ApplicationState, VersionedValue>();
+ if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null)
+ throw new RuntimeException("Replace method removed; use cassandra.replace_address instead");
+ if (DatabaseDescriptor.isReplacing())
+ {
+ if (!DatabaseDescriptor.isAutoBootstrap())
+ throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
+ tokens = prepareReplacementInfo();
+ appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+ appStates.put(ApplicationState.TOKENS, valueFactory.tokens(tokens));
+ }
// have to start the gossip service before we can see any info on other nodes. this is necessary
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a counterId to our state, below.)
- Map<ApplicationState, VersionedValue> appStates = new HashMap<ApplicationState, VersionedValue>();
+ // Seed the host ID-to-endpoint map with our own ID.
+ getTokenMetadata().updateHostId(SystemTable.getLocalHostId(), FBUtilities.getBroadcastAddress());
appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(SystemTable.getLocalHostId()));
appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress()));
- if (DatabaseDescriptor.isReplacing())
- appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
+ logger.info("Starting up server gossip");
Gossiper.instance.register(this);
Gossiper.instance.register(migrationManager);
Gossiper.instance.start(SystemTable.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
@@ -602,7 +641,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// We attempted to replace this with a schema-presence check, but you need a meaningful sleep
// to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details.
Set<InetAddress> current = new HashSet<InetAddress>();
- Collection<Token> tokens;
logger.debug("Bootstrap variables: {} {} {} {}",
new Object[]{ DatabaseDescriptor.isAutoBootstrap(),
SystemTable.bootstrapInProgress(),
@@ -667,50 +705,36 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
else
{
- if (DatabaseDescriptor.getReplaceTokens().size() != 0 && DatabaseDescriptor.getReplaceNode() != null)
- throw new UnsupportedOperationException("You cannot specify both replace_token and replace_node, choose one or the other");
- try
- {
- // Sleeping additionally to make sure that the server actually is not alive
- // and giving it more time to gossip if alive.
- Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL);
- }
- catch (InterruptedException e)
+ if (!DatabaseDescriptor.getReplaceAddress().equals(FBUtilities.getBroadcastAddress()))
{
- throw new AssertionError(e);
- }
- tokens = new ArrayList<Token>();
- if (DatabaseDescriptor.getReplaceTokens().size() !=0)
- {
- for (String token : DatabaseDescriptor.getReplaceTokens())
- tokens.add(StorageService.getPartitioner().getTokenFactory().fromString(token));
- }
- else
- {
- assert DatabaseDescriptor.getReplaceNode() != null;
- InetAddress endpoint = tokenMetadata.getEndpointForHostId(DatabaseDescriptor.getReplaceNode());
- if (endpoint == null)
- throw new UnsupportedOperationException("Cannot replace host id " + DatabaseDescriptor.getReplaceNode() + " because it does not exist!");
- tokens = tokenMetadata.getTokens(endpoint);
- }
-
- // check for operator errors...
- for (Token token : tokens)
- {
- InetAddress existing = tokenMetadata.getEndpoint(token);
- if (existing != null)
+ try
{
- if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.currentTimeMillis() - delay))
- throw new UnsupportedOperationException("Cannnot replace a token for a Live node... ");
- current.add(existing);
+ // Sleep additionally to make sure that the server actually is not alive
+ // and giving it more time to gossip if alive.
+ Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL);
}
- else
+ catch (InterruptedException e)
{
- throw new UnsupportedOperationException("Cannot replace token " + token + " which does not exist!");
+ throw new AssertionError(e);
}
- }
- setMode(Mode.JOINING, "Replacing a node with token: " + tokens, true);
+ // check for operator errors...
+ for (Token token : tokens)
+ {
+ InetAddress existing = tokenMetadata.getEndpoint(token);
+ if (existing != null)
+ {
+ if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.currentTimeMillis() - delay))
+ throw new UnsupportedOperationException("Cannnot replace a live node... ");
+ current.add(existing);
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Cannot replace token " + token + " which does not exist!");
+ }
+ }
+ }
+ setMode(Mode.JOINING, "Replacing a node with token(s): " + tokens, true);
}
bootstrap(tokens);
@@ -1355,7 +1379,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
if (Gossiper.instance.usesHostId(endpoint))
- tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
+ {
+ UUID hostId = Gossiper.instance.getHostId(endpoint);
+ if (DatabaseDescriptor.isReplacing() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
+ logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
+ else
+ tokenMetadata.updateHostId(hostId, endpoint);
+ }
Set<Token> tokensToUpdateInMetadata = new HashSet<Token>();
Set<Token> tokensToUpdateInSystemTable = new HashSet<Token>();