You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/11 17:11:54 UTC
svn commit: r834940 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: dht/ gms/ io/ locator/ service/ tools/
Author: jbellis
Date: Wed Nov 11 16:11:53 2009
New Revision: 834940
URL: http://svn.apache.org/viewvc?rev=834940&view=rev
Log:
add leaving mode
patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-435
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Nov 11 16:11:53 2009
@@ -63,8 +63,6 @@
*/
public class BootStrapper
{
- public static final long INITIAL_DELAY = 30 * 1000; //ms
-
private static final Logger logger = Logger.getLogger(BootStrapper.class);
/* endpoints that need to be bootstrapped */
@@ -174,7 +172,7 @@
{
for (Range myRange : myRanges)
{
- if (range.contains(myRange.right()))
+ if (range.contains(myRange))
{
List<InetAddress> preferred = DatabaseDescriptor.getEndPointSnitch().sortByProximity(address, rangeAddresses.get(range));
myRangeAddresses.putAll(myRange, preferred);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Wed Nov 11 16:11:53 2009
@@ -57,7 +57,7 @@
{
throw new IOError(e);
}
- return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
+ return new Message(FBUtilities.getLocalAddress(), StorageService.streamStage_, StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
}
protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Wed Nov 11 16:11:53 2009
@@ -70,7 +70,7 @@
{
if (logger_.isDebugEnabled())
logger_.debug(bsmd.toString());
- Streaming.transferRanges(bsmd.target_, bsmd.ranges_);
+ Streaming.transferRanges(bsmd.target_, bsmd.ranges_, null);
}
}
catch (IOException ex)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java Wed Nov 11 16:11:53 2009
@@ -103,6 +103,12 @@
}
}
+ public boolean contains(Range range)
+ {
+ return (contains(range.left_) || range.left_.equals(left_))
+ && contains(range.right_);
+ }
+
/**
* Tells if the given range is a wrap around.
* @param range
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Nov 11 16:11:53 2009
@@ -76,9 +76,9 @@
}
}
}
- catch ( Throwable th )
+ catch (Exception e)
{
- logger_.info( LogUtil.throwableToString(th) );
+ throw new RuntimeException(e);
}
}
}
@@ -165,19 +165,6 @@
}
/**
- * This method is used to forcibly remove a node from the membership
- * set. He is forgotten locally immediately.
- *
- * param@ ep the endpoint to be removed from membership.
- */
- public synchronized void removeFromMembership(InetAddress ep)
- {
- endPointStateMap_.remove(ep);
- liveEndpoints_.remove(ep);
- unreachableEndpoints_ .remove(ep);
- }
-
- /**
* This method is part of IFailureDetectionEventListener interface. This is invoked
* by the Failure Detector when it convicts an end point.
*
@@ -792,12 +779,6 @@
}
}
- public ApplicationState getApplicationState(InetAddress endpoint, String stateName)
- {
- assert endPointStateMap_.containsKey(endpoint);
- return endPointStateMap_.get(endpoint).getApplicationState(stateName);
- }
-
/**
* Start the gossiper with the generation # retrieved from the System
* table
@@ -836,6 +817,10 @@
epState.addApplicationState(key, appState);
}
+ public void stop()
+ {
+ gossipTimer_.cancel();
+ }
}
class JoinVerbHandler implements IVerbHandler
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Wed Nov 11 16:11:53 2009
@@ -27,11 +27,12 @@
public class Streaming
{
private static Logger logger = Logger.getLogger(Streaming.class);
+ public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized
/**
* split out files on disk locally for each range and then stream them to the target endpoint
*/
- public static void transferRanges(InetAddress target, Collection<Range> ranges) throws IOException
+ public static void transferRanges(InetAddress target, Collection<Range> ranges, Runnable callback)
{
assert ranges.size() > 0;
@@ -46,20 +47,29 @@
List<String> tables = DatabaseDescriptor.getTables();
for (String tName : tables)
{
- Table table = Table.open(tName);
- if (logger.isDebugEnabled())
- logger.debug("Flushing memtables ...");
- table.flush(false);
- if (logger.isDebugEnabled())
- logger.debug("Performing anticompaction ...");
- /* Get the list of files that need to be streamed */
- List<String> fileList = new ArrayList<String>();
- for (SSTableReader sstable : table.forceAntiCompaction(ranges, target))
+ try
{
- fileList.addAll(sstable.getAllFilenames());
+ Table table = Table.open(tName);
+ if (logger.isDebugEnabled())
+ logger.debug("Flushing memtables ...");
+ table.flush(false);
+ if (logger.isDebugEnabled())
+ logger.debug("Performing anticompaction ...");
+ /* Get the list of files that need to be streamed */
+ List<String> fileList = new ArrayList<String>();
+ for (SSTableReader sstable : table.forceAntiCompaction(ranges, target))
+ {
+ fileList.addAll(sstable.getAllFilenames());
+ }
+ transferOneTable(target, fileList, tName); // also deletes the file, so no further cleanup needed
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
}
- transferOneTable(target, fileList, tName); // also deletes the file, so no further cleanup needed
}
+ if (callback != null)
+ callback.run();
}
private static void transferOneTable(InetAddress target, List<String> fileList, String table) throws IOException
@@ -247,8 +257,10 @@
MessagingService.instance().sendOneWay(message, host);
/* If we're done with everything for this host, remove from bootstrap sources */
- if (StorageService.instance().isBootstrapMode() && StreamContextManager.isDone(host))
+ if (StreamContextManager.isDone(host) && StorageService.instance().isBootstrapMode())
+ {
StorageService.instance().removeBootstrapSource(host);
+ }
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Nov 11 16:11:53 2009
@@ -205,4 +205,48 @@
temp.update(pendingToken, pendingAddress);
return getAddressRanges(temp).get(pendingAddress);
}
+
+ /**
+ * @param endpoint the endpoint leaving
+ * @return a map of where the endpoint's current ranges get sent
+ */
+ public Multimap<Range, InetAddress> getRangeAddressesAfterLeaving(InetAddress endpoint)
+ {
+ TokenMetadata metadataAfterLeaving = tokenMetadata_.cloneWithoutPending();
+ metadataAfterLeaving.removeEndpoint(endpoint);
+ Multimap<Range, InetAddress> rangesAfterLeaving = getRangeAddresses(metadataAfterLeaving);
+
+ Multimap<Range, InetAddress> map = HashMultimap.create();
+ for (Range range : getAddressRanges().get(endpoint))
+ {
+ for (Range newRange : rangesAfterLeaving.keySet())
+ {
+ if (newRange.contains(range))
+ {
+ map.putAll(range, rangesAfterLeaving.get(newRange));
+ break;
+ }
+ }
+ }
+
+ return map;
+ }
+
+ public void removeObsoletePendingRanges()
+ {
+ Multimap<InetAddress, Range> ranges = getAddressRanges();
+ for (Map.Entry<Range, InetAddress> entry : tokenMetadata_.getPendingRanges().entrySet())
+ {
+ for (Range currentRange : ranges.get(entry.getValue()))
+ {
+ if (currentRange.contains(entry.getKey()))
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Removing obsolete pending range " + entry.getKey() + " from " + entry.getValue());
+ tokenMetadata_.removePendingRange(entry.getKey());
+ break;
+ }
+ }
+ }
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Wed Nov 11 16:11:53 2009
@@ -29,11 +29,6 @@
import org.apache.commons.lang.StringUtils;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.service.UnavailableException;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
@@ -76,7 +71,7 @@
Range sourceRange = getPrimaryRangeFor(getToken(source));
for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
{
- if (sourceRange.contains(entry.getKey().right()) || entry.getValue().equals(source))
+ if (sourceRange.contains(entry.getKey()) || entry.getValue().equals(source))
n++;
}
return n;
@@ -104,6 +99,21 @@
}
}
+ public void removeEndpoint(InetAddress endpoint)
+ {
+ assert tokenToEndPointMap.containsValue(endpoint);
+ lock.writeLock().lock();
+ try
+ {
+ tokenToEndPointMap.inverse().remove(endpoint);
+ sortedTokens = sortTokens();
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
public Token getToken(InetAddress endpoint)
{
assert endpoint != null;
@@ -137,14 +147,12 @@
public InetAddress getFirstEndpoint()
{
+ assert tokenToEndPointMap.size() > 0;
+
lock.readLock().lock();
try
{
- ArrayList<Token> tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
- if (tokens.isEmpty())
- return null;
- Collections.sort(tokens);
- return tokenToEndPointMap.get(tokens.get(0));
+ return tokenToEndPointMap.get(sortedTokens.get(0));
}
finally
{
@@ -234,17 +242,9 @@
pendingRanges.put(range, endpoint);
}
- public void removePendingRanges(InetAddress endpoint)
+ public void removePendingRange(Range range)
{
- Iterator<Map.Entry<Range, InetAddress>> iter = pendingRanges.entrySet().iterator();
- while (iter.hasNext())
- {
- Map.Entry<Range, InetAddress> entry = iter.next();
- if (entry.getValue().equals(endpoint))
- {
- iter.remove();
- }
- }
+ pendingRanges.remove(range);
}
/** a mutable map may be returned but caller should not modify it */
@@ -253,6 +253,19 @@
return pendingRanges;
}
+ public List<Range> getPendingRanges(InetAddress endpoint)
+ {
+ List<Range> ranges = new ArrayList<Range>();
+ for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
+ {
+ if (entry.getValue().equals(endpoint))
+ {
+ ranges.add(entry.getKey());
+ }
+ }
+ return ranges;
+ }
+
public Token getPredecessor(Token token)
{
List tokens = sortedTokens();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Wed Nov 11 16:11:53 2009
@@ -29,7 +29,6 @@
import org.apache.cassandra.concurrent.SingleThreadedStage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.dht.BootStrapper;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndPointState;
import org.apache.cassandra.gms.Gossiper;
@@ -39,6 +38,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.Streaming;
/*
* The load balancing algorithm here is an implementation of
@@ -378,7 +378,7 @@
Thread.sleep(100);
}
// one more sleep in case there are some stragglers
- Thread.sleep(BootStrapper.INITIAL_DELAY);
+ Thread.sleep(Streaming.RING_DELAY);
}
catch (InterruptedException e)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Nov 11 16:11:53 2009
@@ -45,6 +45,9 @@
import org.apache.log4j.Level;
import org.apache.commons.lang.StringUtils;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.HashMultimap;
+
/*
* This abstraction contains the token/identifier of this node
* on the identifier space. This token gets gossiped around.
@@ -58,11 +61,14 @@
// these aren't in an enum since other gossip users can create states ad-hoc too (e.g. load broadcasting)
public final static String STATE_NORMAL = "NORMAL";
public final static String STATE_BOOTSTRAPPING = "BOOTSTRAPPING";
+ public final static String STATE_LEAVING = "LEAVING";
+ public final static String STATE_LEFT = "LEFT";
/* All stage identifiers */
public final static String mutationStage_ = "ROW-MUTATION-STAGE";
public final static String readStage_ = "ROW-READ-STAGE";
-
+ public final static String streamStage_ = "STREAM-STAGE";
+
/* All verb handler identifiers */
public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
public final static String tokenVerbHandler_ = "TOKEN-VERB-HANDLER";
@@ -228,6 +234,7 @@
new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));
StageManager.registerStage(StorageService.readStage_,
new MultiThreadedStage(StorageService.readStage_, DatabaseDescriptor.getConcurrentReaders()));
+ StageManager.registerStage(StorageService.streamStage_, new SingleThreadedStage(StorageService.streamStage_));
Class<AbstractReplicationStrategy> cls = DatabaseDescriptor.getReplicaPlacementStrategyClass();
Class [] parameterTypes = new Class[] { TokenMetadata.class, IPartitioner.class, int.class};
@@ -357,21 +364,80 @@
*/
public void onChange(InetAddress endpoint, String stateName, ApplicationState state)
{
- Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
-
if (STATE_BOOTSTRAPPING.equals(stateName))
{
+ Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
if (logger_.isDebugEnabled())
logger_.debug(endpoint + " state bootstrapping, token " + token);
updateBootstrapRanges(token, endpoint);
}
else if (STATE_NORMAL.equals(stateName))
{
+ Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
if (logger_.isDebugEnabled())
logger_.debug(endpoint + " state normal, token " + token);
- tokenMetadata_.removePendingRanges(endpoint);
+ replicationStrategy_.removeObsoletePendingRanges();
updateForeignToken(token, endpoint);
}
+ else if (STATE_LEAVING.equals(stateName))
+ {
+ Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
+ assert tokenMetadata_.getToken(endpoint).equals(token);
+ updateLeavingRanges(endpoint);
+ }
+ else if (STATE_LEFT.equals(stateName))
+ {
+ Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
+ assert tokenMetadata_.getToken(endpoint).equals(token);
+ tokenMetadata_.removeEndpoint(endpoint);
+ replicationStrategy_.removeObsoletePendingRanges();
+ }
+ }
+
+ private Multimap<Range, InetAddress> getChangedRangesForLeaving(InetAddress endpoint)
+ {
+ Multimap<Range, InetAddress> newRangeAddresses = replicationStrategy_.getRangeAddressesAfterLeaving(endpoint);
+ if (logger_.isDebugEnabled())
+ logger_.debug("leaving node ranges are [" + StringUtils.join(newRangeAddresses.keySet(), ", ") + "]");
+ Multimap<Range, InetAddress> changedRanges = HashMultimap.create();
+ for (final Range range : newRangeAddresses.keySet())
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("considering Range " + range);
+ for (InetAddress newEndpoint : newRangeAddresses.get(range))
+ {
+ boolean alreadyReplicatesRange = false;
+ for (Range existingRange : getRangesForEndPoint(newEndpoint))
+ {
+ if (existingRange.contains(range))
+ {
+ alreadyReplicatesRange = true;
+ break;
+ }
+ }
+ if (!alreadyReplicatesRange)
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug(newEndpoint + " needs pendingrange " + range);
+ changedRanges.put(range, newEndpoint);
+ }
+ }
+ }
+ return changedRanges;
+ }
+
+ private void updateLeavingRanges(final InetAddress endpoint)
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug(endpoint + " is leaving; calculating pendingranges");
+ Multimap<Range, InetAddress> ranges = getChangedRangesForLeaving(endpoint);
+ for (Range range : ranges.keySet())
+ {
+ for (InetAddress newEndpoint : ranges.get(range))
+ {
+ tokenMetadata_.addPendingRange(range, newEndpoint);
+ }
+ }
}
private void updateBootstrapRanges(Token token, InetAddress endpoint)
@@ -651,7 +717,6 @@
logger_.debug("computing ranges for " + StringUtils.join(sortedTokens, ", "));
List<Range> ranges = new ArrayList<Range>();
- Collections.sort(sortedTokens);
int size = sortedTokens.size();
for (int i = 1; i < size; ++i)
{
@@ -851,6 +916,55 @@
return tokens;
}
+ public void decommission()
+ {
+ if (!tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
+ throw new UnsupportedOperationException("local node is not a member of the token ring yet");
+ if (tokenMetadata_.sortedTokens().size() < 2)
+ throw new UnsupportedOperationException("no other nodes in the ring; decommission would be pointless");
+ if (tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
+ throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
+
+ logger_.info("DECOMMISSIONING");
+ Gossiper.instance().addApplicationState(STATE_LEAVING, new ApplicationState(getLocalToken().toString()));
+ logger_.info("decommission sleeping " + Streaming.RING_DELAY);
+ try
+ {
+ Thread.sleep(Streaming.RING_DELAY);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ Multimap<Range, InetAddress> rangesMM = getChangedRangesForLeaving(FBUtilities.getLocalAddress());
+ if (logger_.isDebugEnabled())
+ logger_.debug("Ranges needing transfer are [" + StringUtils.join(rangesMM.keySet(), ",") + "]");
+ final Set<Map.Entry<Range, InetAddress>> pending = new HashSet<Map.Entry<Range, InetAddress>>(rangesMM.entries());
+ for (final Map.Entry<Range, InetAddress> entry : rangesMM.entries())
+ {
+ final Range range = entry.getKey();
+ final InetAddress newEndpoint = entry.getValue();
+ final Runnable callback = new Runnable()
+ {
+ public synchronized void run()
+ {
+ pending.remove(entry);
+ if (pending.isEmpty())
+ finishLeaving();
+ }
+ };
+ StageManager.getStage(streamStage_).execute(new Runnable()
+ {
+ public void run()
+ {
+ // TODO each call to transferRanges re-flushes, this is potentially a lot of waste
+ Streaming.transferRanges(newEndpoint, Arrays.asList(range), callback);
+ }
+ });
+ }
+ }
+
public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int consistency_level)
{
return replicationStrategy_.getResponseHandler(responseResolver, blockFor, consistency_level);
@@ -860,4 +974,20 @@
{
return replicationStrategy_;
}
+
+ public void finishLeaving()
+ {
+ Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(getLocalToken().toString()));
+ try
+ {
+ Thread.sleep(2 * Gossiper.intervalInMillis_);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ Gossiper.instance().stop();
+ logger_.info("DECOMMISSION FINISHED.");
+ // let op be responsible for killing the process
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Nov 11 16:11:53 2009
@@ -113,6 +113,11 @@
*/
public void forceTableFlush(String tableName, String... columnFamilies) throws IOException;
+ /**
+ * transfer this node's data to other machines and remove it from service.
+ */
+ public void decommission();
+
/** set the logging level at runtime */
public void setLog4jLevel(String classQualifier, String level);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Nov 11 16:11:53 2009
@@ -380,6 +380,11 @@
ssProxy.clearSnapshot();
}
+ public void decommission()
+ {
+ ssProxy.decommission();
+ }
+
/**
* Print out the size of the queues in the thread pools
*
@@ -474,7 +479,8 @@
{
HelpFormatter hf = new HelpFormatter();
String header = String.format(
- "%nAvailable commands: ring, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, tpstats, flush, " +
+ "%nAvailable commands: ring, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, " +
+ "tpstats, flush, decommission, " +
" getcompactionthreshold, setcompactionthreshold [minthreshold] ([maxthreshold])");
String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
hf.printHelp(usage, "", options, header);
@@ -533,6 +539,10 @@
{
probe.printColumnFamilyStats(System.out);
}
+ else if (cmdName.equals("decommission"))
+ {
+ probe.decommission();
+ }
else if (cmdName.equals("snapshot"))
{
String snapshotName = "";