You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/21 19:50:10 UTC
svn commit: r892926 - in /incubator/cassandra/trunk: ./
interface/gen-java/org/apache/cassandra/service/ src/java/org/
src/java/org/apache/cassandra/locator/
src/java/org/apache/cassandra/service/ test/unit/org/
test/unit/org/apache/cassandra/dht/ test...
Author: jbellis
Date: Mon Dec 21 18:50:09 2009
New Revision: 892926
URL: http://svn.apache.org/viewvc?rev=892926&view=rev
Log:
Merge changes from the cassandra-0.5 branch (r892353–r892891) into trunk.
Added:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
- copied unchanged from r892891, incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/service/MoveTest.java
Modified:
incubator/cassandra/trunk/ (props changed)
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java (props changed)
incubator/cassandra/trunk/src/java/org/ (props changed)
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java
incubator/cassandra/trunk/test/unit/org/ (props changed)
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Propchange: incubator/cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,3 +1,3 @@
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5:888872-892352
+/incubator/cassandra/branches/cassandra-0.5:888872-892891
Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Mon Dec 21 18:50:09 2009
@@ -11,8 +11,12 @@
* Fix anti-entropy assertion error (CASSANDRA-639)
* Fix pending range conflicts when bootstapping or moving
multiple nodes at once (CASSANDRA-603)
+ * Handle obsolete gossip related to node movement in the case where
+ one or more nodes is down when the movement occurs (CASSANDRA-572)
* Include dead nodes in gossip to avoid a variety of problems
(CASSANDRA-634)
+ * return an InvalidRequestException for mal-formed SlicePredicates
+ (CASSANDRA-643)
0.5.0 beta 2
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-892891
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java:749219-768588
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,5 +1,5 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-892891
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java:749219-794428
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/column_t.java:749219-768588
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-892891
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:749219-768588
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-892891
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:749219-768588
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,5 +1,5 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-892891
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:749219-794428
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:749219-768588
Propchange: incubator/cassandra/trunk/src/java/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/src/java/org:774578-796573
/incubator/cassandra/branches/cassandra-0.4/src/java/org:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/src/java/org:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/src/java/org:888872-892891
/incubator/cassandra/trunk/src/java/org:749219-769885
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java Mon Dec 21 18:50:09 2009
@@ -133,6 +133,10 @@
private ArrayList<InetAddress> getNaturalEndpointsInternal(Token searchToken, TokenMetadata metadata) throws IOException
{
ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
+
+ if (metadata.sortedTokens().size() == 0)
+ return endpoints;
+
if (null == tokens || tokens.size() != metadata.sortedTokens().size())
{
loadEndPoints(metadata);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Mon Dec 21 18:50:09 2009
@@ -50,6 +50,10 @@
boolean bOtherRack = false;
int foundCount = 0;
List tokens = metadata.sortedTokens();
+
+ if (tokens.isEmpty())
+ return endpoints;
+
int index = Collections.binarySearch(tokens, token);
if(index < 0)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Mon Dec 21 18:50:09 2009
@@ -45,6 +45,10 @@
int startIndex;
List<Token> tokenList = new ArrayList<Token>();
List tokens = new ArrayList<Token>(metadata.sortedTokens());
+ ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(tokenList.size());
+
+ if (tokens.isEmpty())
+ return endpoints;
int index = Collections.binarySearch(tokens, token);
if (index < 0)
@@ -64,7 +68,6 @@
assert !tokenList.contains(tokens.get(i));
tokenList.add((Token) tokens.get(i));
}
- ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(tokenList.size());
for (Token t : tokenList)
endpoints.add(metadata.getEndPoint(t));
return endpoints;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Mon Dec 21 18:50:09 2009
@@ -47,7 +47,7 @@
// for any nodes that boot simultaneously between same two nodes. For this we cannot simply make pending ranges a multimap,
// since that would make us unable to notice the real problem of two nodes trying to boot using the same token.
// In order to do this properly, we need to know what tokens are booting at any time.
- private Map<Token, InetAddress> bootstrapTokens;
+ private BiMap<Token, InetAddress> bootstrapTokens;
// we will need to know at all times what nodes are leaving and calculate ranges accordingly.
// An anonymous pending ranges list is not enough, as that does not tell which node is leaving
@@ -71,7 +71,7 @@
if (tokenToEndPointMap == null)
tokenToEndPointMap = HashBiMap.create();
this.tokenToEndPointMap = tokenToEndPointMap;
- bootstrapTokens = new HashMap<Token, InetAddress>();
+ bootstrapTokens = HashBiMap.create();
leavingEndPoints = new HashSet<InetAddress>();
pendingRanges = HashMultimap.create();
sortedTokens = sortTokens();
@@ -103,13 +103,13 @@
lock.writeLock().lock();
try
{
- bootstrapTokens.remove(token);
-
+ bootstrapTokens.inverse().remove(endpoint);
tokenToEndPointMap.inverse().remove(endpoint);
if (!endpoint.equals(tokenToEndPointMap.put(token, endpoint)))
{
sortedTokens = sortTokens();
}
+ leavingEndPoints.remove(endpoint);
}
finally
{
@@ -125,9 +125,17 @@
lock.writeLock().lock();
try
{
- InetAddress oldEndPoint = bootstrapTokens.get(token);
+ InetAddress oldEndPoint = null;
+
+ oldEndPoint = bootstrapTokens.get(token);
+ if (oldEndPoint != null && !oldEndPoint.equals(endpoint))
+ throw new RuntimeException("Bootstrap Token collision between " + oldEndPoint + " and " + endpoint + " (token " + token);
+
+ oldEndPoint = tokenToEndPointMap.get(token);
if (oldEndPoint != null && !oldEndPoint.equals(endpoint))
throw new RuntimeException("Bootstrap Token collision between " + oldEndPoint + " and " + endpoint + " (token " + token);
+
+ bootstrapTokens.inverse().remove(endpoint);
bootstrapTokens.put(token, endpoint);
}
finally
@@ -136,6 +144,21 @@
}
}
+ public void removeBootstrapToken(Token token)
+ {
+ assert token != null;
+
+ lock.writeLock().lock();
+ try
+ {
+ bootstrapTokens.remove(token);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
public void addLeavingEndPoint(InetAddress endpoint)
{
assert endpoint != null;
@@ -151,13 +174,28 @@
}
}
+ public void removeLeavingEndPoint(InetAddress endpoint)
+ {
+ assert endpoint != null;
+
+ lock.writeLock().lock();
+ try
+ {
+ leavingEndPoints.remove(endpoint);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
public void removeEndpoint(InetAddress endpoint)
{
assert tokenToEndPointMap.containsValue(endpoint);
lock.writeLock().lock();
try
{
- bootstrapTokens.remove(getToken(endpoint));
+ bootstrapTokens.inverse().remove(endpoint);
tokenToEndPointMap.inverse().remove(endpoint);
leavingEndPoints.remove(endpoint);
sortedTokens = sortTokens();
@@ -199,6 +237,21 @@
}
}
+ public boolean isLeaving(InetAddress endpoint)
+ {
+ assert endpoint != null;
+
+ lock.readLock().lock();
+ try
+ {
+ return leavingEndPoints.contains(endpoint);
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
public InetAddress getFirstEndpoint()
{
assert tokenToEndPointMap.size() > 0;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Mon Dec 21 18:50:09 2009
@@ -206,9 +206,9 @@
throws InvalidRequestException, UnavailableException, TimedOutException
{
ThriftValidation.validateColumnParent(keyspace, column_parent);
- List<ReadCommand> commands = new ArrayList<ReadCommand>();
- SliceRange range = predicate.slice_range;
+ ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
+ List<ReadCommand> commands = new ArrayList<ReadCommand>();
if (predicate.column_names != null)
{
for (String key: keys)
@@ -216,16 +216,15 @@
ThriftValidation.validateKey(key);
commands.add(new SliceByNamesReadCommand(keyspace, key, column_parent, predicate.column_names));
}
- ThriftValidation.validateColumns(keyspace, column_parent, predicate.column_names);
}
else
{
+ SliceRange range = predicate.slice_range;
for (String key: keys)
{
ThriftValidation.validateKey(key);
commands.add(new SliceFromReadCommand(keyspace, key, column_parent, range.start, range.finish, range.reversed, range.count));
}
- ThriftValidation.validateRange(keyspace, column_parent, range);
}
return getSlice(commands, consistency_level);
@@ -566,8 +565,7 @@
if (logger.isDebugEnabled())
logger.debug("range_slice");
- validatePredicate(keyspace, column_parent, predicate);
-
+ ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
if (!StorageService.getPartitioner().preservesOrder())
{
throw new InvalidRequestException("range queries may only be performed against an order-preserving partitioner");
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Dec 21 18:50:09 2009
@@ -60,15 +60,18 @@
{
private static Logger logger_ = Logger.getLogger(StorageService.class);
- // these aren't in an enum since other gossip users can create states ad-hoc too (e.g. load broadcasting)
+ public final static String MOVE_STATE = "MOVE";
+
+ // this must be a char that cannot be present in any token
+ public final static char Delimiter = ',';
+
+ public final static String STATE_BOOTSTRAPPING = "BOOT";
public final static String STATE_NORMAL = "NORMAL";
- public final static String STATE_BOOTSTRAPPING = "BOOTSTRAPPING";
public final static String STATE_LEAVING = "LEAVING";
public final static String STATE_LEFT = "LEFT";
- private final static char StateDelimiter = ',';
- private final static String REMOVE_TOKEN = "remove";
- private final static String LEFT_NORMALLY = "left";
+ public final static String REMOVE_TOKEN = "remove";
+ public final static String LEFT_NORMALLY = "left";
/* All verb handler identifiers */
public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
@@ -173,7 +176,7 @@
isBootstrapMode = false;
SystemTable.setBootstrapped(true);
setToken(getLocalToken());
- Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new ApplicationState(partitioner_.getTokenFactory().toString(getLocalToken())));
+ Gossiper.instance().addApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken())));
logger_.info("Bootstrap/move completed! Now serving reads.");
}
@@ -307,7 +310,7 @@
SystemTable.setBootstrapped(true);
Token token = storageMetadata_.getToken();
tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
- Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new ApplicationState(partitioner_.getTokenFactory().toString(token)));
+ Gossiper.instance().addApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(token)));
}
assert tokenMetadata_.sortedTokens().size() > 0;
@@ -317,7 +320,7 @@
{
isBootstrapMode = true;
SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
- Gossiper.instance().addApplicationState(StorageService.STATE_BOOTSTRAPPING, new ApplicationState(partitioner_.getTokenFactory().toString(token)));
+ Gossiper.instance().addApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter + partitioner_.getTokenFactory().toString(token)));
logger_.info("bootstrap sleeping " + Streaming.RING_DELAY);
try
{
@@ -400,67 +403,164 @@
* A node in bootstrap mode needs to have pendingranges set in TokenMetadata; a node in normal mode
* should instead be part of the token ring.
*/
- public void onChange(InetAddress endpoint, String stateName, ApplicationState state)
+ public void onChange(InetAddress endpoint, String apStateName, ApplicationState apState)
{
- if (STATE_BOOTSTRAPPING.equals(stateName))
- {
- Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
- if (logger_.isDebugEnabled())
- logger_.debug(endpoint + " state bootstrapping, token " + token);
- tokenMetadata_.addBootstrapToken(token, endpoint);
- calculatePendingRanges();
+ if (!MOVE_STATE.equals(apStateName))
+ return;
+
+ String apStateValue = apState.getValue();
+ int index = apStateValue.indexOf(Delimiter);
+ assert (index != -1);
+
+ String moveName = apStateValue.substring(0, index);
+ String moveValue = apStateValue.substring(index+1);
+
+ if (moveName.equals(STATE_BOOTSTRAPPING))
+ handleStateBootstrap(endpoint, moveValue);
+ else if (moveName.equals(STATE_NORMAL))
+ handleStateNormal(endpoint, moveValue);
+ else if (moveName.equals(STATE_LEAVING))
+ handleStateLeaving(endpoint, moveValue);
+ else if (moveName.equals(STATE_LEFT))
+ handleStateLeft(endpoint, moveValue);
+ }
+
+ /**
+ * Handle node bootstrap
+ *
+ * @param endPoint bootstrapping node
+ * @param moveValue bootstrap token as string
+ */
+ private void handleStateBootstrap(InetAddress endPoint, String moveValue)
+ {
+ Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("Node " + endPoint + " state bootstrapping, token " + token);
+
+ // if this node is present in token metadata, either we have missed intermediate states
+ // or the node had crashed. Print warning if needed, clear obsolete stuff and
+ // continue.
+ if (tokenMetadata_.isMember(endPoint))
+ {
+ // If isLeaving is false, we have missed both LEAVING and LEFT. However, if
+ // isLeaving is true, we have only missed LEFT. Waiting time between completing
+ // leave operation and rebootstrapping is relatively short, so the latter is quite
+ // common (not enough time for gossip to spread). Therefore we report only the
+ // former in the log.
+ if (!tokenMetadata_.isLeaving(endPoint))
+ logger_.info("Node " + endPoint + " state jump to bootstrap");
+ tokenMetadata_.removeEndpoint(endPoint);
}
- else if (STATE_NORMAL.equals(stateName))
+
+ tokenMetadata_.addBootstrapToken(token, endPoint);
+ calculatePendingRanges();
+ }
+
+ /**
+ * Handle node move to normal state. That is, node is entering token ring and participating
+ * in reads.
+ *
+ * @param endPoint node
+ * @param moveValue token as string
+ */
+ private void handleStateNormal(InetAddress endPoint, String moveValue)
+ {
+ Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("Node " + endPoint + " state normal, token " + token);
+
+ if (tokenMetadata_.isMember(endPoint))
+ logger_.info("Node " + endPoint + " state jump to normal");
+
+ tokenMetadata_.updateNormalToken(token, endPoint);
+ calculatePendingRanges();
+ if (!isClientMode)
+ SystemTable.updateToken(endPoint, token);
+ }
+
+ /**
+ * Handle node preparing to leave the ring
+ *
+ * @param endPoint node
+ * @param moveValue token as string
+ */
+ private void handleStateLeaving(InetAddress endPoint, String moveValue)
+ {
+ Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("Node " + endPoint + " state leaving, token " + token);
+
+ // If the node is previously unknown or tokens do not match, update tokenmetadata to
+ // have this node as 'normal' (it must have been using this token before the
+ // leave). This way we'll get pending ranges right.
+ if (!tokenMetadata_.isMember(endPoint))
{
- Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
- if (logger_.isDebugEnabled())
- logger_.debug(endpoint + " state normal, token " + token);
- tokenMetadata_.updateNormalToken(token, endpoint);
- calculatePendingRanges();
- if (!isClientMode)
- SystemTable.updateToken(endpoint, token);
+ logger_.info("Node " + endPoint + " state jump to leaving");
+ tokenMetadata_.updateNormalToken(token, endPoint);
}
- else if (STATE_LEAVING.equals(stateName))
+ else if (!tokenMetadata_.getToken(endPoint).equals(token))
{
- Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
- assert tokenMetadata_.getToken(endpoint).equals(token);
- tokenMetadata_.addLeavingEndPoint(endpoint);
- calculatePendingRanges();
+ logger_.warn("Node " + endPoint + " 'leaving' token mismatch. Long network partition?");
+ tokenMetadata_.updateNormalToken(token, endPoint);
}
- else if (STATE_LEFT.equals(stateName))
+
+ // at this point the endpoint is certainly a member with this token, so let's proceed
+ // normally
+ tokenMetadata_.addLeavingEndPoint(endPoint);
+ calculatePendingRanges();
+ }
+
+ /**
+ * Handle node leaving the ring. This can be either because the node was removed manually by
+ * removetoken command or because of decommission or loadbalance
+ *
+ * @param endPoint If reason for leaving is decommission or loadbalance (LEFT_NORMALLY),
+ * endPoint is the leaving node. If reason manual removetoken (REMOVE_TOKEN), endPoint
+ * parameter is ignored and the operation is based on the token inside moveValue.
+ * @param moveValue (REMOVE_TOKEN|LEFT_NORMALLY)<Delimiter><token>
+ */
+ private void handleStateLeft(InetAddress endPoint, String moveValue)
+ {
+ int index = moveValue.indexOf(Delimiter);
+ assert (index != -1);
+ String typeOfState = moveValue.substring(0, index);
+ Token token = getPartitioner().getTokenFactory().fromString(moveValue.substring(index + 1));
+
+ // endPoint itself is leaving
+ if (typeOfState.equals(LEFT_NORMALLY))
{
- // STATE_LEFT state is of form (REMOVE_TOKEN|LEFT_NORMALLY)<StateDelimiter><token>
- String stateValue = state.getValue();
- int index = stateValue.indexOf(StateDelimiter);
- assert (index != -1);
- String typeOfState = stateValue.substring(0, index);
- Token token = getPartitioner().getTokenFactory().fromString(stateValue.substring(index + 1));
+ if (logger_.isDebugEnabled())
+ logger_.debug("Node " + endPoint + " state left, token " + token);
- if (typeOfState.equals(LEFT_NORMALLY))
+ // If the node is member, remove all references to it. If not, call
+ // removeBootstrapToken just in case it is there (very unlikely chain of events)
+ if (tokenMetadata_.isMember(endPoint))
{
- if (tokenMetadata_.isMember(endpoint))
- {
- if (logger_.isDebugEnabled())
- logger_.debug(endpoint + " state left, token " + token);
- assert tokenMetadata_.getToken(endpoint).equals(token);
- tokenMetadata_.removeEndpoint(endpoint);
- calculatePendingRanges();
- }
+ if (!tokenMetadata_.getToken(endPoint).equals(token))
+ logger_.warn("Node " + endPoint + " 'left' token mismatch. Long network partition?");
+ tokenMetadata_.removeEndpoint(endPoint);
}
- else
+ }
+ else
+ {
+ // if we're here, endPoint is not leaving but broadcasting remove token command
+ assert (typeOfState.equals(REMOVE_TOKEN));
+ InetAddress endPointThatLeft = tokenMetadata_.getEndPoint(token);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Token " + token + " removed manually (endpoint was " + ((endPointThatLeft == null) ? "unknown" : endPointThatLeft) + ")");
+ if (endPointThatLeft != null)
{
- assert (typeOfState.equals(REMOVE_TOKEN));
- InetAddress endPointThatLeft = tokenMetadata_.getEndPoint(token);
- if (logger_.isDebugEnabled())
- logger_.debug("Token " + token + " removed manually (endpoint was " + ((endPointThatLeft == null) ? "unknown" : endPointThatLeft) + ")");
- if (endPointThatLeft != null)
- {
- restoreReplicaCount(endPointThatLeft);
- tokenMetadata_.removeEndpoint(endPointThatLeft);
- calculatePendingRanges();
- }
+ restoreReplicaCount(endPointThatLeft);
+ tokenMetadata_.removeEndpoint(endPointThatLeft);
}
}
+
+ // remove token from bootstrap tokens just in case it is still there
+ tokenMetadata_.removeBootstrapToken(token);
+ calculatePendingRanges();
}
/**
@@ -544,7 +644,7 @@
tm.setPendingRanges(pendingRanges);
if (logger_.isDebugEnabled())
- logger_.debug("Pending ranges:\n" + tm.printPendingRanges());
+ logger_.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges()));
}
/**
@@ -629,7 +729,11 @@
currentReplicaEndpoints.put(range, replicationStrategy_.getNaturalEndpoints(range.right(), tokenMetadata_));
TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
- temp.removeEndpoint(endpoint);
+
+ // endpoint might or might not be 'leaving'. If it was not leaving (that is, removetoken
+ // command was used), it is still present in temp and must be removed.
+ if (temp.isMember(endpoint))
+ temp.removeEndpoint(endpoint);
Multimap<Range, InetAddress> changedRanges = HashMultimap.create();
@@ -643,7 +747,10 @@
ArrayList<InetAddress> newReplicaEndpoints = replicationStrategy_.getNaturalEndpoints(range.right(), temp);
newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
if (logger_.isDebugEnabled())
- logger_.debug("Range " + range + " will be responsibility of " + StringUtils.join(newReplicaEndpoints, ", "));
+ if (newReplicaEndpoints.isEmpty())
+ logger_.debug("Range " + range + " already in all replicas");
+ else
+ logger_.debug("Range " + range + " will be responsibility of " + StringUtils.join(newReplicaEndpoints, ", "));
changedRanges.putAll(range, newReplicaEndpoints);
}
@@ -1142,17 +1249,27 @@
return tokens;
}
+ /**
+ * Broadcast leaving status and update local tokenMetadata_ accordingly
+ */
+ private void startLeaving()
+ {
+ Gossiper.instance().addApplicationState(MOVE_STATE, new ApplicationState(STATE_LEAVING + Delimiter + getLocalToken().toString()));
+ tokenMetadata_.addLeavingEndPoint(FBUtilities.getLocalAddress());
+ calculatePendingRanges();
+ }
+
public void decommission() throws InterruptedException
{
if (!tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
throw new UnsupportedOperationException("local node is not a member of the token ring yet");
- if (tokenMetadata_.sortedTokens().size() < 2)
- throw new UnsupportedOperationException("no other nodes in the ring; decommission would be pointless");
+ if (tokenMetadata_.cloneAfterAllLeft().sortedTokens().size() < 2)
+ throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless");
if (tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
logger_.info("DECOMMISSIONING");
- Gossiper.instance().addApplicationState(STATE_LEAVING, new ApplicationState(getLocalToken().toString()));
+ startLeaving();
logger_.info("decommission sleeping " + Streaming.RING_DELAY);
Thread.sleep(Streaming.RING_DELAY);
@@ -1173,10 +1290,11 @@
{
SystemTable.setBootstrapped(false);
tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
+ calculatePendingRanges();
if (logger_.isDebugEnabled())
logger_.debug("");
- Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(LEFT_NORMALLY + StateDelimiter + getLocalToken().toString()));
+ Gossiper.instance().addApplicationState(MOVE_STATE, new ApplicationState(STATE_LEFT + Delimiter + LEFT_NORMALLY + Delimiter + getLocalToken().toString()));
try
{
Thread.sleep(2 * Gossiper.intervalInMillis_);
@@ -1250,7 +1368,7 @@
throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
logger_.info("starting move. leaving token " + getLocalToken());
- Gossiper.instance().addApplicationState(STATE_LEAVING, new ApplicationState(getLocalToken().toString()));
+ startLeaving();
logger_.info("move sleeping " + Streaming.RING_DELAY);
Thread.sleep(Streaming.RING_DELAY);
@@ -1293,6 +1411,7 @@
restoreReplicaCount(endPoint);
tokenMetadata_.removeEndpoint(endPoint);
+ calculatePendingRanges();
}
// This is not the cleanest way as we're adding STATE_LEFT for
@@ -1302,7 +1421,7 @@
// not good. REMOVE_TOKEN|LEFT_NORMALLY is used to distinguish
// between removetoken command and normal state left, so it is
// not so bad.
- Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(REMOVE_TOKEN + StateDelimiter + token.toString()));
+ Gossiper.instance().addApplicationState(MOVE_STATE, new ApplicationState(STATE_LEFT + Delimiter + REMOVE_TOKEN + Delimiter + token.toString()));
}
public WriteResponseHandler getWriteResponseHandler(int blockFor, int consistency_level)
@@ -1319,4 +1438,21 @@
{
return isClientMode;
}
+
+ /** Test-only hook: installs newStrategy and returns the one it replaced. Never ever do this at home. */
+ AbstractReplicationStrategy setReplicationStrategyUnsafe(AbstractReplicationStrategy newStrategy)
+ {
+ final AbstractReplicationStrategy displaced = replicationStrategy_;
+ replicationStrategy_ = newStrategy;
+ return displaced;
+ }
+
+ /** Test-only hook: installs newPartitioner and returns the one it replaced. Never ever do this at home. */
+ IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner)
+ {
+ final IPartitioner displaced = partitioner_;
+ partitioner_ = newPartitioner;
+ return displaced;
+ }
+
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java Mon Dec 21 18:50:09 2009
@@ -280,4 +280,18 @@
validateColumns(keyspace, cfName, scName, predicate.column_names);
}
}
+
+ public static void validatePredicate(String keyspace, ColumnParent column_parent, SlicePredicate predicate) throws InvalidRequestException
+ {
+ // Exactly one of column_names / slice_range must be set; validate whichever one it is.
+ boolean hasNames = predicate.column_names != null, hasRange = predicate.slice_range != null;
+ if (!hasNames && !hasRange)
+ throw new InvalidRequestException("predicate column_names and slice_range may not both be null");
+ if (hasNames && hasRange)
+ throw new InvalidRequestException("predicate column_names and slice_range may not both be present");
+ if (predicate.getSlice_range() == null)
+ validateColumns(keyspace, column_parent, predicate.column_names);
+ else
+ validateRange(keyspace, column_parent, predicate.slice_range);
+ }
}
Propchange: incubator/cassandra/trunk/test/unit/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,4 +1,4 @@
/incubator/cassandra/branches/cassandra-0.3/test/unit/org:774578-796573
/incubator/cassandra/branches/cassandra-0.4/test/unit/org:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/test/unit/org:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/test/unit/org:888872-892891
/incubator/cassandra/trunk/test/unit/org:749219-768583
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Mon Dec 21 18:50:09 2009
@@ -62,7 +62,7 @@
Range range3 = ss.getPrimaryRangeForEndPoint(three);
Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left(), range3.right());
assert range3.contains(fakeToken);
- ss.onChange(myEndpoint, StorageService.STATE_BOOTSTRAPPING, new ApplicationState(ss.getPartitioner().getTokenFactory().toString(fakeToken)));
+ ss.onChange(myEndpoint, StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + ss.getPartitioner().getTokenFactory().toString(fakeToken)));
tmd = ss.getTokenMetadata();
InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load);