You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/06 03:13:59 UTC
svn commit: r1165468 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/
src/java/org/apache/cassandra/service/
Author: jbellis
Date: Tue Sep 6 01:13:58 2011
New Revision: 1165468
URL: http://svn.apache.org/viewvc?rev=1165468&view=rev
Log:
convenience workflow for replacing dead node
patch by Vijay; reviewed by Nick Bailey for CASSANDRA-957
Modified:
cassandra/trunk/NEWS.txt
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1165468&r1=1165467&r2=1165468&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Tue Sep 6 01:13:58 2011
@@ -31,6 +31,9 @@ Features
only replicas known to be down when the write started were hinted.)
This means that running with read repair completely off is much more
viable than before.
+ - A dead node may be replaced in a single step by starting a new node
+ with -Dcassandra.replace_token=<token>. More details can be found at
+ http://wiki.apache.org/cassandra/Operations#Replacing_a_Dead_Node
Other
-----
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1165468&r1=1165467&r2=1165468&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Tue Sep 6 01:13:58 2011
@@ -604,6 +604,11 @@ public class DatabaseDescriptor
return System.getProperty("cassandra.initial_token", conf.initial_token);
}
+ public static String getReplaceToken()
+ {
+ return System.getProperty("cassandra.replace_token", null);
+ }
+
public static String getClusterName()
{
return conf.cluster_name;
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1165468&r1=1165467&r2=1165468&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Sep 6 01:13:58 2011
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
@@ -134,9 +135,9 @@ public class HintedHandOffManager implem
return true;
}
- private static void deleteHint(ByteBuffer endpointAddress, ByteBuffer hintId, long timestamp) throws IOException
+ private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer hintId, long timestamp) throws IOException
{
- RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, endpointAddress);
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, tokenBytes);
rm.delete(new QueryPath(HINTS_CF, hintId), timestamp);
rm.apply();
}
@@ -158,9 +159,12 @@ public class HintedHandOffManager implem
public void deleteHintsForEndpoint(final InetAddress endpoint)
{
- final String ipaddr = endpoint.getHostAddress();
+ if (!StorageService.instance.getTokenMetadata().isMember(endpoint))
+ return;
+ Token<?> token = StorageService.instance.getTokenMetadata().getToken(endpoint);
+ ByteBuffer tokenBytes = StorageService.getPartitioner().getTokenFactory().toByteArray(token);
final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
- final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, ByteBufferUtil.bytes(ipaddr));
+ final RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, tokenBytes);
rm.delete(new QueryPath(HINTS_CF), System.currentTimeMillis());
// execute asynchronously to avoid blocking caller (which may be processing gossip)
@@ -170,14 +174,14 @@ public class HintedHandOffManager implem
{
try
{
- logger_.info("Deleting any stored hints for " + ipaddr);
+ logger_.info("Deleting any stored hints for " + endpoint);
rm.apply();
hintStore.forceFlush();
CompactionManager.instance.submitMaximal(hintStore, Integer.MAX_VALUE);
}
catch (Exception e)
{
- logger_.warn("Could not delete hints for " + ipaddr + ": " + e);
+ logger_.warn("Could not delete hints for " + endpoint + ": " + e);
}
}
};
@@ -223,7 +227,7 @@ public class HintedHandOffManager implem
{
try
{
- logger_.debug("Checking remote schema before delivering hints");
+ logger_.debug("Checking remote({}) schema before delivering hints", endpoint);
int waited = waitForSchemaAgreement(endpoint);
// sleep a random amount to stagger handoff delivery from different replicas.
// (if we had to wait, then gossiper randomness took care of that for us already.)
@@ -244,15 +248,17 @@ public class HintedHandOffManager implem
queuedDeliveries.remove(endpoint);
}
- logger_.info("Started hinted handoff for endpoint " + endpoint);
-
// 1. Get the key of the endpoint we need to handoff
// 2. For each column, deserialize the mutation and send it to the endpoint
// 3. Delete the subcolumn if the write was successful
// 4. Force a flush
// 5. Do major compaction to clean up all deletes etc.
- ByteBuffer endpointAsUTF8 = ByteBufferUtil.bytes(endpoint.getHostAddress()); // keys have to be UTF8 to make OPP happy
- DecoratedKey<?> epkey = StorageService.getPartitioner().decorateKey(endpointAsUTF8);
+
+ // find the hints for the node using its token.
+ Token<?> token = StorageService.instance.getTokenMetadata().getToken(endpoint);
+ logger_.info("Started hinted handoff for token: {} with IP: {}", token, endpoint);
+ ByteBuffer tokenBytes = StorageService.getPartitioner().getTokenFactory().toByteArray(token);
+ DecoratedKey<?> epkey = StorageService.getPartitioner().decorateKey(tokenBytes);
int rowsReplayed = 0;
ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
@@ -282,7 +288,7 @@ public class HintedHandOffManager implem
if (sendMutation(endpoint, rm))
{
- deleteHint(endpointAsUTF8, hint.name(), versionColumn.timestamp());
+ deleteHint(tokenBytes, hint.name(), versionColumn.timestamp());
rowsReplayed++;
}
else
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1165468&r1=1165467&r2=1165468&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Tue Sep 6 01:13:58 2011
@@ -120,9 +120,9 @@ public class RowMutation implements IMut
* }
*
*/
- public static RowMutation hintFor(RowMutation mutation, ByteBuffer address) throws IOException
+ public static RowMutation hintFor(RowMutation mutation, ByteBuffer token) throws IOException
{
- RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, address);
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, token);
ByteBuffer hintId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
// determine the TTL for the RowMutation
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1165468&r1=1165467&r2=1165468&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Sep 6 01:13:58 2011
@@ -61,7 +61,7 @@ public class BootStrapper
/* endpoints that need to be bootstrapped */
protected final InetAddress address;
/* tokens of the nodes being bootstrapped. */
- protected final Token token;
+ protected final Token<?> token;
protected final TokenMetadata tokenMetadata;
private static final long BOOTSTRAP_TIMEOUT = 30000; // default bootstrap timeout of 30s
@@ -100,7 +100,6 @@ public class BootStrapper
for (Map.Entry<InetAddress, Collection<Range>> entry : rangesToFetch.get(table))
{
final InetAddress source = entry.getKey();
- Collection<Range> ranges = entry.getValue();
final Runnable callback = new Runnable()
{
public void run()
@@ -257,7 +256,8 @@ public class BootStrapper
{
for (InetAddress source : rangesWithSourceTarget.get(range))
{
- if (failureDetector.isAlive(source))
+ // ignore the local IP...
+ if (failureDetector.isAlive(source) && !source.equals(FBUtilities.getBroadcastAddress()))
{
sources.put(source, range);
break;
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java?rev=1165468&r1=1165467&r2=1165468&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java Tue Sep 6 01:13:58 2011
@@ -98,7 +98,7 @@ public class EndpointState
}
/* getters and setters */
- long getUpdateTimestamp()
+ public long getUpdateTimestamp()
{
return updateTimestamp;
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1165468&r1=1165467&r2=1165468&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Tue Sep 6 01:13:58 2011
@@ -66,7 +66,8 @@ public class Gossiper implements IFailur
private static final DebuggableScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("GossipTasks");
static final ApplicationState[] STATES = ApplicationState.values();
- static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, VersionedValue.STATUS_LEFT);
+ static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
+ VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
private ScheduledFuture<?> scheduledGossipTask;
public final static int intervalInMillis = 1000;
@@ -726,10 +727,10 @@ public class Gossiper implements IFailur
*/
private void handleMajorStateChange(InetAddress ep, EndpointState epState)
{
- if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
+ if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState))
{
if (endpointStateMap.get(ep) != null)
- logger.info("Node {} has restarted, now UP again", ep);
+ logger.info("Node {} has restarted, now UP", ep);
else
logger.info("Node {} is now part of the cluster", ep);
}
@@ -741,20 +742,21 @@ public class Gossiper implements IFailur
for (IEndpointStateChangeSubscriber subscriber : subscribers)
subscriber.onRestart(ep, epState);
- if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
+ if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState))
markAlive(ep, epState);
else
{
logger.debug("Not marking " + ep + " alive due to dead state");
- epState.markDead();
+ markDead(ep, epState);
epState.setHasToken(true); // fat clients won't have a dead state
}
for (IEndpointStateChangeSubscriber subscriber : subscribers)
subscriber.onJoin(ep, epState);
}
- private Boolean isDeadState(String value)
+ public Boolean isDeadState(EndpointState epState)
{
+ String value = epState.getApplicationState(ApplicationState.STATUS).value;
String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
assert (pieces.length > 0);
String state = pieces[0];
@@ -812,7 +814,7 @@ public class Gossiper implements IFailur
}
else if (logger.isTraceEnabled())
logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep);
- if (!localEpStatePtr.isAlive()) // unless of course, it was dead
+ if (!localEpStatePtr.isAlive() && !isDeadState(localEpStatePtr)) // unless of course, it was dead
markAlive(ep, localEpStatePtr);
}
else
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1165468&r1=1165467&r2=1165468&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java Tue Sep 6 01:13:58 2011
@@ -60,6 +60,8 @@ public class VersionedValue implements C
public final static String REMOVING_TOKEN = "removing";
public final static String REMOVED_TOKEN = "removed";
+ public final static String HIBERNATE = "hibernate";
+
// values for ApplicationState.REMOVAL_COORDINATOR
public final static String REMOVAL_COORDINATOR = "REMOVER";
@@ -148,6 +150,11 @@ public class VersionedValue implements C
return new VersionedValue(VersionedValue.REMOVAL_COORDINATOR + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
}
+ public VersionedValue hibernate(boolean value)
+ {
+ return new VersionedValue(VersionedValue.HIBERNATE + VersionedValue.DELIMITER + value);
+ }
+
public VersionedValue datacenter(String dcId)
{
return new VersionedValue(dcId);
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java?rev=1165468&r1=1165467&r2=1165468&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java Tue Sep 6 01:13:58 2011
@@ -29,7 +29,7 @@ import org.apache.cassandra.gms.*;
public class LoadBroadcaster implements IEndpointStateChangeSubscriber
{
- private static final int BROADCAST_INTERVAL = 60 * 1000;
+ static final int BROADCAST_INTERVAL = 60 * 1000;
public static final LoadBroadcaster instance = new LoadBroadcaster();
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1165468&r1=1165467&r2=1165468&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Tue Sep 6 01:13:58 2011
@@ -54,8 +54,14 @@ public class MigrationManager implements
// avoids re-pushing migrations that we're waiting on target to apply already
private static Map<InetAddress,UUID> lastPushed = new MapMaker().expiration(1, TimeUnit.MINUTES).makeMap();
- /** I'm not going to act here. */
- public void onJoin(InetAddress endpoint, EndpointState epState) { }
+ public void onJoin(InetAddress endpoint, EndpointState epState) {
+ VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
+ if (value != null)
+ {
+ UUID theirVersion = UUID.fromString(value.value);
+ rectify(theirVersion, endpoint);
+ }
+ }
public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1165468&r1=1165467&r2=1165468&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Sep 6 01:13:58 2011
@@ -346,7 +346,9 @@ public class StorageProxy implements Sto
try
{
- RowMutation hintedMutation = RowMutation.hintFor(mutation, ByteBufferUtil.bytes(target.getHostAddress()));
+ Token<?> token = StorageService.instance.getTokenMetadata().getToken(target);
+ ByteBuffer tokenbytes = StorageService.getPartitioner().getTokenFactory().toByteArray(token);
+ RowMutation hintedMutation = RowMutation.hintFor(mutation, tokenbytes);
hintedMutation.apply();
totalHints.incrementAndGet();
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1165468&r1=1165467&r2=1165468&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Sep 6 01:13:58 2011
@@ -205,9 +205,6 @@ public class StorageService implements I
public void finishBootstrapping()
{
isBootstrapMode = false;
- SystemTable.setBootstrapped(true);
- setToken(getLocalToken());
- logger_.info("Bootstrap/move completed! Now serving reads.");
}
/** This method updates the local token on disk */
@@ -471,6 +468,7 @@ public class StorageService implements I
Gossiper.instance.start(SystemTable.incrementAndGetGeneration()); // needed for node-ring gathering.
// add rpc listening info
Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress()));
+ Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.hibernate(null != DatabaseDescriptor.getReplaceToken()));
MessagingService.instance().listen(FBUtilities.getLocalAddress());
LoadBroadcaster.instance.startBroadcasting();
@@ -484,7 +482,8 @@ public class StorageService implements I
&& !SystemTable.isBootstrapped())
logger_.info("This node will not auto bootstrap because it is configured to be a seed node.");
- Token token;
+ // first startup is only chance to bootstrap
+ Token<?> token;
if (DatabaseDescriptor.isAutoBootstrap()
&& !(DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()) || SystemTable.isBootstrapped()))
{
@@ -499,25 +498,42 @@ public class StorageService implements I
}
if (logger_.isDebugEnabled())
logger_.debug("... got ring + schema info");
- if (tokenMetadata_.isMember(FBUtilities.getBroadcastAddress()))
+ if (null != DatabaseDescriptor.getReplaceToken())
{
- String s = "This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)";
- throw new UnsupportedOperationException(s);
+ try
+ {
+ // Sleeping additionally to make sure that the server actually is not alive
+ // and giving it more time to gossip if alive.
+ Thread.sleep(LoadBroadcaster.BROADCAST_INTERVAL);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ token = StorageService.getPartitioner().getTokenFactory().fromString(DatabaseDescriptor.getReplaceToken());
+ // check for operator errors...
+ InetAddress current = tokenMetadata_.getEndpoint(token);
+ if (null != current && Gossiper.instance.getEndpointStateForEndpoint(current).getUpdateTimestamp() > (System.currentTimeMillis() - delay))
+ throw new UnsupportedOperationException("Cannnot replace a token for a Live node... ");
+ setMode("Joining: Replacing a node with token: " + token, true);
+ }
+ else
+ {
+ if (tokenMetadata_.isMember(FBUtilities.getBroadcastAddress()))
+ {
+ String s = "This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)";
+ throw new UnsupportedOperationException(s);
+ }
+ setMode("Joining: getting bootstrap token", true);
+ token = BootStrapper.getBootstrapToken(tokenMetadata_, LoadBroadcaster.instance.getLoadInfo());
}
- setMode("Joining: getting bootstrap token", true);
- token = BootStrapper.getBootstrapToken(tokenMetadata_, LoadBroadcaster.instance.getLoadInfo());
// don't bootstrap if there are no tables defined.
if (Schema.instance.getNonSystemTables().size() > 0)
{
bootstrap(token);
assert !isBootstrapMode; // bootstrap will block until finished
}
- else
- {
- // nothing to bootstrap, go directly to participating in ring
- SystemTable.setBootstrapped(true);
- setToken(token);
- }
+ // Else: nothing to bootstrap, go directly to participating in ring
}
else
{
@@ -542,8 +558,10 @@ public class StorageService implements I
}
}
- SystemTable.setBootstrapped(true); // first startup is only chance to bootstrap
+ // start participating in the ring.
+ SystemTable.setBootstrapped(true);
setToken(token);
+ logger_.info("Bootstrap/Replace/Move completed! Now serving reads.");
assert tokenMetadata_.sortedTokens().size() > 0;
}
@@ -578,17 +596,26 @@ public class StorageService implements I
{
isBootstrapMode = true;
SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.bootstrapping(token));
- setMode("Joining: sleeping " + RING_DELAY + " ms for pending range setup", true);
- try
+ if (null == DatabaseDescriptor.getReplaceToken())
{
- Thread.sleep(RING_DELAY);
+ // if not an existing token then bootstrap
+ Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.bootstrapping(token));
+ setMode("Joining: sleeping " + RING_DELAY + " ms for pending range setup", true);
+ try
+ {
+ Thread.sleep(RING_DELAY);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
}
- catch (InterruptedException e)
+ else
{
- throw new AssertionError(e);
+ // Dont set any state for the node which is bootstrapping the existing token...
+ tokenMetadata_.updateNormalToken(token, FBUtilities.getBroadcastAddress());
}
- setMode("Bootstrapping", true);
+ setMode("Starting to bootstrap...", true);
new BootStrapper(FBUtilities.getBroadcastAddress(), token, tokenMetadata_).bootstrap(); // handles token update
}
@@ -993,9 +1020,9 @@ public class StorageService implements I
private void excise(Token token, InetAddress endpoint)
{
+ HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
Gossiper.instance.removeEndpoint(endpoint);
tokenMetadata_.removeEndpoint(endpoint);
- HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
tokenMetadata_.removeBootstrapToken(token);
calculatePendingRanges();
if (!isClientMode)