You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/09 17:37:04 UTC
svn commit: r888866 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
service/StorageService.java service/StorageServiceMBean.java
tools/NodeProbe.java
Author: jbellis
Date: Wed Dec 9 16:37:03 2009
New Revision: 888866
URL: http://svn.apache.org/viewvc?rev=888866&view=rev
Log:
Add removeToken command to StorageService and nodeprobe. patch by Jaakko Laine; reviewed by jbellis for CASSANDRA-564
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=888866&r1=888865&r2=888866&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Dec 9 16:37:03 2009
@@ -68,6 +68,10 @@
public final static String STATE_LEAVING = "LEAVING";
public final static String STATE_LEFT = "LEFT";
+ private final static char StateDelimiter = ',';
+ private final static String REMOVE_TOKEN = "remove";
+ private final static String LEFT_NORMALLY = "left";
+
/* All verb handler identifiers */
public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
public final static String binaryVerbHandler_ = "BINARY-VERB-HANDLER";
@@ -433,13 +437,107 @@
}
else if (STATE_LEFT.equals(stateName))
{
- Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
- assert tokenMetadata_.getToken(endpoint).equals(token);
- tokenMetadata_.removeEndpoint(endpoint);
+ // STATE_LEFT state is of form (REMOVE_TOKEN|LEFT_NORMALLY)<StateDelimiter><token>
+ String stateValue = state.getValue();
+ int index = stateValue.indexOf(StateDelimiter);
+ assert (index != -1);
+ String typeOfState = stateValue.substring(0, index);
+ Token token = getPartitioner().getTokenFactory().fromString(stateValue.substring(index + 1));
+
+ if (typeOfState.equals(LEFT_NORMALLY))
+ {
+ if (tokenMetadata_.isMember(endpoint))
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug(endpoint + " state left, token " + token);
+ assert tokenMetadata_.getToken(endpoint).equals(token);
+ tokenMetadata_.removeEndpoint(endpoint);
+ }
+ }
+ else
+ {
+ assert (typeOfState.equals(REMOVE_TOKEN));
+ InetAddress endPointThatLeft = tokenMetadata_.getEndPoint(token);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Token " + token + " removed manually (endpoint was " + ((endPointThatLeft == null) ? "unknown" : endPointThatLeft) + ")");
+ if (endPointThatLeft != null)
+ {
+ restoreReplicaCount(endPointThatLeft);
+ tokenMetadata_.removeEndpoint(endPointThatLeft);
+ }
+ }
+
replicationStrategy_.removeObsoletePendingRanges();
}
}
+ /**
+ * Called when endPoint is removed from the ring without the proper
+ * STATE_LEAVING -> STATE_LEFT sequence. Checks whether this node
+ * becomes responsible for new ranges as a consequence and, if so,
+ * requests each of them from the first live source in proximity order.
+ *
+ * This is rather inefficient, but that matters little because it
+ * is called very rarely.
+ *
+ * @param endPoint node that has left
+ */
+ private void restoreReplicaCount(InetAddress endPoint)
+ {
+ InetAddress myAddress = FBUtilities.getLocalAddress();
+
+ // get all ranges that change ownership because endPoint left
+ // (that is, some node needs to take responsibility for each of them)
+ Multimap<Range, InetAddress> changedRanges = getChangedRangesForLeaving(endPoint);
+
+ // keep only the changed ranges that are now assigned to this node
+ Set<Range> myNewRanges = new HashSet<Range>();
+ for (Map.Entry<Range, InetAddress> entry : changedRanges.entries())
+ {
+ if (entry.getValue().equals(myAddress))
+ myNewRanges.add(entry.getKey());
+ }
+
+ if (!myNewRanges.isEmpty())
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug(endPoint + " was removed, my added ranges: " + StringUtils.join(myNewRanges, ", "));
+
+ Multimap<Range, InetAddress> rangeAddresses = replicationStrategy_.getRangeAddresses(tokenMetadata_);
+ Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
+ IFailureDetector failureDetector = FailureDetector.instance();
+
+ // pick one live source per new range, nearest first; a range with no live source is silently skipped
+ for (Range myNewRange : myNewRanges)
+ {
+ List<InetAddress> sources = DatabaseDescriptor.getEndPointSnitch().sortByProximity(myAddress, rangeAddresses.get(myNewRange));
+
+ assert (!sources.contains(myAddress));
+
+ for (InetAddress source : sources)
+ {
+ if (source.equals(endPoint))
+ continue;
+
+ if (failureDetector.isAlive(source))
+ {
+ sourceRanges.put(source, myNewRange);
+ break;
+ }
+ }
+ }
+
+ // sourceRanges now maps each chosen source to the ranges we need
+ // from it; issue one stream request per source
+ for (Map.Entry<InetAddress, Collection<Range>> entry : sourceRanges.asMap().entrySet())
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Requesting from " + entry.getKey() + " ranges " + StringUtils.join(entry.getValue(), ", "));
+ Streaming.requestRanges(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
private Multimap<Range, InetAddress> getChangedRangesForLeaving(InetAddress endpoint)
{
// First get all ranges the leaving endpoint is responsible for
@@ -1033,7 +1131,7 @@
if (logger_.isDebugEnabled())
logger_.debug("");
- Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(getLocalToken().toString()));
+ Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(LEFT_NORMALLY + StateDelimiter + getLocalToken().toString()));
try
{
Thread.sleep(2 * Gossiper.intervalInMillis_);
@@ -1132,6 +1230,37 @@
unbootstrap(finishMoving);
}
+ public void removeToken(String tokenString)
+ {
+ Token token = partitioner_.getTokenFactory().fromString(tokenString);
+
+ // We could refuse to continue when token metadata has no
+ // endpoint for this token, but that would prevent the command
+ // from being issued by a node that has never seen the failed
+ // node.
+ InetAddress endPoint = tokenMetadata_.getEndPoint(token);
+ if (endPoint != null)
+ {
+ // However, refuse to remove a token whose owner is still a
+ // live member of the ring
+ if (Gossiper.instance().getLiveMembers().contains(endPoint))
+ throw new UnsupportedOperationException("Node " + endPoint + " is alive and owns this token. Use decommission command to remove it from the ring");
+
+ restoreReplicaCount(endPoint);
+ tokenMetadata_.removeEndpoint(endPoint);
+ replicationStrategy_.removeObsoletePendingRanges();
+ }
+
+ // This is not the cleanest way, as we are adding STATE_LEFT for
+ // a foreign token to our own endpoint state. Another way would
+ // be to add a new application state for this command, but that
+ // would again increase the amount of data gossiped in the
+ // cluster - not good. The REMOVE_TOKEN|LEFT_NORMALLY prefix
+ // distinguishes the removetoken command from a normal state
+ // left, so it is not so bad.
+ Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(REMOVE_TOKEN + StateDelimiter + token.toString()));
+ }
+
public WriteResponseHandler getWriteResponseHandler(int blockFor, int consistency_level)
{
return replicationStrategy_.getWriteResponseHandler(blockFor, consistency_level);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=888866&r1=888865&r2=888866&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Dec 9 16:37:03 2009
@@ -147,6 +147,12 @@
*/
public void cancelPendingRanges();
+ /**
+ * Removes the given token (and all data associated with the
+ * endpoint that had it) from the ring.
+ */
+ public void removeToken(String token);
+
/** set the logging level at runtime */
public void setLog4jLevel(String classQualifier, String level);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=888866&r1=888865&r2=888866&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Dec 9 16:37:03 2009
@@ -403,6 +403,11 @@
ssProxy.cancelPendingRanges();
}
+ public void removeToken(String token)
+ {
+ ssProxy.removeToken(token); // pure delegation; ssProxy is presumably the StorageServiceMBean proxy - confirm
+ }
+
/**
* Print out the size of the queues in the thread pools
*
@@ -498,7 +503,7 @@
HelpFormatter hf = new HelpFormatter();
String header = String.format(
"%nAvailable commands: ring, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, " +
- "tpstats, flush, repair, decommission, move, loadbalance, cancelpending, " +
+ "tpstats, flush, repair, decommission, move, loadbalance, cancelpending, removetoken, " +
" getcompactionthreshold, setcompactionthreshold [minthreshold] ([maxthreshold])");
String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
hf.printHelp(usage, "", options, header);
@@ -577,6 +582,14 @@
{
probe.cancelPendingRanges();
}
+ else if (cmdName.equals("removetoken"))
+ {
+ // The token is expected at arguments[1]. Previously a missing token fell through to the call below and threw ArrayIndexOutOfBoundsException.
+ if (arguments.length <= 1)
+ System.err.println("missing token argument");
+ else
+ probe.removeToken(arguments[1]);
+ }
else if (cmdName.equals("snapshot"))
{
String snapshotName = "";