You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/11 17:11:54 UTC

svn commit: r834940 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: dht/ gms/ io/ locator/ service/ tools/

Author: jbellis
Date: Wed Nov 11 16:11:53 2009
New Revision: 834940

URL: http://svn.apache.org/viewvc?rev=834940&view=rev
Log:
add leaving mode
patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-435

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Nov 11 16:11:53 2009
@@ -63,8 +63,6 @@
   */
 public class BootStrapper
 {
-    public static final long INITIAL_DELAY = 30 * 1000; //ms
-
     private static final Logger logger = Logger.getLogger(BootStrapper.class);
 
     /* endpoints that need to be bootstrapped */
@@ -174,7 +172,7 @@
         {
             for (Range myRange : myRanges)
             {
-                if (range.contains(myRange.right()))
+                if (range.contains(myRange))
                 {
                     List<InetAddress> preferred = DatabaseDescriptor.getEndPointSnitch().sortByProximity(address, rangeAddresses.get(range));
                     myRangeAddresses.putAll(myRange, preferred);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Wed Nov 11 16:11:53 2009
@@ -57,7 +57,7 @@
         {
             throw new IOError(e);
         }
-        return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
+        return new Message(FBUtilities.getLocalAddress(), StorageService.streamStage_, StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
     }        
     
     protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Wed Nov 11 16:11:53 2009
@@ -70,7 +70,7 @@
             {
                 if (logger_.isDebugEnabled())
                     logger_.debug(bsmd.toString());
-                Streaming.transferRanges(bsmd.target_, bsmd.ranges_);
+                Streaming.transferRanges(bsmd.target_, bsmd.ranges_, null);
             }
         }
         catch (IOException ex)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java Wed Nov 11 16:11:53 2009
@@ -103,6 +103,12 @@
         }        
     }
 
+    public boolean contains(Range range)
+    {
+        return (contains(range.left_) || range.left_.equals(left_))
+               && contains(range.right_);
+    }
+
     /**
      * Tells if the given range is a wrap around.
      * @param range

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Nov 11 16:11:53 2009
@@ -76,9 +76,9 @@
                     }
                 }
             }
-            catch ( Throwable th )
+            catch (Exception e)
             {
-                logger_.info( LogUtil.throwableToString(th) );
+                throw new RuntimeException(e);
             }
         }
     }
@@ -165,19 +165,6 @@
     }
 
     /**
-     * This method is used to forcibly remove a node from the membership
-     * set. He is forgotten locally immediately.
-     *
-     * param@ ep the endpoint to be removed from membership.
-     */
-    public synchronized void removeFromMembership(InetAddress ep)
-    {
-        endPointStateMap_.remove(ep);
-        liveEndpoints_.remove(ep);
-        unreachableEndpoints_ .remove(ep);
-    }
-
-    /**
      * This method is part of IFailureDetectionEventListener interface. This is invoked
      * by the Failure Detector when it convicts an end point.
      *
@@ -792,12 +779,6 @@
         }
     }
 
-    public ApplicationState getApplicationState(InetAddress endpoint, String stateName)
-    {
-        assert endPointStateMap_.containsKey(endpoint);
-        return endPointStateMap_.get(endpoint).getApplicationState(stateName);
-    }
-
     /**
      * Start the gossiper with the generation # retrieved from the System
      * table
@@ -836,6 +817,10 @@
         epState.addApplicationState(key, appState);
     }
 
+    public void stop()
+    {
+        gossipTimer_.cancel();
+    }
 }
 
 class JoinVerbHandler implements IVerbHandler

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Wed Nov 11 16:11:53 2009
@@ -27,11 +27,12 @@
 public class Streaming
 {
     private static Logger logger = Logger.getLogger(Streaming.class);
+    public static final long RING_DELAY = 30 * 1000; // delay after which we assume ring has stabilized
 
     /**
      * split out files on disk locally for each range and then stream them to the target endpoint
     */
-    public static void transferRanges(InetAddress target, Collection<Range> ranges) throws IOException
+    public static void transferRanges(InetAddress target, Collection<Range> ranges, Runnable callback)
     {
         assert ranges.size() > 0;
 
@@ -46,20 +47,29 @@
         List<String> tables = DatabaseDescriptor.getTables();
         for (String tName : tables)
         {
-            Table table = Table.open(tName);
-            if (logger.isDebugEnabled())
-              logger.debug("Flushing memtables ...");
-            table.flush(false);
-            if (logger.isDebugEnabled())
-              logger.debug("Performing anticompaction ...");
-            /* Get the list of files that need to be streamed */
-            List<String> fileList = new ArrayList<String>();
-            for (SSTableReader sstable : table.forceAntiCompaction(ranges, target))
+            try
             {
-                fileList.addAll(sstable.getAllFilenames());
+                Table table = Table.open(tName);
+                if (logger.isDebugEnabled())
+                  logger.debug("Flushing memtables ...");
+                table.flush(false);
+                if (logger.isDebugEnabled())
+                  logger.debug("Performing anticompaction ...");
+                /* Get the list of files that need to be streamed */
+                List<String> fileList = new ArrayList<String>();
+                for (SSTableReader sstable : table.forceAntiCompaction(ranges, target))
+                {
+                    fileList.addAll(sstable.getAllFilenames());
+                }
+                transferOneTable(target, fileList, tName); // also deletes the file, so no further cleanup needed
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
             }
-            transferOneTable(target, fileList, tName); // also deletes the file, so no further cleanup needed
         }
+        if (callback != null)
+            callback.run();
     }
 
     private static void transferOneTable(InetAddress target, List<String> fileList, String table) throws IOException
@@ -247,8 +257,10 @@
             MessagingService.instance().sendOneWay(message, host);
 
             /* If we're done with everything for this host, remove from bootstrap sources */
-            if (StorageService.instance().isBootstrapMode() && StreamContextManager.isDone(host))
+            if (StreamContextManager.isDone(host) && StorageService.instance().isBootstrapMode())
+            {
                 StorageService.instance().removeBootstrapSource(host);
+            }
         }
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Nov 11 16:11:53 2009
@@ -205,4 +205,48 @@
         temp.update(pendingToken, pendingAddress);
         return getAddressRanges(temp).get(pendingAddress);
     }
+
+    /**
+     * @param endpoint the endpoint leaving
+     * @return a map of where the endpoint's current ranges get sent
+     */
+    public Multimap<Range, InetAddress> getRangeAddressesAfterLeaving(InetAddress endpoint)
+    {
+        TokenMetadata metadataAfterLeaving = tokenMetadata_.cloneWithoutPending();
+        metadataAfterLeaving.removeEndpoint(endpoint);
+        Multimap<Range, InetAddress> rangesAfterLeaving = getRangeAddresses(metadataAfterLeaving);
+
+        Multimap<Range, InetAddress> map = HashMultimap.create();
+        for (Range range : getAddressRanges().get(endpoint))
+        {
+            for (Range newRange : rangesAfterLeaving.keySet())
+            {
+                if (newRange.contains(range))
+                {
+                    map.putAll(range, rangesAfterLeaving.get(newRange));
+                    break;
+                }
+            }
+        }
+
+        return map;
+    }
+
+    public void removeObsoletePendingRanges()
+    {
+        Multimap<InetAddress, Range> ranges = getAddressRanges();
+        for (Map.Entry<Range, InetAddress> entry : tokenMetadata_.getPendingRanges().entrySet())
+        {
+            for (Range currentRange : ranges.get(entry.getValue()))
+            {
+                if (currentRange.contains(entry.getKey()))
+                {
+                    if (logger_.isDebugEnabled())
+                        logger_.debug("Removing obsolete pending range " + entry.getKey() + " from " + entry.getValue());
+                    tokenMetadata_.removePendingRange(entry.getKey());
+                    break;
+                }
+            }
+        }
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Wed Nov 11 16:11:53 2009
@@ -29,11 +29,6 @@
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.service.UnavailableException;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
@@ -76,7 +71,7 @@
         Range sourceRange = getPrimaryRangeFor(getToken(source));
         for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
         {
-            if (sourceRange.contains(entry.getKey().right()) || entry.getValue().equals(source))
+            if (sourceRange.contains(entry.getKey()) || entry.getValue().equals(source))
                 n++;
         }
         return n;
@@ -104,6 +99,21 @@
         }
     }
 
+    public void removeEndpoint(InetAddress endpoint)
+    {
+        assert tokenToEndPointMap.containsValue(endpoint);
+        lock.writeLock().lock();
+        try
+        {
+            tokenToEndPointMap.inverse().remove(endpoint);
+            sortedTokens = sortTokens();
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
     public Token getToken(InetAddress endpoint)
     {
         assert endpoint != null;
@@ -137,14 +147,12 @@
 
     public InetAddress getFirstEndpoint()
     {
+        assert tokenToEndPointMap.size() > 0;
+
         lock.readLock().lock();
         try
         {
-            ArrayList<Token> tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
-            if (tokens.isEmpty())
-                return null;
-            Collections.sort(tokens);
-            return tokenToEndPointMap.get(tokens.get(0));
+            return tokenToEndPointMap.get(sortedTokens.get(0));
         }
         finally
         {
@@ -234,17 +242,9 @@
         pendingRanges.put(range, endpoint);
     }
 
-    public void removePendingRanges(InetAddress endpoint)
+    public void removePendingRange(Range range)
     {
-        Iterator<Map.Entry<Range, InetAddress>> iter = pendingRanges.entrySet().iterator();
-        while (iter.hasNext())
-        {
-            Map.Entry<Range, InetAddress> entry = iter.next();
-            if (entry.getValue().equals(endpoint))
-            {
-                iter.remove();
-            }
-        }
+        pendingRanges.remove(range);
     }
 
     /** a mutable map may be returned but caller should not modify it */
@@ -253,6 +253,19 @@
         return pendingRanges;
     }
 
+    public List<Range> getPendingRanges(InetAddress endpoint)
+    {
+        List<Range> ranges = new ArrayList<Range>();
+        for (Map.Entry<Range, InetAddress> entry : pendingRanges.entrySet())
+        {
+            if (entry.getValue().equals(endpoint))
+            {
+                ranges.add(entry.getKey());
+            }
+        }
+        return ranges;
+    }
+
     public Token getPredecessor(Token token)
     {
         List tokens = sortedTokens();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Wed Nov 11 16:11:53 2009
@@ -29,7 +29,6 @@
 import org.apache.cassandra.concurrent.SingleThreadedStage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.dht.BootStrapper;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndPointState;
 import org.apache.cassandra.gms.Gossiper;
@@ -39,6 +38,7 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.io.Streaming;
 
 /*
  * The load balancing algorithm here is an implementation of
@@ -378,7 +378,7 @@
                 Thread.sleep(100);
             }
             // one more sleep in case there are some stragglers
-            Thread.sleep(BootStrapper.INITIAL_DELAY);
+            Thread.sleep(Streaming.RING_DELAY);
         }
         catch (InterruptedException e)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Nov 11 16:11:53 2009
@@ -45,6 +45,9 @@
 import org.apache.log4j.Level;
 import org.apache.commons.lang.StringUtils;
 
+import com.google.common.collect.Multimap;
+import com.google.common.collect.HashMultimap;
+
 /*
  * This abstraction contains the token/identifier of this node
  * on the identifier space. This token gets gossiped around.
@@ -58,11 +61,14 @@
     // these aren't in an enum since other gossip users can create states ad-hoc too (e.g. load broadcasting)
     public final static String STATE_NORMAL = "NORMAL";
     public final static String STATE_BOOTSTRAPPING = "BOOTSTRAPPING";
+    public final static String STATE_LEAVING = "LEAVING";
+    public final static String STATE_LEFT = "LEFT";
 
     /* All stage identifiers */
     public final static String mutationStage_ = "ROW-MUTATION-STAGE";
     public final static String readStage_ = "ROW-READ-STAGE";
-    
+    public final static String streamStage_ = "STREAM-STAGE";
+
     /* All verb handler identifiers */
     public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
     public final static String tokenVerbHandler_ = "TOKEN-VERB-HANDLER";
@@ -228,6 +234,7 @@
                                    new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));
         StageManager.registerStage(StorageService.readStage_,
                                    new MultiThreadedStage(StorageService.readStage_, DatabaseDescriptor.getConcurrentReaders()));
+        StageManager.registerStage(StorageService.streamStage_, new SingleThreadedStage(StorageService.streamStage_));
 
         Class<AbstractReplicationStrategy> cls = DatabaseDescriptor.getReplicaPlacementStrategyClass();
         Class [] parameterTypes = new Class[] { TokenMetadata.class, IPartitioner.class, int.class};
@@ -357,21 +364,80 @@
      */
     public void onChange(InetAddress endpoint, String stateName, ApplicationState state)
     {
-        Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
-
         if (STATE_BOOTSTRAPPING.equals(stateName))
         {
+            Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
             if (logger_.isDebugEnabled())
                 logger_.debug(endpoint + " state bootstrapping, token " + token);
             updateBootstrapRanges(token, endpoint);
         }
         else if (STATE_NORMAL.equals(stateName))
         {
+            Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
             if (logger_.isDebugEnabled())
                 logger_.debug(endpoint + " state normal, token " + token);
-            tokenMetadata_.removePendingRanges(endpoint);
+            replicationStrategy_.removeObsoletePendingRanges();
             updateForeignToken(token, endpoint);
         }
+        else if (STATE_LEAVING.equals(stateName))
+        {
+            Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
+            assert tokenMetadata_.getToken(endpoint).equals(token);
+            updateLeavingRanges(endpoint);
+        }
+        else if (STATE_LEFT.equals(stateName))
+        {
+            Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
+            assert tokenMetadata_.getToken(endpoint).equals(token);
+            tokenMetadata_.removeEndpoint(endpoint);
+            replicationStrategy_.removeObsoletePendingRanges();
+        }
+    }
+
+    private Multimap<Range, InetAddress> getChangedRangesForLeaving(InetAddress endpoint)
+    {
+        Multimap<Range, InetAddress> newRangeAddresses = replicationStrategy_.getRangeAddressesAfterLeaving(endpoint);
+        if (logger_.isDebugEnabled())
+            logger_.debug("leaving node ranges are [" + StringUtils.join(newRangeAddresses.keySet(), ", ") + "]");
+        Multimap<Range, InetAddress> changedRanges = HashMultimap.create();
+        for (final Range range : newRangeAddresses.keySet())
+        {
+            if (logger_.isDebugEnabled())
+                logger_.debug("considering Range " + range);
+            for (InetAddress newEndpoint : newRangeAddresses.get(range))
+            {
+                boolean alreadyReplicatesRange = false;
+                for (Range existingRange : getRangesForEndPoint(newEndpoint))
+                {
+                    if (existingRange.contains(range))
+                    {
+                        alreadyReplicatesRange = true;
+                        break;
+                    }
+                }
+                if (!alreadyReplicatesRange)
+                {
+                    if (logger_.isDebugEnabled())
+                        logger_.debug(newEndpoint + " needs pendingrange " + range);
+                    changedRanges.put(range, newEndpoint);
+                }
+            }
+        }
+        return changedRanges;        
+    }
+
+    private void updateLeavingRanges(final InetAddress endpoint)
+    {
+        if (logger_.isDebugEnabled())
+            logger_.debug(endpoint + " is leaving; calculating pendingranges");
+        Multimap<Range, InetAddress> ranges = getChangedRangesForLeaving(endpoint);
+        for (Range range : ranges.keySet())
+        {
+            for (InetAddress newEndpoint : ranges.get(range))
+            {
+                tokenMetadata_.addPendingRange(range, newEndpoint);
+            }
+        }
     }
 
     private void updateBootstrapRanges(Token token, InetAddress endpoint)
@@ -651,7 +717,6 @@
             logger_.debug("computing ranges for " + StringUtils.join(sortedTokens, ", "));
 
         List<Range> ranges = new ArrayList<Range>();
-        Collections.sort(sortedTokens);
         int size = sortedTokens.size();
         for (int i = 1; i < size; ++i)
         {
@@ -851,6 +916,55 @@
         return tokens;
     }
 
+    public void decommission()
+    {
+        if (!tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
+            throw new UnsupportedOperationException("local node is not a member of the token ring yet");
+        if (tokenMetadata_.sortedTokens().size() < 2)
+            throw new UnsupportedOperationException("no other nodes in the ring; decommission would be pointless");
+        if (tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
+            throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
+
+        logger_.info("DECOMMISSIONING");
+        Gossiper.instance().addApplicationState(STATE_LEAVING, new ApplicationState(getLocalToken().toString()));
+        logger_.info("decommission sleeping " + Streaming.RING_DELAY);
+        try
+        {
+            Thread.sleep(Streaming.RING_DELAY);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+
+        Multimap<Range, InetAddress> rangesMM = getChangedRangesForLeaving(FBUtilities.getLocalAddress());
+        if (logger_.isDebugEnabled())
+            logger_.debug("Ranges needing transfer are [" + StringUtils.join(rangesMM.keySet(), ",") + "]");
+        final Set<Map.Entry<Range, InetAddress>> pending = new HashSet<Map.Entry<Range, InetAddress>>(rangesMM.entries());
+        for (final Map.Entry<Range, InetAddress> entry : rangesMM.entries())
+        {
+            final Range range = entry.getKey();
+            final InetAddress newEndpoint = entry.getValue();
+            final Runnable callback = new Runnable()
+            {
+                public synchronized void run()
+                {
+                    pending.remove(entry);
+                    if (pending.isEmpty())
+                        finishLeaving();
+                }
+            };
+            StageManager.getStage(streamStage_).execute(new Runnable()
+            {
+                public void run()
+                {
+                    // TODO each call to transferRanges re-flushes, this is potentially a lot of waste
+                    Streaming.transferRanges(newEndpoint, Arrays.asList(range), callback);
+                }
+            });
+        }
+    }
+
     public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int consistency_level)
     {
         return replicationStrategy_.getResponseHandler(responseResolver, blockFor, consistency_level);
@@ -860,4 +974,20 @@
     {
         return replicationStrategy_;
     }
+
+    public void finishLeaving()
+    {
+        Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(getLocalToken().toString()));
+        try
+        {
+            Thread.sleep(2 * Gossiper.intervalInMillis_);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        Gossiper.instance().stop();
+        logger_.info("DECOMMISSION FINISHED.");
+        // let op be responsible for killing the process
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Nov 11 16:11:53 2009
@@ -113,6 +113,11 @@
      */
     public void forceTableFlush(String tableName, String... columnFamilies) throws IOException;
 
+    /**
+     * transfer this node's data to other machines and remove it from service.
+     */
+    public void decommission();
+
     /** set the logging level at runtime */
     public void setLog4jLevel(String classQualifier, String level);
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=834940&r1=834939&r2=834940&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Nov 11 16:11:53 2009
@@ -380,6 +380,11 @@
         ssProxy.clearSnapshot();
     }
 
+    public void decommission()
+    {
+        ssProxy.decommission();
+    }
+
     /**
      * Print out the size of the queues in the thread pools
      *
@@ -474,7 +479,8 @@
     {
         HelpFormatter hf = new HelpFormatter();
         String header = String.format(
-                "%nAvailable commands: ring, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, tpstats, flush, " +
+                "%nAvailable commands: ring, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, " +
+                "tpstats, flush, decommission, " +
                 " getcompactionthreshold, setcompactionthreshold [minthreshold] ([maxthreshold])");
         String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
         hf.printHelp(usage, "", options, header);
@@ -533,6 +539,10 @@
         {
             probe.printColumnFamilyStats(System.out);
         }
+        else if (cmdName.equals("decommission"))
+        {
+            probe.decommission();
+        }
         else if (cmdName.equals("snapshot"))
         {
             String snapshotName = "";