You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/29 07:10:01 UTC

svn commit: r1152107 - in /cassandra/trunk: ./ contrib/ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ doc/cql/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/gms/ src/java/...

Author: jbellis
Date: Fri Jul 29 05:09:59 2011
New Revision: 1152107

URL: http://svn.apache.org/viewvc?rev=1152107&view=rev
Log:
merge from 0.8

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
    cassandra/trunk/doc/cql/CQL.textile
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7:1026516-1151306
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Jul 29 05:09:59 2011
@@ -26,6 +26,19 @@
    (CASSANDRA-1951)
 
 
+0.8.3
+ * add ability to drop local reads/writes that are going to timeout
+   (CASSANDRA-2943)
+ * revamp token removal process, keep gossip states for 3 days (CASSANDRA-2946)
+ * don't accept extra args for 0-arg nodetool commands (CASSANDRA-2740)
+ * log unavailableexception details at debug level (CASSANDRA-2856)
+ * expose data_dir through jmx (CASSANDRA-2770)
+ * don't include tmp files as sstable when create cfs (CASSANDRA-2929)
+ * log Java classpath on startup (CASSANDRA-2895)
+ * keep gossipped version in sync with actual on migration coordinator 
+   (CASSANDRA-2946)
+
+
 0.8.2
  * CQL: 
    - include only one row per unique key for IN queries (CASSANDRA-2717)

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Fri Jul 29 05:09:59 2011
@@ -7,6 +7,16 @@ Upgrading
       sstableloader tool instead.
 
 
+0.8.3
+=====
+
+Upgrading
+---------
+    - Token removal has been revamped.  Removing tokens in a mixed cluster with
+      0.8.3 will not work, so the entire cluster will need to be running 0.8.3
+      first, except for the dead node.
+
+
 0.8.2
 =====
 

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/contrib:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689

Modified: cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Jul 29 05:09:59 2011
@@ -69,8 +69,6 @@ public class CassandraStorage extends Lo
     public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
     public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
 
-    private static String UDFCONTEXT_SCHEMA_KEY_PREFIX = "cassandra.schema";
-
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
 
@@ -79,6 +77,8 @@ public class CassandraStorage extends Lo
     private boolean slice_reverse = false;
     private String keyspace;
     private String column_family;
+    private String loadSignature;
+    private String storeSignature;
 
     private Configuration conf;
     private RecordReader reader;
@@ -113,7 +113,7 @@ public class CassandraStorage extends Lo
             if (!reader.nextKeyValue())
                 return null;
 
-            CfDef cfDef = getCfDef();
+            CfDef cfDef = getCfDef(loadSignature);
             ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
             SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
             assert key != null && cf != null;
@@ -166,11 +166,11 @@ public class CassandraStorage extends Lo
         return pair;
     }
 
-    private CfDef getCfDef()
+    private CfDef getCfDef(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(CassandraStorage.class);
-        return cfdefFromString(property.getProperty(getSchemaContextKey()));
+        return cfdefFromString(property.getProperty(signature));
     }
 
     private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
@@ -290,7 +290,7 @@ public class CassandraStorage extends Lo
         }
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
-        initSchema();
+        initSchema(loadSignature);
     }
 
     @Override
@@ -299,9 +299,16 @@ public class CassandraStorage extends Lo
         return location;
     }
 
+    @Override
+    public void setUDFContextSignature(String signature)
+    {
+        this.loadSignature = signature;
+    }
+
     /* StoreFunc methods */
     public void setStoreFuncUDFContextSignature(String signature)
     {
+        this.storeSignature = signature;
     }
 
     public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
@@ -315,7 +322,7 @@ public class CassandraStorage extends Lo
         setLocationFromUri(location);
         ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
-        initSchema();
+        initSchema(storeSignature);
     }
 
     public OutputFormat getOutputFormat()
@@ -347,7 +354,7 @@ public class CassandraStorage extends Lo
         ByteBuffer key = objToBB(t.get(0));
         DefaultDataBag pairs = (DefaultDataBag) t.get(1);
         ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
-        CfDef cfDef = getCfDef();
+        CfDef cfDef = getCfDef(storeSignature);
         List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
         try
@@ -412,7 +419,7 @@ public class CassandraStorage extends Lo
         }
         catch (ClassCastException e)
         {
-            throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily");
+            throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily", e);
         }
         try
         {
@@ -430,14 +437,13 @@ public class CassandraStorage extends Lo
 
     /* Methods to get the column family schema from Cassandra */
 
-    private void initSchema()
+    private void initSchema(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(CassandraStorage.class);
 
-        String schemaContextKey = getSchemaContextKey();
         // Only get the schema if we haven't already gotten it
-        if (!property.containsKey(schemaContextKey))
+        if (!property.containsKey(signature))
         {
             Cassandra.Client client = null;
             try
@@ -455,7 +461,7 @@ public class CassandraStorage extends Lo
                         break;
                     }
                 }
-                property.setProperty(schemaContextKey, cfdefToString(cfDef));
+                property.setProperty(signature, cfdefToString(cfDef));
             }
             catch (TException e)
             {
@@ -521,14 +527,4 @@ public class CassandraStorage extends Lo
         }
         return cfDef;
     }
-
-    private String getSchemaContextKey()
-    {
-        StringBuilder sb = new StringBuilder(UDFCONTEXT_SCHEMA_KEY_PREFIX);
-        sb.append('.');
-        sb.append(keyspace);
-        sb.append('.');
-        sb.append(column_family);
-        return sb.toString();
-    }
 }

Modified: cassandra/trunk/doc/cql/CQL.textile
URL: http://svn.apache.org/viewvc/cassandra/trunk/doc/cql/CQL.textile?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/doc/cql/CQL.textile (original)
+++ cassandra/trunk/doc/cql/CQL.textile Fri Jul 29 05:09:59 2011
@@ -1,4 +1,4 @@
-h1. Cassandra Query Language (CQL) v1.0.0
+h1. Cassandra Query Language (CQL) v1.1.0
 
 h2. Table of Contents
 
@@ -364,5 +364,8 @@ Versioning of the CQL language adheres t
 h1. Changes
 
 pre. 
+Sat, 01 Jun 2011 15:58:00 -0600 - Pavel Yaskevich
+ * Updated to support ALTER (CASSANDRA-1709)
+
 Tue, 22 Mar 2011 18:10:28 -0700 - Eric Evans <ee...@rackspace.com>
  * Initial version, 1.0.0

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1151306
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1151495,1151497
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Jul 29 05:09:59 2011
@@ -204,6 +204,9 @@ public class HintedHandOffManager implem
         }
         waited = 0;
         // then wait for the correct schema version.
+        // usually we use DD.getDefsVersion, which checks the local schema uuid as stored in the system table.
+        // here we check the one in gossip instead; this serves as a canary to warn us if we introduce a bug that
+        // causes the two to diverge (see CASSANDRA-2946)
         while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
                 gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value))
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java Fri Jul 29 05:09:59 2011
@@ -29,6 +29,7 @@ public enum ApplicationState
     DC,
     RACK,
     RELEASE_VERSION,
+    REMOVAL_COORDINATOR,
     INTERNAL_IP,
     // pad to allow adding new states to existing cluster
     X1,

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Fri Jul 29 05:09:59 2011
@@ -27,6 +27,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.config.ConfigurationException;
@@ -58,6 +59,8 @@ public class Gossiper implements IFailur
     private static final RetryingScheduledThreadPoolExecutor executor = new RetryingScheduledThreadPoolExecutor("GossipTasks");
 
     static final ApplicationState[] STATES = ApplicationState.values();
+    static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, VersionedValue.STATUS_LEFT);
+
     private ScheduledFuture<?> scheduledGossipTask;
     public final static int intervalInMillis = 1000;
     public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2;
@@ -264,17 +267,21 @@ public class Gossiper implements IFailur
     }
 
     /**
-     * Removes the endpoint from unreachable endpoint set
+     * Removes the endpoint from gossip completely
      *
      * @param endpoint endpoint to be removed from the current membership.
     */
     private void evictFromMembership(InetAddress endpoint)
     {
         unreachableEndpoints.remove(endpoint);
+        endpointStateMap.remove(endpoint);
+        justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+        if (logger.isDebugEnabled())
+            logger.debug("evicting " + endpoint + " from gossip");
     }
 
     /**
-     * Removes the endpoint completely from Gossip
+     * Removes the endpoint from Gossip but retains endpoint state
      */
     public void removeEndpoint(InetAddress endpoint)
     {
@@ -288,6 +295,8 @@ public class Gossiper implements IFailur
         FailureDetector.instance.remove(endpoint);
         versions.remove(endpoint);
         justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+        if (logger.isDebugEnabled())
+            logger.debug("removing endpoint " + endpoint);
     }
 
     /**
@@ -328,6 +337,67 @@ public class Gossiper implements IFailur
         }
     }
 
+    /**
+     * This method will begin removing an existing endpoint from the cluster by spoofing its state
+     * This should never be called unless this coordinator has had 'removetoken' invoked
+     *
+     * @param endpoint - the endpoint being removed
+     * @param token - the token being removed
+     * @param mytoken - my own token for replication coordination
+     */
+    public void advertiseRemoving(InetAddress endpoint, Token token, Token mytoken)
+    {
+        EndpointState epState = endpointStateMap.get(endpoint);
+        // remember this node's generation
+        int generation = epState.getHeartBeatState().getGeneration();
+        logger.info("Removing token: " + token);
+        logger.info("Sleeping for " + StorageService.RING_DELAY + "ms to ensure " + endpoint + " does not change");
+        try
+        {
+            Thread.sleep(StorageService.RING_DELAY);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        // make sure it did not change
+        epState = endpointStateMap.get(endpoint);
+        if (epState.getHeartBeatState().getGeneration() != generation)
+            throw new RuntimeException("Endpoint " + endpoint + " generation changed while trying to remove it");
+        // update the other node's generation to mimic it as if it had changed it itself
+        logger.info("Advertising removal for " + endpoint);
+        epState.updateTimestamp(); // make sure we don't evict it too soon
+        epState.getHeartBeatState().forceNewerGenerationUnsafe();
+        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(token));
+        epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(mytoken));
+        endpointStateMap.put(endpoint, epState);
+    }
+
+    /**
+     * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN
+     * This should only be called after advertiseRemoving
+     * @param endpoint
+     * @param token
+     */
+    public void advertiseTokenRemoved(InetAddress endpoint, Token token)
+    {
+        EndpointState epState = endpointStateMap.get(endpoint);
+        epState.updateTimestamp(); // make sure we don't evict it too soon
+        epState.getHeartBeatState().forceNewerGenerationUnsafe();
+        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(token));
+        logger.info("Completing removal of " + endpoint);
+        endpointStateMap.put(endpoint, epState);
+        // ensure at least one gossip round occurs before returning
+        try
+        {
+            Thread.sleep(intervalInMillis * 2);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
     public boolean isKnownEndpoint(InetAddress endpoint)
     {
         return endpointStateMap.containsKey(endpoint);
@@ -456,23 +526,18 @@ public class Gossiper implements IFailur
             {
                 long duration = now - epState.getUpdateTimestamp();
 
+                if (StorageService.instance.getTokenMetadata().isMember(endpoint))
+                    epState.setHasToken(true);
                 // check if this is a fat client. fat clients are removed automatically from
                 // gosip after FatClientTimeout
-                if (!epState.hasToken() && !epState.isAlive() && (duration > FatClientTimeout))
+                if (!epState.hasToken() && !epState.isAlive() && !justRemovedEndpoints.containsKey(endpoint) && (duration > FatClientTimeout))
                 {
-                    if (StorageService.instance.getTokenMetadata().isMember(endpoint))
-                        epState.setHasToken(true);
-                    else
-                    {
-                        if (!justRemovedEndpoints.containsKey(endpoint)) // if the node was decommissioned, it will have been removed but still appear as a fat client
-                        {
-                            logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip");
-                            removeEndpoint(endpoint); // after quarantine justRemoveEndpoints will remove the state
-                        }
-                    }
+                    logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip");
+                    removeEndpoint(endpoint); // will put it in justRemovedEndpoints to respect quarantine delay
+                    evictFromMembership(endpoint); // can get rid of the state immediately
                 }
 
-                if ( !epState.isAlive() && (duration > aVeryLongTime) )
+                if ( !epState.isAlive() && (duration > aVeryLongTime) && (!StorageService.instance.getTokenMetadata().isMember(endpoint)))
                 {
                     evictFromMembership(endpoint);
                 }
@@ -488,7 +553,6 @@ public class Gossiper implements IFailur
                     if (logger.isDebugEnabled())
                         logger.debug(QUARANTINE_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
                     justRemovedEndpoints.remove(entry.getKey());
-                    endpointStateMap.remove(entry.getKey());
                 }
             }
         }
@@ -585,6 +649,7 @@ public class Gossiper implements IFailur
             int remoteGeneration = remoteEndpointState.getHeartBeatState().getGeneration();
             if ( remoteGeneration > localGeneration )
             {
+                localEndpointState.updateTimestamp();
                 fd.report(endpoint);
                 return;
             }
@@ -595,6 +660,7 @@ public class Gossiper implements IFailur
                 int remoteVersion = remoteEndpointState.getHeartBeatState().getHeartBeatVersion();
                 if ( remoteVersion > localVersion )
                 {
+                    localEndpointState.updateTimestamp();
                     fd.report(endpoint);
                 }
             }
@@ -607,6 +673,7 @@ public class Gossiper implements IFailur
         if (logger.isTraceEnabled())
             logger.trace("marking as alive {}", addr);
         localState.markAlive();
+        localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
         liveEndpoints.add(addr);
         unreachableEndpoints.remove(addr);
         logger.info("InetAddress {} is now UP", addr);
@@ -638,10 +705,13 @@ public class Gossiper implements IFailur
      */
     private void handleMajorStateChange(InetAddress ep, EndpointState epState)
     {
-        if (endpointStateMap.get(ep) != null)
-            logger.info("Node {} has restarted, now UP again", ep);
-        else
-            logger.info("Node {} is now part of the cluster", ep);
+        if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
+        {
+            if (endpointStateMap.get(ep) != null)
+                logger.info("Node {} has restarted, now UP again", ep);
+            else
+                logger.info("Node {} is now part of the cluster", ep);
+        }
         if (logger.isTraceEnabled())
             logger.trace("Adding endpoint state for " + ep);
         endpointStateMap.put(ep, epState);
@@ -651,11 +721,31 @@ public class Gossiper implements IFailur
             for (IEndpointStateChangeSubscriber subscriber : subscribers)
                 subscriber.onDead(ep, epState);
         }
-        markAlive(ep, epState);
+        if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
+            markAlive(ep, epState);
+        else
+        {
+            logger.debug("Not marking " + ep + " alive due to dead state");
+            epState.markDead();
+            epState.setHasToken(true); // fat clients won't have a dead state
+        }
         for (IEndpointStateChangeSubscriber subscriber : subscribers)
             subscriber.onJoin(ep, epState);
     }
 
+    private boolean isDeadState(String value) // true iff the first delimiter-separated piece of value equals an entry of DEAD_STATES
+    {
+        String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1); // negative limit: trailing empty pieces are kept
+        assert (pieces.length > 0);
+        String state = pieces[0]; // leading piece is the status keyword; anything after the first delimiter is ignored here
+        for (String deadstate : DEAD_STATES)
+        {
+            if (state.equals(deadstate))
+                return true;
+        }
+        return false; // primitive return: callers only negate the result, so no boxing is needed
+    }
+
     void applyStateLocally(Map<InetAddress, EndpointState> epStateMap)
     {
         for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java Fri Jul 29 05:09:59 2011
@@ -71,6 +71,11 @@ class HeartBeatState
     {
         return version;
     }
+
+    void forceNewerGenerationUnsafe() // bumps this heartbeat state's generation without any other bookkeeping
+    {
+        generation += 1; // NOTE(review): plain read-modify-write; presumably "Unsafe" because it bypasses normal generation assignment — confirm
+    }
 }
 
 class HeartBeatStateSerializer implements ICompactSerializer<HeartBeatState>

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java Fri Jul 29 05:09:59 2011
@@ -49,7 +49,7 @@ public class VersionedValue implements C
     public final static char DELIMITER = ',';
     public final static String DELIMITER_STR = new String(new char[] { DELIMITER });
 
-    // values for State.STATUS
+    // values for ApplicationState.STATUS
     public final static String STATUS_BOOTSTRAPPING = "BOOT";
     public final static String STATUS_NORMAL = "NORMAL";
     public final static String STATUS_LEAVING = "LEAVING";
@@ -59,6 +59,9 @@ public class VersionedValue implements C
     public final static String REMOVING_TOKEN = "removing";
     public final static String REMOVED_TOKEN = "removed";
 
+    // values for ApplicationState.REMOVAL_COORDINATOR
+    public final static String REMOVAL_COORDINATOR = "REMOVER";
+
     public final int version;
     public final String value;
 
@@ -129,20 +132,19 @@ public class VersionedValue implements C
             return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
 
-        public VersionedValue removingNonlocal(Token localToken, Token token)
+        public VersionedValue removingNonlocal(Token token) // value is REMOVING_TOKEN ("removing"), DELIMITER, then the token rendered by the partitioner's token factory
+        {
+            return new VersionedValue(VersionedValue.REMOVING_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+        }
+
+        public VersionedValue removedNonlocal(Token token) // value is REMOVED_TOKEN ("removed"), DELIMITER, then the token string
         {
-            return new VersionedValue(VersionedValue.STATUS_NORMAL
-                                        + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(localToken)
-                                        + VersionedValue.DELIMITER + VersionedValue.REMOVING_TOKEN
-                                        + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+            return new VersionedValue(VersionedValue.REMOVED_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
 
-        public VersionedValue removedNonlocal(Token localToken, Token token)
+        public VersionedValue removalCoordinator(Token token) // value is REMOVAL_COORDINATOR ("REMOVER"), DELIMITER, then the token string
         {
-            return new VersionedValue(VersionedValue.STATUS_NORMAL
-                                        + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(localToken)
-                                        + VersionedValue.DELIMITER + VersionedValue.REMOVED_TOKEN
-                                        + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+            return new VersionedValue(VersionedValue.REMOVAL_COORDINATOR + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
 
         public VersionedValue datacenter(String dcId)

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jul 29 05:09:59 2011
@@ -472,6 +472,10 @@ public final class MessagingService impl
 
     public void receive(Message message, String id)
     {
+        if (logger_.isTraceEnabled())
+            logger_.trace(FBUtilities.getLocalAddress() + " received " + message.getVerb()
+                          + " from " + id + "@" + message.getFrom());
+
         message = SinkManager.processServerMessage(message, id);
         if (message == null)
             return;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Fri Jul 29 05:09:59 2011
@@ -119,6 +119,7 @@ public abstract class AbstractCassandraD
     {
     	logger.info("JVM vendor/version: {}/{}", System.getProperty("java.vm.name"), System.getProperty("java.version") );
         logger.info("Heap size: {}/{}", Runtime.getRuntime().totalMemory(), Runtime.getRuntime().maxMemory());
+		logger.info("Classpath: {}", System.getProperty("java.class.path"));
     	CLibrary.tryMlockall();
 
         listenPort = DatabaseDescriptor.getRpcPort();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Jul 29 05:09:59 2011
@@ -349,7 +349,7 @@ public class StorageProxy implements Sto
     {
         if (logger.isDebugEnabled())
             logger.debug("insert writing local " + rm.toString(true));
-        Runnable runnable = new WrappedRunnable()
+        Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION)
         {
             public void runMayThrow() throws IOException
             {
@@ -431,7 +431,7 @@ public class StorageProxy implements Sto
         if (logger.isDebugEnabled())
             logger.debug("insert writing local & replicate " + mutation.toString(true));
 
-        Runnable runnable = new WrappedRunnable()
+        Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION)
         {
             public void runMayThrow() throws IOException
             {
@@ -447,7 +447,7 @@ public class StorageProxy implements Sto
                 {
                     // We do the replication on another stage because it involves a read (see CM.makeReplicationMutation)
                     // and we want to avoid blocking too much the MUTATION stage
-                    StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new WrappedRunnable()
+                    StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new DroppableRunnable(StorageService.Verb.READ)
                     {
                         public void runMayThrow() throws IOException
                         {
@@ -616,7 +616,7 @@ public class StorageProxy implements Sto
         return rows;
     }
 
-    static class LocalReadRunnable extends WrappedRunnable
+    static class LocalReadRunnable extends DroppableRunnable
     {
         private final ReadCommand command;
         private final ReadCallback<Row> handler;
@@ -624,6 +624,7 @@ public class StorageProxy implements Sto
 
         LocalReadRunnable(ReadCommand command, ReadCallback<Row> handler)
         {
+            super(StorageService.Verb.READ);
             this.command = command;
             this.handler = handler;
         }
@@ -1078,4 +1079,35 @@ public class StorageProxy implements Sto
     {
         public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException;
     }
+
+    private static abstract class DroppableRunnable implements Runnable // Runnable that skips its work if it only starts running more than the RPC timeout after it was constructed
+    {
+        private final long constructionTime = System.currentTimeMillis(); // creation time in ms; NOTE(review): wall clock, so a clock adjustment can skew the age check
+        private final StorageService.Verb verb; // verb handed to incrementDroppedMessages when the task is dropped
+
+        public DroppableRunnable(StorageService.Verb verb)
+        {
+            this.verb = verb;
+        }
+
+        public final void run()
+        {
+            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout()) // assumes getRpcTimeout() is in milliseconds — TODO confirm
+            {
+                MessagingService.instance().incrementDroppedMessages(verb); // count the drop; runMayThrow() is not invoked
+                return;
+            }
+
+            try
+            {
+                runMayThrow();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e); // every Exception, including unchecked ones, is rethrown wrapped with the original as cause
+            }
+        }
+
+        abstract protected void runMayThrow() throws Exception; // the actual work, supplied by each subclass
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Jul 29 05:09:59 2011
@@ -335,6 +335,11 @@ public class StorageService implements I
 
     public synchronized void initClient() throws IOException, ConfigurationException
     {
+        initClient(RING_DELAY); // delegate to the int overload, passing RING_DELAY as the delay
+    }
+
+    public synchronized void initClient(int delay) throws IOException, ConfigurationException
+    {
         if (initialized)
         {
             if (!isClientMode)
@@ -352,7 +357,7 @@ public class StorageService implements I
         // sleep a while to allow gossip to warm up (the other nodes need to know about this one before they can reply).
         try
         {
-            Thread.sleep(RING_DELAY);
+            Thread.sleep(delay);
         }
         catch (Exception ex)
         {
@@ -622,29 +627,35 @@ public class StorageService implements I
     }
 
     /*
-     * onChange only ever sees one ApplicationState piece change at a time, so we perform a kind of state machine here.
-     * We are concerned with two events: knowing the token associated with an endpoint, and knowing its operation mode.
-     * Nodes can start in either bootstrap or normal mode, and from bootstrap mode can change mode to normal.
-     * A node in bootstrap mode needs to have pendingranges set in TokenMetadata; a node in normal mode
-     * should instead be part of the token ring.
+     * Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the
+     * ApplicationState has not necessarily "changed" since the last known value, if we already received the same update
+     * from somewhere else.
+     *
+     * onChange only ever sees one ApplicationState piece change at a time (even if many ApplicationState updates were
+     * received at the same time), so we perform a kind of state machine here. We are concerned with two events: knowing
+     * the token associated with an endpoint, and knowing its operation mode. Nodes can start in either bootstrap or
+     * normal mode, and from bootstrap mode can change mode to normal. A node in bootstrap mode needs to have
+     * pendingranges set in TokenMetadata; a node in normal mode should instead be part of the token ring.
      * 
-     * Normal MOVE_STATE progression of a node should be like this:
-     * STATE_BOOTSTRAPPING,token
+     * Normal progression of ApplicationState.STATUS values for a node should be like this:
+     * STATUS_BOOTSTRAPPING,token
      *   if bootstrapping. stays this way until all files are received.
-     * STATE_NORMAL,token 
+     * STATUS_NORMAL,token
      *   ready to serve reads and writes.
-     * STATE_NORMAL,token,REMOVE_TOKEN,token
-     *   specialized normal state in which this node acts as a proxy to tell the cluster about a dead node whose
-     *   token is being removed. this value becomes the permanent state of this node (unless it coordinates another
-     *   removetoken in the future).
-     * STATE_LEAVING,token 
-     *   get ready to leave the cluster as part of a decommission or move
-     * STATE_LEFT,token 
-     *   set after decommission or move is completed.
-     * STATE_MOVE,token
-     *   set if node if currently moving to a new token in the ring
-     * 
-     * Note: Any time a node state changes from STATE_NORMAL, it will not be visible to new nodes. So it follows that
+     * STATUS_LEAVING,token
+     *   get ready to leave the cluster as part of a decommission
+     * STATUS_LEFT,token
+     *   set after decommission is completed.
+     *
+     * Other STATUS values that may be seen (possibly anywhere in the normal progression):
+     * STATUS_MOVING,newtoken
+     *   set if node is currently moving to a new token in the ring
+     * REMOVING_TOKEN,deadtoken
+     *   set if the node is dead and is being removed by its REMOVAL_COORDINATOR
+     * REMOVED_TOKEN,deadtoken
+     *   set if the node is dead and has been removed by its REMOVAL_COORDINATOR
+     *
+     * Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
      * you should never bootstrap a new node during a removetoken, decommission or move.
      */
     public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
@@ -665,6 +676,8 @@ public class StorageService implements I
                     handleStateBootstrap(endpoint, pieces);
                 else if (moveName.equals(VersionedValue.STATUS_NORMAL))
                     handleStateNormal(endpoint, pieces);
+                else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN))
+                    handleStateRemoving(endpoint, pieces);
                 else if (moveName.equals(VersionedValue.STATUS_LEAVING))
                     handleStateLeaving(endpoint, pieces);
                 else if (moveName.equals(VersionedValue.STATUS_LEFT))
@@ -731,7 +744,7 @@ public class StorageService implements I
      * in reads.
      *
      * @param endpoint node
-     * @param pieces STATE_NORMAL,token[,other_state,token]
+     * @param pieces STATUS_NORMAL,token
      */
     private void handleStateNormal(InetAddress endpoint, String[] pieces)
     {
@@ -773,12 +786,6 @@ public class StorageService implements I
                                        endpoint, currentOwner, token, endpoint));
         }
 
-        if (pieces.length > 2)
-        {
-            assert pieces.length == 4;
-            handleStateRemoving(endpoint, getPartitioner().getTokenFactory().fromString(pieces[3]), pieces[2]);
-        }
-
         if (tokenMetadata_.isMoving(endpoint)) // if endpoint was moving to a new token
             tokenMetadata_.removeFromMoving(endpoint);
 
@@ -860,37 +867,50 @@ public class StorageService implements I
      * Handle notification that a node being actively removed from the ring via 'removetoken'
      *
      * @param endpoint node
-     * @param state either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
+     * @param pieces status pieces; pieces[0] is either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
      */
-    private void handleStateRemoving(InetAddress endpoint, Token removeToken, String state)
+    private void handleStateRemoving(InetAddress endpoint, String[] pieces) // endpoint is the node being removed; pieces[0] is REMOVING_TOKEN or REMOVED_TOKEN
     {
-        InetAddress removeEndpoint = tokenMetadata_.getEndpoint(removeToken);
-        
-        if (removeEndpoint == null)
-            return;
-        
-        if (removeEndpoint.equals(FBUtilities.getBroadcastAddress()))
-        {
-            logger_.info("Received removeToken gossip about myself. Is this node a replacement for a removed one?");
-            return;
-        }
+        assert (pieces.length > 0); // must precede pieces[0] so an empty array trips the assert instead of an ArrayIndexOutOfBoundsException
+        String state = pieces[0];
 
-        if (VersionedValue.REMOVED_TOKEN.equals(state))
+        if (endpoint.equals(FBUtilities.getBroadcastAddress())) // the removal gossip is about this very node
         {
-            excise(removeToken, removeEndpoint);
+            logger_.info("Received removeToken gossip about myself. Is this node rejoining after an explicit removetoken?");
+            try
+            {
+                drain();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e); // drain() failures are surfaced unchecked
+            }
+            return;
         }
-        else if (VersionedValue.REMOVING_TOKEN.equals(state))
+        if (tokenMetadata_.isMember(endpoint)) // only endpoints tokenMetadata_ still lists as members are processed
         {
-            if (logger_.isDebugEnabled())
-                logger_.debug("Token " + removeToken + " removed manually (endpoint was " + removeEndpoint + ")");
+            Token removeToken = tokenMetadata_.getToken(endpoint);
 
-            // Note that the endpoint is being removed
-            tokenMetadata_.addLeavingEndpoint(removeEndpoint);
-            calculatePendingRanges();
+            if (VersionedValue.REMOVED_TOKEN.equals(state))
+            {
+                excise(removeToken, endpoint);
+            }
+            else if (VersionedValue.REMOVING_TOKEN.equals(state))
+            {
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Token " + removeToken + " removed manually (endpoint was " + endpoint + ")");
 
-            // grab any data we are now responsible for and notify responsible node
-            restoreReplicaCount(removeEndpoint, endpoint);
-        }
+                // Note that the endpoint is being removed
+                tokenMetadata_.addLeavingEndpoint(endpoint);
+                calculatePendingRanges();
+
+                // find the endpoint coordinating this removal that we need to notify when we're done; NOTE(review): assumes REMOVAL_COORDINATOR state is present and has a second piece — confirm
+                String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
+                Token coordtoken = getPartitioner().getTokenFactory().fromString(coordinator[1]);
+                // grab any data we are now responsible for and notify responsible node
+                restoreReplicaCount(endpoint, tokenMetadata_.getEndpoint(coordtoken));
+            }
+        } // not a member, nothing to do
     }
 
     private void excise(Token token, InetAddress endpoint)
@@ -1059,6 +1079,8 @@ public class StorageService implements I
         // notify the remote token
         Message msg = new Message(local, StorageService.Verb.REPLICATION_FINISHED, new byte[0], Gossiper.instance.getVersion(remote));
         IFailureDetector failureDetector = FailureDetector.instance;
+        if (logger_.isDebugEnabled())
+            logger_.debug("Notifying " + remote.toString() + " of replication completion\n");
         while (failureDetector.isAlive(remote))
         {
             IAsyncResult iar = MessagingService.instance().sendRR(msg, remote);
@@ -1993,9 +2015,14 @@ public class StorageService implements I
      */
     public void forceRemoveCompletion()
     {
-        if (!replicatingNodes.isEmpty())
+        if (!replicatingNodes.isEmpty()  || !tokenMetadata_.getLeavingEndpoints().isEmpty())
         {
             logger_.warn("Removal not confirmed for for " + StringUtils.join(this.replicatingNodes, ","));
+            for (InetAddress endpoint : tokenMetadata_.getLeavingEndpoints())
+            {
+                Gossiper.instance.advertiseTokenRemoved(endpoint, tokenMetadata_.getToken(endpoint));
+                tokenMetadata_.removeEndpoint(endpoint);
+            }
             replicatingNodes.clear();
         }
         else
@@ -2059,9 +2086,9 @@ public class StorageService implements I
 
         tokenMetadata_.addLeavingEndpoint(endpoint);
         calculatePendingRanges();
-        // bundle two states together. include this nodes state to keep the status quo, 
-        // but indicate the leaving token so that it can be dealt with.
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removingNonlocal(localToken, token));
+        // the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us
+        // we add our own token so other nodes can let us know when they're done
+        Gossiper.instance.advertiseRemoving(endpoint, token, localToken);
 
         // kick off streaming commands
         restoreReplicaCount(endpoint, myAddress);
@@ -2081,8 +2108,8 @@ public class StorageService implements I
 
         excise(token, endpoint);
 
-        // indicate the token has left
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removedNonlocal(localToken, token));
+        // gossiper will indicate the token has left
+        Gossiper.instance.advertiseTokenRemoved(endpoint, token);
 
         replicatingNodes.clear();
         removingNode = null;
@@ -2090,8 +2117,18 @@ public class StorageService implements I
 
     public void confirmReplication(InetAddress node)
     {
-        assert !replicatingNodes.isEmpty();
-        replicatingNodes.remove(node);
+        // replicatingNodes can be empty in the case where this node used to be a removal coordinator,
+        // but restarted before all 'replication finished' messages arrived. In that case, we'll
+        // still go ahead and acknowledge it.
+        if (!replicatingNodes.isEmpty())
+        {
+            replicatingNodes.remove(node); // stop tracking this node; nothing is logged if it was not in the set
+        }
+        else // nothing is being tracked, so the confirmation is only logged
+        {
+            logger_.info("Received unexpected REPLICATION_FINISHED message from " + node
+                         + ". Was this node recently a removal coordinator?");
+        }
     }
 
     public boolean isClientMode()

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java Fri Jul 29 05:09:59 2011
@@ -30,6 +30,6 @@ public class InitClientTest // extends C
     @Test
     public void testInitClientStartup() throws IOException, ConfigurationException
     {
-        StorageService.instance.initClient();
+        StorageService.instance.initClient(0); // pass a delay of 0 instead of the default RING_DELAY
     }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java Fri Jul 29 05:09:59 2011
@@ -36,7 +36,7 @@ public class StorageServiceClientTest
     {
         CleanupHelper.mkdirs();
         CleanupHelper.cleanup();
-        StorageService.instance.initClient();
+        StorageService.instance.initClient(0);
 
         // verify that no storage directories were created.
         for (String path : DatabaseDescriptor.getAllDataFileLocations())