You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/29 07:10:01 UTC
svn commit: r1152107 - in /cassandra/trunk: ./ contrib/
contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ doc/cql/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/gms/
src/java/...
Author: jbellis
Date: Fri Jul 29 05:09:59 2011
New Revision: 1152107
URL: http://svn.apache.org/viewvc?rev=1152107&view=rev
Log:
merge from 0.8
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/NEWS.txt
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
cassandra/trunk/doc/cql/CQL.textile
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java
cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java
cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7:1026516-1151306
/cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1151495,1151497
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Jul 29 05:09:59 2011
@@ -26,6 +26,19 @@
(CASSANDRA-1951)
+0.8.3
+ * add ability to drop local reads/writes that are going to timeout
+ (CASSANDRA-2943)
+ * revamp token removal process, keep gossip states for 3 days (CASSANDRA-2946)
+ * don't accept extra args for 0-arg nodetool commands (CASSANDRA-2740)
+ * log UnavailableException details at debug level (CASSANDRA-2856)
+ * expose data_dir through JMX (CASSANDRA-2770)
+ * don't include tmp files as sstables when creating cfs (CASSANDRA-2929)
+ * log Java classpath on startup (CASSANDRA-2895)
+ * keep gossipped version in sync with actual on migration coordinator
+ (CASSANDRA-2946)
+
+
0.8.2
* CQL:
- include only one row per unique key for IN queries (CASSANDRA-2717)
Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Fri Jul 29 05:09:59 2011
@@ -7,6 +7,16 @@ Upgrading
sstableloader tool instead.
+0.8.3
+=====
+
+Upgrading
+---------
+ - Token removal has been revamped. Removing tokens in a mixed cluster with
+ 0.8.3 will not work, so the entire cluster will need to be running 0.8.3
+ first, except for the dead node.
+
+
0.8.2
=====
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/contrib:1026516-1151306
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1151495,1151497
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
Modified: cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Fri Jul 29 05:09:59 2011
@@ -69,8 +69,6 @@ public class CassandraStorage extends Lo
public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
- private static String UDFCONTEXT_SCHEMA_KEY_PREFIX = "cassandra.schema";
-
private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
private static final Log logger = LogFactory.getLog(CassandraStorage.class);
@@ -79,6 +77,8 @@ public class CassandraStorage extends Lo
private boolean slice_reverse = false;
private String keyspace;
private String column_family;
+ private String loadSignature;
+ private String storeSignature;
private Configuration conf;
private RecordReader reader;
@@ -113,7 +113,7 @@ public class CassandraStorage extends Lo
if (!reader.nextKeyValue())
return null;
- CfDef cfDef = getCfDef();
+ CfDef cfDef = getCfDef(loadSignature);
ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
assert key != null && cf != null;
@@ -166,11 +166,11 @@ public class CassandraStorage extends Lo
return pair;
}
- private CfDef getCfDef()
+ private CfDef getCfDef(String signature)
{
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(CassandraStorage.class);
- return cfdefFromString(property.getProperty(getSchemaContextKey()));
+ return cfdefFromString(property.getProperty(signature));
}
private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
@@ -290,7 +290,7 @@ public class CassandraStorage extends Lo
}
ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
setConnectionInformation();
- initSchema();
+ initSchema(loadSignature);
}
@Override
@@ -299,9 +299,16 @@ public class CassandraStorage extends Lo
return location;
}
+ @Override
+ public void setUDFContextSignature(String signature)
+ {
+ this.loadSignature = signature;
+ }
+
/* StoreFunc methods */
public void setStoreFuncUDFContextSignature(String signature)
{
+ this.storeSignature = signature;
}
public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
@@ -315,7 +322,7 @@ public class CassandraStorage extends Lo
setLocationFromUri(location);
ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
setConnectionInformation();
- initSchema();
+ initSchema(storeSignature);
}
public OutputFormat getOutputFormat()
@@ -347,7 +354,7 @@ public class CassandraStorage extends Lo
ByteBuffer key = objToBB(t.get(0));
DefaultDataBag pairs = (DefaultDataBag) t.get(1);
ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
- CfDef cfDef = getCfDef();
+ CfDef cfDef = getCfDef(storeSignature);
List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
try
@@ -412,7 +419,7 @@ public class CassandraStorage extends Lo
}
catch (ClassCastException e)
{
- throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily");
+ throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily", e);
}
try
{
@@ -430,14 +437,13 @@ public class CassandraStorage extends Lo
/* Methods to get the column family schema from Cassandra */
- private void initSchema()
+ private void initSchema(String signature)
{
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(CassandraStorage.class);
- String schemaContextKey = getSchemaContextKey();
// Only get the schema if we haven't already gotten it
- if (!property.containsKey(schemaContextKey))
+ if (!property.containsKey(signature))
{
Cassandra.Client client = null;
try
@@ -455,7 +461,7 @@ public class CassandraStorage extends Lo
break;
}
}
- property.setProperty(schemaContextKey, cfdefToString(cfDef));
+ property.setProperty(signature, cfdefToString(cfDef));
}
catch (TException e)
{
@@ -521,14 +527,4 @@ public class CassandraStorage extends Lo
}
return cfDef;
}
-
- private String getSchemaContextKey()
- {
- StringBuilder sb = new StringBuilder(UDFCONTEXT_SCHEMA_KEY_PREFIX);
- sb.append('.');
- sb.append(keyspace);
- sb.append('.');
- sb.append(column_family);
- return sb.toString();
- }
}
Modified: cassandra/trunk/doc/cql/CQL.textile
URL: http://svn.apache.org/viewvc/cassandra/trunk/doc/cql/CQL.textile?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/doc/cql/CQL.textile (original)
+++ cassandra/trunk/doc/cql/CQL.textile Fri Jul 29 05:09:59 2011
@@ -1,4 +1,4 @@
-h1. Cassandra Query Language (CQL) v1.0.0
+h1. Cassandra Query Language (CQL) v1.1.0
h2. Table of Contents
@@ -364,5 +364,8 @@ Versioning of the CQL language adheres t
h1. Changes
pre.
+Sat, 01 Jun 2011 15:58:00 -0600 - Pavel Yaskevich
+ * Updated to support ALTER (CASSANDRA-1709)
+
Tue, 22 Mar 2011 18:10:28 -0700 - Eric Evans <ee...@rackspace.com>
* Initial version, 1.0.0
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1151306
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1151495,1151497
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1151306
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1151495,1151497
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1151306
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1151495,1151497
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1151306
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1151495,1151497
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jul 29 05:09:59 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1149015,1149716
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1151306
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1149725,1150103,1151495,1151497
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1151495,1151497
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Jul 29 05:09:59 2011
@@ -204,6 +204,9 @@ public class HintedHandOffManager implem
}
waited = 0;
// then wait for the correct schema version.
+ // usually we use DD.getDefsVersion, which checks the local schema uuid as stored in the system table.
+ // here we check the one in gossip instead; this serves as a canary to warn us if we introduce a bug that
+ // causes the two to diverge (see CASSANDRA-2946)
while (!gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value))
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java Fri Jul 29 05:09:59 2011
@@ -29,6 +29,7 @@ public enum ApplicationState
DC,
RACK,
RELEASE_VERSION,
+ REMOVAL_COORDINATOR,
INTERNAL_IP,
// pad to allow adding new states to existing cluster
X1,
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Fri Jul 29 05:09:59 2011
@@ -27,6 +27,7 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.config.ConfigurationException;
@@ -58,6 +59,8 @@ public class Gossiper implements IFailur
private static final RetryingScheduledThreadPoolExecutor executor = new RetryingScheduledThreadPoolExecutor("GossipTasks");
static final ApplicationState[] STATES = ApplicationState.values();
+ static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, VersionedValue.STATUS_LEFT);
+
private ScheduledFuture<?> scheduledGossipTask;
public final static int intervalInMillis = 1000;
public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2;
@@ -264,17 +267,21 @@ public class Gossiper implements IFailur
}
/**
- * Removes the endpoint from unreachable endpoint set
+ * Removes the endpoint from gossip completely
*
* @param endpoint endpoint to be removed from the current membership.
*/
private void evictFromMembership(InetAddress endpoint)
{
unreachableEndpoints.remove(endpoint);
+ endpointStateMap.remove(endpoint);
+ justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+ if (logger.isDebugEnabled())
+ logger.debug("evicting " + endpoint + " from gossip");
}
/**
- * Removes the endpoint completely from Gossip
+ * Removes the endpoint from Gossip but retains endpoint state
*/
public void removeEndpoint(InetAddress endpoint)
{
@@ -288,6 +295,8 @@ public class Gossiper implements IFailur
FailureDetector.instance.remove(endpoint);
versions.remove(endpoint);
justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+ if (logger.isDebugEnabled())
+ logger.debug("removing endpoint " + endpoint);
}
/**
@@ -328,6 +337,67 @@ public class Gossiper implements IFailur
}
}
+ /**
+ * This method will begin removing an existing endpoint from the cluster by spoofing its state
+ * This should never be called unless this coordinator has had 'removetoken' invoked
+ *
+ * @param endpoint - the endpoint being removed
+ * @param token - the token being removed
+ * @param mytoken - my own token for replication coordination
+ */
+ public void advertiseRemoving(InetAddress endpoint, Token token, Token mytoken)
+ {
+ EndpointState epState = endpointStateMap.get(endpoint);
+ // remember this node's generation
+ int generation = epState.getHeartBeatState().getGeneration();
+ logger.info("Removing token: " + token);
+ logger.info("Sleeping for " + StorageService.RING_DELAY + "ms to ensure " + endpoint + " does not change");
+ try
+ {
+ Thread.sleep(StorageService.RING_DELAY);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ // make sure it did not change
+ epState = endpointStateMap.get(endpoint);
+ if (epState.getHeartBeatState().getGeneration() != generation)
+ throw new RuntimeException("Endpoint " + endpoint + " generation changed while trying to remove it");
+ // update the other node's generation to mimic it as if it had changed it itself
+ logger.info("Advertising removal for " + endpoint);
+ epState.updateTimestamp(); // make sure we don't evict it too soon
+ epState.getHeartBeatState().forceNewerGenerationUnsafe();
+ epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(token));
+ epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(mytoken));
+ endpointStateMap.put(endpoint, epState);
+ }
+
+ /**
+ * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN
+ * This should only be called after advertiseRemoving
+ * @param endpoint
+ * @param token
+ */
+ public void advertiseTokenRemoved(InetAddress endpoint, Token token)
+ {
+ EndpointState epState = endpointStateMap.get(endpoint);
+ epState.updateTimestamp(); // make sure we don't evict it too soon
+ epState.getHeartBeatState().forceNewerGenerationUnsafe();
+ epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(token));
+ logger.info("Completing removal of " + endpoint);
+ endpointStateMap.put(endpoint, epState);
+ // ensure at least one gossip round occurs before returning
+ try
+ {
+ Thread.sleep(intervalInMillis * 2);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
public boolean isKnownEndpoint(InetAddress endpoint)
{
return endpointStateMap.containsKey(endpoint);
@@ -456,23 +526,18 @@ public class Gossiper implements IFailur
{
long duration = now - epState.getUpdateTimestamp();
+ if (StorageService.instance.getTokenMetadata().isMember(endpoint))
+ epState.setHasToken(true);
// check if this is a fat client. fat clients are removed automatically from
// gossip after FatClientTimeout
- if (!epState.hasToken() && !epState.isAlive() && (duration > FatClientTimeout))
+ if (!epState.hasToken() && !epState.isAlive() && !justRemovedEndpoints.containsKey(endpoint) && (duration > FatClientTimeout))
{
- if (StorageService.instance.getTokenMetadata().isMember(endpoint))
- epState.setHasToken(true);
- else
- {
- if (!justRemovedEndpoints.containsKey(endpoint)) // if the node was decommissioned, it will have been removed but still appear as a fat client
- {
- logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip");
- removeEndpoint(endpoint); // after quarantine justRemoveEndpoints will remove the state
- }
- }
+ logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip");
+ removeEndpoint(endpoint); // will put it in justRemovedEndpoints to respect quarantine delay
+ evictFromMembership(endpoint); // can get rid of the state immediately
}
- if ( !epState.isAlive() && (duration > aVeryLongTime) )
+ if ( !epState.isAlive() && (duration > aVeryLongTime) && (!StorageService.instance.getTokenMetadata().isMember(endpoint)))
{
evictFromMembership(endpoint);
}
@@ -488,7 +553,6 @@ public class Gossiper implements IFailur
if (logger.isDebugEnabled())
logger.debug(QUARANTINE_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
justRemovedEndpoints.remove(entry.getKey());
- endpointStateMap.remove(entry.getKey());
}
}
}
@@ -585,6 +649,7 @@ public class Gossiper implements IFailur
int remoteGeneration = remoteEndpointState.getHeartBeatState().getGeneration();
if ( remoteGeneration > localGeneration )
{
+ localEndpointState.updateTimestamp();
fd.report(endpoint);
return;
}
@@ -595,6 +660,7 @@ public class Gossiper implements IFailur
int remoteVersion = remoteEndpointState.getHeartBeatState().getHeartBeatVersion();
if ( remoteVersion > localVersion )
{
+ localEndpointState.updateTimestamp();
fd.report(endpoint);
}
}
@@ -607,6 +673,7 @@ public class Gossiper implements IFailur
if (logger.isTraceEnabled())
logger.trace("marking as alive {}", addr);
localState.markAlive();
+ localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
liveEndpoints.add(addr);
unreachableEndpoints.remove(addr);
logger.info("InetAddress {} is now UP", addr);
@@ -638,10 +705,13 @@ public class Gossiper implements IFailur
*/
private void handleMajorStateChange(InetAddress ep, EndpointState epState)
{
- if (endpointStateMap.get(ep) != null)
- logger.info("Node {} has restarted, now UP again", ep);
- else
- logger.info("Node {} is now part of the cluster", ep);
+ if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
+ {
+ if (endpointStateMap.get(ep) != null)
+ logger.info("Node {} has restarted, now UP again", ep);
+ else
+ logger.info("Node {} is now part of the cluster", ep);
+ }
if (logger.isTraceEnabled())
logger.trace("Adding endpoint state for " + ep);
endpointStateMap.put(ep, epState);
@@ -651,11 +721,31 @@ public class Gossiper implements IFailur
for (IEndpointStateChangeSubscriber subscriber : subscribers)
subscriber.onDead(ep, epState);
}
- markAlive(ep, epState);
+ if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
+ markAlive(ep, epState);
+ else
+ {
+ logger.debug("Not marking " + ep + " alive due to dead state");
+ epState.markDead();
+ epState.setHasToken(true); // fat clients won't have a dead state
+ }
for (IEndpointStateChangeSubscriber subscriber : subscribers)
subscriber.onJoin(ep, epState);
}
+ private Boolean isDeadState(String value)
+ {
+ String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
+ assert (pieces.length > 0);
+ String state = pieces[0];
+ for (String deadstate : DEAD_STATES)
+ {
+ if (state.equals(deadstate))
+ return true;
+ }
+ return false;
+ }
+
void applyStateLocally(Map<InetAddress, EndpointState> epStateMap)
{
for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/HeartBeatState.java Fri Jul 29 05:09:59 2011
@@ -71,6 +71,11 @@ class HeartBeatState
{
return version;
}
+
+ void forceNewerGenerationUnsafe()
+ {
+ generation += 1;
+ }
}
class HeartBeatStateSerializer implements ICompactSerializer<HeartBeatState>
Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java Fri Jul 29 05:09:59 2011
@@ -49,7 +49,7 @@ public class VersionedValue implements C
public final static char DELIMITER = ',';
public final static String DELIMITER_STR = new String(new char[] { DELIMITER });
- // values for State.STATUS
+ // values for ApplicationState.STATUS
public final static String STATUS_BOOTSTRAPPING = "BOOT";
public final static String STATUS_NORMAL = "NORMAL";
public final static String STATUS_LEAVING = "LEAVING";
@@ -59,6 +59,9 @@ public class VersionedValue implements C
public final static String REMOVING_TOKEN = "removing";
public final static String REMOVED_TOKEN = "removed";
+ // values for ApplicationState.REMOVAL_COORDINATOR
+ public final static String REMOVAL_COORDINATOR = "REMOVER";
+
public final int version;
public final String value;
@@ -129,20 +132,19 @@ public class VersionedValue implements C
return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
}
- public VersionedValue removingNonlocal(Token localToken, Token token)
+ public VersionedValue removingNonlocal(Token token)
+ {
+ return new VersionedValue(VersionedValue.REMOVING_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+ }
+
+ public VersionedValue removedNonlocal(Token token)
{
- return new VersionedValue(VersionedValue.STATUS_NORMAL
- + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(localToken)
- + VersionedValue.DELIMITER + VersionedValue.REMOVING_TOKEN
- + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+ return new VersionedValue(VersionedValue.REMOVED_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
}
- public VersionedValue removedNonlocal(Token localToken, Token token)
+ public VersionedValue removalCoordinator(Token token)
{
- return new VersionedValue(VersionedValue.STATUS_NORMAL
- + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(localToken)
- + VersionedValue.DELIMITER + VersionedValue.REMOVED_TOKEN
- + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+ return new VersionedValue(VersionedValue.REMOVAL_COORDINATOR + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
}
public VersionedValue datacenter(String dcId)
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jul 29 05:09:59 2011
@@ -472,6 +472,10 @@ public final class MessagingService impl
public void receive(Message message, String id)
{
+ if (logger_.isTraceEnabled())
+ logger_.trace(FBUtilities.getLocalAddress() + " received " + message.getVerb()
+ + " from " + id + "@" + message.getFrom());
+
message = SinkManager.processServerMessage(message, id);
if (message == null)
return;
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Fri Jul 29 05:09:59 2011
@@ -119,6 +119,7 @@ public abstract class AbstractCassandraD
{
logger.info("JVM vendor/version: {}/{}", System.getProperty("java.vm.name"), System.getProperty("java.version") );
logger.info("Heap size: {}/{}", Runtime.getRuntime().totalMemory(), Runtime.getRuntime().maxMemory());
+ logger.info("Classpath: {}", System.getProperty("java.class.path"));
CLibrary.tryMlockall();
listenPort = DatabaseDescriptor.getRpcPort();
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Jul 29 05:09:59 2011
@@ -349,7 +349,7 @@ public class StorageProxy implements Sto
{
if (logger.isDebugEnabled())
logger.debug("insert writing local " + rm.toString(true));
- Runnable runnable = new WrappedRunnable()
+ Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION)
{
public void runMayThrow() throws IOException
{
@@ -431,7 +431,7 @@ public class StorageProxy implements Sto
if (logger.isDebugEnabled())
logger.debug("insert writing local & replicate " + mutation.toString(true));
- Runnable runnable = new WrappedRunnable()
+ Runnable runnable = new DroppableRunnable(StorageService.Verb.MUTATION)
{
public void runMayThrow() throws IOException
{
@@ -447,7 +447,7 @@ public class StorageProxy implements Sto
{
// We do the replication on another stage because it involves a read (see CM.makeReplicationMutation)
// and we want to avoid blocking too much the MUTATION stage
- StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new WrappedRunnable()
+ StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new DroppableRunnable(StorageService.Verb.READ)
{
public void runMayThrow() throws IOException
{
@@ -616,7 +616,7 @@ public class StorageProxy implements Sto
return rows;
}
- static class LocalReadRunnable extends WrappedRunnable
+ static class LocalReadRunnable extends DroppableRunnable
{
private final ReadCommand command;
private final ReadCallback<Row> handler;
@@ -624,6 +624,7 @@ public class StorageProxy implements Sto
LocalReadRunnable(ReadCommand command, ReadCallback<Row> handler)
{
+ super(StorageService.Verb.READ);
this.command = command;
this.handler = handler;
}
@@ -1078,4 +1079,35 @@ public class StorageProxy implements Sto
{
public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException;
}
+
+ private static abstract class DroppableRunnable implements Runnable
+ {
+ private final long constructionTime = System.currentTimeMillis();
+ private final StorageService.Verb verb;
+
+ public DroppableRunnable(StorageService.Verb verb)
+ {
+ this.verb = verb;
+ }
+
+ public final void run()
+ {
+ if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
+ {
+ MessagingService.instance().incrementDroppedMessages(verb);
+ return;
+ }
+
+ try
+ {
+ runMayThrow();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ abstract protected void runMayThrow() throws Exception;
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Jul 29 05:09:59 2011
@@ -335,6 +335,11 @@ public class StorageService implements I
public synchronized void initClient() throws IOException, ConfigurationException
{
+ initClient(RING_DELAY);
+ }
+
+ public synchronized void initClient(int delay) throws IOException, ConfigurationException
+ {
if (initialized)
{
if (!isClientMode)
@@ -352,7 +357,7 @@ public class StorageService implements I
// sleep a while to allow gossip to warm up (the other nodes need to know about this one before they can reply).
try
{
- Thread.sleep(RING_DELAY);
+ Thread.sleep(delay);
}
catch (Exception ex)
{
@@ -622,29 +627,35 @@ public class StorageService implements I
}
/*
- * onChange only ever sees one ApplicationState piece change at a time, so we perform a kind of state machine here.
- * We are concerned with two events: knowing the token associated with an endpoint, and knowing its operation mode.
- * Nodes can start in either bootstrap or normal mode, and from bootstrap mode can change mode to normal.
- * A node in bootstrap mode needs to have pendingranges set in TokenMetadata; a node in normal mode
- * should instead be part of the token ring.
+ * Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the
+ * ApplicationState has not necessarily "changed" since the last known value, if we already received the same update
+ * from somewhere else.
+ *
+ * onChange only ever sees one ApplicationState piece change at a time (even if many ApplicationState updates were
+ * received at the same time), so we perform a kind of state machine here. We are concerned with two events: knowing
+ * the token associated with an endpoint, and knowing its operation mode. Nodes can start in either bootstrap or
+ * normal mode, and from bootstrap mode can change mode to normal. A node in bootstrap mode needs to have
+ * pendingranges set in TokenMetadata; a node in normal mode should instead be part of the token ring.
*
- * Normal MOVE_STATE progression of a node should be like this:
- * STATE_BOOTSTRAPPING,token
+ * Normal progression of ApplicationState.STATUS values for a node should be like this:
+ * STATUS_BOOTSTRAPPING,token
* if bootstrapping. stays this way until all files are received.
- * STATE_NORMAL,token
+ * STATUS_NORMAL,token
* ready to serve reads and writes.
- * STATE_NORMAL,token,REMOVE_TOKEN,token
- * specialized normal state in which this node acts as a proxy to tell the cluster about a dead node whose
- * token is being removed. this value becomes the permanent state of this node (unless it coordinates another
- * removetoken in the future).
- * STATE_LEAVING,token
- * get ready to leave the cluster as part of a decommission or move
- * STATE_LEFT,token
- * set after decommission or move is completed.
- * STATE_MOVE,token
- * set if node if currently moving to a new token in the ring
- *
- * Note: Any time a node state changes from STATE_NORMAL, it will not be visible to new nodes. So it follows that
+ * STATUS_LEAVING,token
+ * get ready to leave the cluster as part of a decommission
+ * STATUS_LEFT,token
+ * set after decommission is completed.
+ *
+ * Other STATUS values that may be seen (possibly anywhere in the normal progression):
+ * STATUS_MOVING,newtoken
+ * set if node is currently moving to a new token in the ring
+ * REMOVING_TOKEN,deadtoken
+ * set if the node is dead and is being removed by its REMOVAL_COORDINATOR
+ * REMOVED_TOKEN,deadtoken
+ * set if the node is dead and has been removed by its REMOVAL_COORDINATOR
+ *
+ * Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
* you should never bootstrap a new node during a removetoken, decommission or move.
*/
public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
@@ -665,6 +676,8 @@ public class StorageService implements I
handleStateBootstrap(endpoint, pieces);
else if (moveName.equals(VersionedValue.STATUS_NORMAL))
handleStateNormal(endpoint, pieces);
+ else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN))
+ handleStateRemoving(endpoint, pieces);
else if (moveName.equals(VersionedValue.STATUS_LEAVING))
handleStateLeaving(endpoint, pieces);
else if (moveName.equals(VersionedValue.STATUS_LEFT))
@@ -731,7 +744,7 @@ public class StorageService implements I
* in reads.
*
* @param endpoint node
- * @param pieces STATE_NORMAL,token[,other_state,token]
+ * @param pieces STATUS_NORMAL,token
*/
private void handleStateNormal(InetAddress endpoint, String[] pieces)
{
@@ -773,12 +786,6 @@ public class StorageService implements I
endpoint, currentOwner, token, endpoint));
}
- if (pieces.length > 2)
- {
- assert pieces.length == 4;
- handleStateRemoving(endpoint, getPartitioner().getTokenFactory().fromString(pieces[3]), pieces[2]);
- }
-
if (tokenMetadata_.isMoving(endpoint)) // if endpoint was moving to a new token
tokenMetadata_.removeFromMoving(endpoint);
@@ -860,37 +867,50 @@ public class StorageService implements I
* Handle notification that a node being actively removed from the ring via 'removetoken'
*
* @param endpoint node
- * @param state either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
+ * @param pieces pieces[0] is either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
*/
- private void handleStateRemoving(InetAddress endpoint, Token removeToken, String state)
+ private void handleStateRemoving(InetAddress endpoint, String[] pieces)
{
- InetAddress removeEndpoint = tokenMetadata_.getEndpoint(removeToken);
-
- if (removeEndpoint == null)
- return;
-
- if (removeEndpoint.equals(FBUtilities.getBroadcastAddress()))
- {
- logger_.info("Received removeToken gossip about myself. Is this node a replacement for a removed one?");
- return;
- }
+ String state = pieces[0];
+ assert (pieces.length > 0);
- if (VersionedValue.REMOVED_TOKEN.equals(state))
+ if (endpoint.equals(FBUtilities.getBroadcastAddress()))
{
- excise(removeToken, removeEndpoint);
+ logger_.info("Received removeToken gossip about myself. Is this node rejoining after an explicit removetoken?");
+ try
+ {
+ drain();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ return;
}
- else if (VersionedValue.REMOVING_TOKEN.equals(state))
+ if (tokenMetadata_.isMember(endpoint))
{
- if (logger_.isDebugEnabled())
- logger_.debug("Token " + removeToken + " removed manually (endpoint was " + removeEndpoint + ")");
+ Token removeToken = tokenMetadata_.getToken(endpoint);
- // Note that the endpoint is being removed
- tokenMetadata_.addLeavingEndpoint(removeEndpoint);
- calculatePendingRanges();
+ if (VersionedValue.REMOVED_TOKEN.equals(state))
+ {
+ excise(removeToken, endpoint);
+ }
+ else if (VersionedValue.REMOVING_TOKEN.equals(state))
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Token " + removeToken + " removed manually (endpoint was " + endpoint + ")");
- // grab any data we are now responsible for and notify responsible node
- restoreReplicaCount(removeEndpoint, endpoint);
- }
+ // Note that the endpoint is being removed
+ tokenMetadata_.addLeavingEndpoint(endpoint);
+ calculatePendingRanges();
+
+ // find the endpoint coordinating this removal that we need to notify when we're done
+ String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
+ Token coordtoken = getPartitioner().getTokenFactory().fromString(coordinator[1]);
+ // grab any data we are now responsible for and notify responsible node
+ restoreReplicaCount(endpoint, tokenMetadata_.getEndpoint(coordtoken));
+ }
+ } // not a member, nothing to do
}
private void excise(Token token, InetAddress endpoint)
@@ -1059,6 +1079,8 @@ public class StorageService implements I
// notify the remote token
Message msg = new Message(local, StorageService.Verb.REPLICATION_FINISHED, new byte[0], Gossiper.instance.getVersion(remote));
IFailureDetector failureDetector = FailureDetector.instance;
+ if (logger_.isDebugEnabled())
+ logger_.debug("Notifying " + remote.toString() + " of replication completion\n");
while (failureDetector.isAlive(remote))
{
IAsyncResult iar = MessagingService.instance().sendRR(msg, remote);
@@ -1993,9 +2015,14 @@ public class StorageService implements I
*/
public void forceRemoveCompletion()
{
- if (!replicatingNodes.isEmpty())
+ if (!replicatingNodes.isEmpty() || !tokenMetadata_.getLeavingEndpoints().isEmpty())
{
logger_.warn("Removal not confirmed for for " + StringUtils.join(this.replicatingNodes, ","));
+ for (InetAddress endpoint : tokenMetadata_.getLeavingEndpoints())
+ {
+ Gossiper.instance.advertiseTokenRemoved(endpoint, tokenMetadata_.getToken(endpoint));
+ tokenMetadata_.removeEndpoint(endpoint);
+ }
replicatingNodes.clear();
}
else
@@ -2059,9 +2086,9 @@ public class StorageService implements I
tokenMetadata_.addLeavingEndpoint(endpoint);
calculatePendingRanges();
- // bundle two states together. include this nodes state to keep the status quo,
- // but indicate the leaving token so that it can be dealt with.
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removingNonlocal(localToken, token));
+ // the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us
+ // we add our own token so other nodes can let us know when they're done
+ Gossiper.instance.advertiseRemoving(endpoint, token, localToken);
// kick off streaming commands
restoreReplicaCount(endpoint, myAddress);
@@ -2081,8 +2108,8 @@ public class StorageService implements I
excise(token, endpoint);
- // indicate the token has left
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removedNonlocal(localToken, token));
+ // gossiper will indicate the token has left
+ Gossiper.instance.advertiseTokenRemoved(endpoint, token);
replicatingNodes.clear();
removingNode = null;
@@ -2090,8 +2117,18 @@ public class StorageService implements I
public void confirmReplication(InetAddress node)
{
- assert !replicatingNodes.isEmpty();
- replicatingNodes.remove(node);
+ // replicatingNodes can be empty in the case where this node used to be a removal coordinator,
+ // but restarted before all 'replication finished' messages arrived. In that case, we'll
+ // still go ahead and acknowledge it.
+ if (!replicatingNodes.isEmpty())
+ {
+ replicatingNodes.remove(node);
+ }
+ else
+ {
+ logger_.info("Received unexpected REPLICATION_FINISHED message from " + node
+ + ". Was this node recently a removal coordinator?");
+ }
}
public boolean isClientMode()
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/InitClientTest.java Fri Jul 29 05:09:59 2011
@@ -30,6 +30,6 @@ public class InitClientTest // extends C
@Test
public void testInitClientStartup() throws IOException, ConfigurationException
{
- StorageService.instance.initClient();
+ StorageService.instance.initClient(0);
}
}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java?rev=1152107&r1=1152106&r2=1152107&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/StorageServiceClientTest.java Fri Jul 29 05:09:59 2011
@@ -36,7 +36,7 @@ public class StorageServiceClientTest
{
CleanupHelper.mkdirs();
CleanupHelper.cleanup();
- StorageService.instance.initClient();
+ StorageService.instance.initClient(0);
// verify that no storage directories were created.
for (String path : DatabaseDescriptor.getAllDataFileLocations())