You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/21 19:50:10 UTC

svn commit: r892926 - in /incubator/cassandra/trunk: ./ interface/gen-java/org/apache/cassandra/service/ src/java/org/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ test/unit/org/ test/unit/org/apache/cassandra/dht/ test...

Author: jbellis
Date: Mon Dec 21 18:50:09 2009
New Revision: 892926

URL: http://svn.apache.org/viewvc?rev=892926&view=rev
Log:
merge from 0.5

Added:
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
      - copied unchanged from r892891, incubator/cassandra/branches/cassandra-0.5/test/unit/org/apache/cassandra/service/MoveTest.java
Modified:
    incubator/cassandra/trunk/   (props changed)
    incubator/cassandra/trunk/CHANGES.txt
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java   (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java   (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java   (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java   (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java   (props changed)
    incubator/cassandra/trunk/src/java/org/   (props changed)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java
    incubator/cassandra/trunk/test/unit/org/   (props changed)
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java

Propchange: incubator/cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,3 +1,3 @@
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5:888872-892352
+/incubator/cassandra/branches/cassandra-0.5:888872-892891

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Mon Dec 21 18:50:09 2009
@@ -11,8 +11,12 @@
  * Fix anti-entropy assertion error (CASSANDRA-639)
  * Fix pending range conflicts when bootstapping or moving
    multiple nodes at once (CASSANDRA-603)
+ * Handle obsolete gossip related to node movement in the case where
+   one or more nodes are down when the movement occurs (CASSANDRA-572)
  * Include dead nodes in gossip to avoid a variety of problems
    (CASSANDRA-634)
+ * Return an InvalidRequestException for malformed SlicePredicates
+   (CASSANDRA-643)
 
 
 0.5.0 beta 2

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-892891
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,5 +1,5 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-892891
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java:749219-794428
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/column_t.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-892891
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-892891
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,5 +1,5 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-892891
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:749219-794428
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:749219-768588

Propchange: incubator/cassandra/trunk/src/java/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/src/java/org:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/src/java/org:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/src/java/org:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/src/java/org:888872-892891
 /incubator/cassandra/trunk/src/java/org:749219-769885

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStategy.java Mon Dec 21 18:50:09 2009
@@ -133,6 +133,10 @@
     private ArrayList<InetAddress> getNaturalEndpointsInternal(Token searchToken, TokenMetadata metadata) throws IOException
     {
         ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
+
+        if (metadata.sortedTokens().size() == 0)
+            return endpoints;
+
         if (null == tokens || tokens.size() != metadata.sortedTokens().size())
         {
             loadEndPoints(metadata);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Mon Dec 21 18:50:09 2009
@@ -50,6 +50,10 @@
         boolean bOtherRack = false;
         int foundCount = 0;
         List tokens = metadata.sortedTokens();
+
+        if (tokens.isEmpty())
+            return endpoints;
+
         int index = Collections.binarySearch(tokens, token);
         if(index < 0)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Mon Dec 21 18:50:09 2009
@@ -45,6 +45,10 @@
         int startIndex;
         List<Token> tokenList = new ArrayList<Token>();
         List tokens = new ArrayList<Token>(metadata.sortedTokens());
+        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(tokenList.size());
+
+        if (tokens.isEmpty())
+            return endpoints;
 
         int index = Collections.binarySearch(tokens, token);
         if (index < 0)
@@ -64,7 +68,6 @@
             assert !tokenList.contains(tokens.get(i));
             tokenList.add((Token) tokens.get(i));
         }
-        ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>(tokenList.size());
         for (Token t : tokenList)
             endpoints.add(metadata.getEndPoint(t));
         return endpoints;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Mon Dec 21 18:50:09 2009
@@ -47,7 +47,7 @@
     // for any nodes that boot simultaneously between same two nodes. For this we cannot simply make pending ranges a multimap,
     // since that would make us unable to notice the real problem of two nodes trying to boot using the same token.
     // In order to do this properly, we need to know what tokens are booting at any time.
-    private Map<Token, InetAddress> bootstrapTokens;
+    private BiMap<Token, InetAddress> bootstrapTokens;
 
     // we will need to know at all times what nodes are leaving and calculate ranges accordingly.
     // An anonymous pending ranges list is not enough, as that does not tell which node is leaving
@@ -71,7 +71,7 @@
         if (tokenToEndPointMap == null)
             tokenToEndPointMap = HashBiMap.create();
         this.tokenToEndPointMap = tokenToEndPointMap;
-        bootstrapTokens = new HashMap<Token, InetAddress>();
+        bootstrapTokens = HashBiMap.create();
         leavingEndPoints = new HashSet<InetAddress>();
         pendingRanges = HashMultimap.create();
         sortedTokens = sortTokens();
@@ -103,13 +103,13 @@
         lock.writeLock().lock();
         try
         {
-            bootstrapTokens.remove(token);
-
+            bootstrapTokens.inverse().remove(endpoint);
             tokenToEndPointMap.inverse().remove(endpoint);
             if (!endpoint.equals(tokenToEndPointMap.put(token, endpoint)))
             {
                 sortedTokens = sortTokens();
             }
+            leavingEndPoints.remove(endpoint);
         }
         finally
         {
@@ -125,9 +125,17 @@
         lock.writeLock().lock();
         try
         {
-            InetAddress oldEndPoint = bootstrapTokens.get(token);
+            InetAddress oldEndPoint = null;
+
+            oldEndPoint = bootstrapTokens.get(token);
+            if (oldEndPoint != null && !oldEndPoint.equals(endpoint))
+                throw new RuntimeException("Bootstrap Token collision between " + oldEndPoint + " and " + endpoint + " (token " + token);
+
+            oldEndPoint = tokenToEndPointMap.get(token);
             if (oldEndPoint != null && !oldEndPoint.equals(endpoint))
                 throw new RuntimeException("Bootstrap Token collision between " + oldEndPoint + " and " + endpoint + " (token " + token);
+
+            bootstrapTokens.inverse().remove(endpoint);
             bootstrapTokens.put(token, endpoint);
         }
         finally
@@ -136,6 +144,21 @@
         }
     }
 
+    public void removeBootstrapToken(Token token)
+    {
+        assert token != null;
+
+        lock.writeLock().lock();
+        try
+        {
+            bootstrapTokens.remove(token);
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
     public void addLeavingEndPoint(InetAddress endpoint)
     {
         assert endpoint != null;
@@ -151,13 +174,28 @@
         }
     }
 
+    public void removeLeavingEndPoint(InetAddress endpoint)
+    {
+        assert endpoint != null;
+
+        lock.writeLock().lock();
+        try
+        {
+            leavingEndPoints.remove(endpoint);
+        }
+        finally
+        {
+            lock.writeLock().unlock();
+        }
+    }
+
     public void removeEndpoint(InetAddress endpoint)
     {
         assert tokenToEndPointMap.containsValue(endpoint);
         lock.writeLock().lock();
         try
         {
-            bootstrapTokens.remove(getToken(endpoint));
+            bootstrapTokens.inverse().remove(endpoint);
             tokenToEndPointMap.inverse().remove(endpoint);
             leavingEndPoints.remove(endpoint);
             sortedTokens = sortTokens();
@@ -199,6 +237,21 @@
         }
     }
 
+    public boolean isLeaving(InetAddress endpoint)
+    {
+        assert endpoint != null;
+
+        lock.readLock().lock();
+        try
+        {
+            return leavingEndPoints.contains(endpoint);
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
     public InetAddress getFirstEndpoint()
     {
         assert tokenToEndPointMap.size() > 0;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Mon Dec 21 18:50:09 2009
@@ -206,9 +206,9 @@
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
         ThriftValidation.validateColumnParent(keyspace, column_parent);
-        List<ReadCommand> commands = new ArrayList<ReadCommand>();
-        SliceRange range = predicate.slice_range;
+        ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
 
+        List<ReadCommand> commands = new ArrayList<ReadCommand>();
         if (predicate.column_names != null)
         {
             for (String key: keys)
@@ -216,16 +216,15 @@
                 ThriftValidation.validateKey(key);
                 commands.add(new SliceByNamesReadCommand(keyspace, key, column_parent, predicate.column_names));
             }
-            ThriftValidation.validateColumns(keyspace, column_parent, predicate.column_names);
         }
         else
         {
+            SliceRange range = predicate.slice_range;
             for (String key: keys)
             {
                 ThriftValidation.validateKey(key);
                 commands.add(new SliceFromReadCommand(keyspace, key, column_parent, range.start, range.finish, range.reversed, range.count));
             }
-            ThriftValidation.validateRange(keyspace, column_parent, range);
         }
 
         return getSlice(commands, consistency_level);
@@ -566,8 +565,7 @@
         if (logger.isDebugEnabled())
             logger.debug("range_slice");
 
-        validatePredicate(keyspace, column_parent, predicate);
-
+        ThriftValidation.validatePredicate(keyspace, column_parent, predicate);
         if (!StorageService.getPartitioner().preservesOrder())
         {
             throw new InvalidRequestException("range queries may only be performed against an order-preserving partitioner");

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Dec 21 18:50:09 2009
@@ -60,15 +60,18 @@
 {
     private static Logger logger_ = Logger.getLogger(StorageService.class);     
 
-    // these aren't in an enum since other gossip users can create states ad-hoc too (e.g. load broadcasting)
+    public final static String MOVE_STATE = "MOVE";
+
+    // this must be a char that cannot be present in any token
+    public final static char Delimiter = ',';
+
+    public final static String STATE_BOOTSTRAPPING = "BOOT";
     public final static String STATE_NORMAL = "NORMAL";
-    public final static String STATE_BOOTSTRAPPING = "BOOTSTRAPPING";
     public final static String STATE_LEAVING = "LEAVING";
     public final static String STATE_LEFT = "LEFT";
 
-    private final static char StateDelimiter = ',';
-    private final static String REMOVE_TOKEN = "remove";
-    private final static String LEFT_NORMALLY = "left";
+    public final static String REMOVE_TOKEN = "remove";
+    public final static String LEFT_NORMALLY = "left";
 
     /* All verb handler identifiers */
     public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
@@ -173,7 +176,7 @@
         isBootstrapMode = false;
         SystemTable.setBootstrapped(true);
         setToken(getLocalToken());
-        Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new ApplicationState(partitioner_.getTokenFactory().toString(getLocalToken())));
+        Gossiper.instance().addApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken())));
         logger_.info("Bootstrap/move completed! Now serving reads.");
     }
 
@@ -307,7 +310,7 @@
             SystemTable.setBootstrapped(true);
             Token token = storageMetadata_.getToken();
             tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
-            Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new ApplicationState(partitioner_.getTokenFactory().toString(token)));
+            Gossiper.instance().addApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(token)));
         }
 
         assert tokenMetadata_.sortedTokens().size() > 0;
@@ -317,7 +320,7 @@
     {
         isBootstrapMode = true;
         SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
-        Gossiper.instance().addApplicationState(StorageService.STATE_BOOTSTRAPPING, new ApplicationState(partitioner_.getTokenFactory().toString(token)));
+        Gossiper.instance().addApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter + partitioner_.getTokenFactory().toString(token)));
         logger_.info("bootstrap sleeping " + Streaming.RING_DELAY);
         try
         {
@@ -400,67 +403,164 @@
      * A node in bootstrap mode needs to have pendingranges set in TokenMetadata; a node in normal mode
      * should instead be part of the token ring.
      */
-    public void onChange(InetAddress endpoint, String stateName, ApplicationState state)
+    public void onChange(InetAddress endpoint, String apStateName, ApplicationState apState)
     {
-        if (STATE_BOOTSTRAPPING.equals(stateName))
-        {
-            Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
-            if (logger_.isDebugEnabled())
-                logger_.debug(endpoint + " state bootstrapping, token " + token);
-            tokenMetadata_.addBootstrapToken(token, endpoint);
-            calculatePendingRanges();
+        if (!MOVE_STATE.equals(apStateName))
+            return;
+
+        String apStateValue = apState.getValue();
+        int index = apStateValue.indexOf(Delimiter);
+        assert (index != -1);
+
+        String moveName = apStateValue.substring(0, index);
+        String moveValue = apStateValue.substring(index+1);
+
+        if (moveName.equals(STATE_BOOTSTRAPPING))
+            handleStateBootstrap(endpoint, moveValue);
+        else if (moveName.equals(STATE_NORMAL))
+            handleStateNormal(endpoint, moveValue);
+        else if (moveName.equals(STATE_LEAVING))
+            handleStateLeaving(endpoint, moveValue);
+        else if (moveName.equals(STATE_LEFT))
+            handleStateLeft(endpoint, moveValue);
+    }
+
+    /**
+     * Handle node bootstrap
+     *
+     * @param endPoint bootstrapping node
+     * @param moveValue bootstrap token as string
+     */
+    private void handleStateBootstrap(InetAddress endPoint, String moveValue)
+    {
+        Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+
+        if (logger_.isDebugEnabled())
+            logger_.debug("Node " + endPoint + " state bootstrapping, token " + token);
+
+        // if this node is present in token metadata, either we have missed intermediate states
+        // or the node had crashed. Print warning if needed, clear obsolete stuff and
+        // continue.
+        if (tokenMetadata_.isMember(endPoint))
+        {
+            // If isLeaving is false, we have missed both LEAVING and LEFT. However, if
+            // isLeaving is true, we have only missed LEFT. Waiting time between completing
+            // leave operation and rebootstrapping is relatively short, so the latter is quite
+            // common (not enough time for gossip to spread). Therefore we report only the
+            // former in the log.
+            if (!tokenMetadata_.isLeaving(endPoint))
+                logger_.info("Node " + endPoint + " state jump to bootstrap");
+            tokenMetadata_.removeEndpoint(endPoint);
         }
-        else if (STATE_NORMAL.equals(stateName))
+
+        tokenMetadata_.addBootstrapToken(token, endPoint);
+        calculatePendingRanges();
+    }
+
+    /**
+     * Handle node move to normal state. That is, node is entering token ring and participating
+     * in reads.
+     *
+     * @param endPoint node
+     * @param moveValue token as string
+     */
+    private void handleStateNormal(InetAddress endPoint, String moveValue)
+    {
+        Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+
+        if (logger_.isDebugEnabled())
+            logger_.debug("Node " + endPoint + " state normal, token " + token);
+
+        if (tokenMetadata_.isMember(endPoint))
+            logger_.info("Node " + endPoint + " state jump to normal");
+
+        tokenMetadata_.updateNormalToken(token, endPoint);
+        calculatePendingRanges();
+        if (!isClientMode)
+            SystemTable.updateToken(endPoint, token);
+    }
+
+    /**
+     * Handle node preparing to leave the ring
+     *
+     * @param endPoint node
+     * @param moveValue token as string
+     */
+    private void handleStateLeaving(InetAddress endPoint, String moveValue)
+    {
+        Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+
+        if (logger_.isDebugEnabled())
+            logger_.debug("Node " + endPoint + " state leaving, token " + token);
+
+        // If the node is previously unknown or tokens do not match, update tokenmetadata to
+        // have this node as 'normal' (it must have been using this token before the
+        // leave). This way we'll get pending ranges right.
+        if (!tokenMetadata_.isMember(endPoint))
         {
-            Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
-            if (logger_.isDebugEnabled())
-                logger_.debug(endpoint + " state normal, token " + token);
-            tokenMetadata_.updateNormalToken(token, endpoint);
-            calculatePendingRanges();
-            if (!isClientMode)
-                SystemTable.updateToken(endpoint, token);
+            logger_.info("Node " + endPoint + " state jump to leaving");
+            tokenMetadata_.updateNormalToken(token, endPoint);
         }
-        else if (STATE_LEAVING.equals(stateName))
+        else if (!tokenMetadata_.getToken(endPoint).equals(token))
         {
-            Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
-            assert tokenMetadata_.getToken(endpoint).equals(token);
-            tokenMetadata_.addLeavingEndPoint(endpoint);
-            calculatePendingRanges();
+            logger_.warn("Node " + endPoint + " 'leaving' token mismatch. Long network partition?");
+            tokenMetadata_.updateNormalToken(token, endPoint);
         }
-        else if (STATE_LEFT.equals(stateName))
+
+        // at this point the endpoint is certainly a member with this token, so let's proceed
+        // normally
+        tokenMetadata_.addLeavingEndPoint(endPoint);
+        calculatePendingRanges();
+    }
+
+    /**
+     * Handle node leaving the ring. This can be either because the node was removed manually by
+     * removetoken command or because of decommission or loadbalance
+     *
+     * @param endPoint If the reason for leaving is decommission or loadbalance (LEFT_NORMALLY),
+     * endPoint is the leaving node. If the reason is a manual removetoken (REMOVE_TOKEN), the endPoint
+     * parameter is ignored and the operation is based on the token inside moveValue.
+     * @param moveValue (REMOVE_TOKEN|LEFT_NORMALLY)<Delimiter><token>
+     */
+    private void handleStateLeft(InetAddress endPoint, String moveValue)
+    {
+        int index = moveValue.indexOf(Delimiter);
+        assert (index != -1);
+        String typeOfState = moveValue.substring(0, index);
+        Token token = getPartitioner().getTokenFactory().fromString(moveValue.substring(index + 1));
+
+        // endPoint itself is leaving
+        if (typeOfState.equals(LEFT_NORMALLY))
         {
-            // STATE_LEFT state is of form (REMOVE_TOKEN|LEFT_NORMALLY)<StateDelimiter><token>
-            String stateValue = state.getValue();
-            int index = stateValue.indexOf(StateDelimiter);
-            assert (index != -1);
-            String typeOfState = stateValue.substring(0, index);
-            Token token = getPartitioner().getTokenFactory().fromString(stateValue.substring(index + 1));
+            if (logger_.isDebugEnabled())
+                logger_.debug("Node " + endPoint + " state left, token " + token);
 
-            if (typeOfState.equals(LEFT_NORMALLY))
+            // If the node is member, remove all references to it. If not, call
+            // removeBootstrapToken just in case it is there (very unlikely chain of events)
+            if (tokenMetadata_.isMember(endPoint))
             {
-                if (tokenMetadata_.isMember(endpoint))
-                {
-                    if (logger_.isDebugEnabled())
-                        logger_.debug(endpoint + " state left, token " + token);
-                    assert tokenMetadata_.getToken(endpoint).equals(token);
-                    tokenMetadata_.removeEndpoint(endpoint);
-                    calculatePendingRanges();
-                }
+                if (!tokenMetadata_.getToken(endPoint).equals(token))
+                    logger_.warn("Node " + endPoint + " 'left' token mismatch. Long network partition?");
+                tokenMetadata_.removeEndpoint(endPoint);
             }
-            else
+        }
+        else
+        {
+            // if we're here, endPoint is not leaving but broadcasting remove token command
+            assert (typeOfState.equals(REMOVE_TOKEN));
+            InetAddress endPointThatLeft = tokenMetadata_.getEndPoint(token);
+            if (logger_.isDebugEnabled())
+                logger_.debug("Token " + token + " removed manually (endpoint was " + ((endPointThatLeft == null) ? "unknown" : endPointThatLeft) + ")");
+            if (endPointThatLeft != null)
             {
-                assert (typeOfState.equals(REMOVE_TOKEN));
-                InetAddress endPointThatLeft = tokenMetadata_.getEndPoint(token);
-                if (logger_.isDebugEnabled())
-                    logger_.debug("Token " + token + " removed manually (endpoint was " + ((endPointThatLeft == null) ? "unknown" : endPointThatLeft) + ")");
-                if (endPointThatLeft != null)
-                {
-                    restoreReplicaCount(endPointThatLeft);
-                    tokenMetadata_.removeEndpoint(endPointThatLeft);
-                    calculatePendingRanges();
-                }
+                restoreReplicaCount(endPointThatLeft);
+                tokenMetadata_.removeEndpoint(endPointThatLeft);
             }
         }
+
+        // remove token from bootstrap tokens just in case it is still there
+        tokenMetadata_.removeBootstrapToken(token);
+        calculatePendingRanges();
     }
 
     /**
@@ -544,7 +644,7 @@
         tm.setPendingRanges(pendingRanges);
 
         if (logger_.isDebugEnabled())
-            logger_.debug("Pending ranges:\n" + tm.printPendingRanges());
+            logger_.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges()));
     }
 
     /**
@@ -629,7 +729,11 @@
             currentReplicaEndpoints.put(range, replicationStrategy_.getNaturalEndpoints(range.right(), tokenMetadata_));
 
         TokenMetadata temp = tokenMetadata_.cloneAfterAllLeft();
-        temp.removeEndpoint(endpoint);
+
+        // endpoint might or might not be 'leaving'. If it was not leaving (that is, removetoken
+        // command was used), it is still present in temp and must be removed.
+        if (temp.isMember(endpoint))
+            temp.removeEndpoint(endpoint);
 
         Multimap<Range, InetAddress> changedRanges = HashMultimap.create();
 
@@ -643,7 +747,10 @@
             ArrayList<InetAddress> newReplicaEndpoints = replicationStrategy_.getNaturalEndpoints(range.right(), temp);
             newReplicaEndpoints.removeAll(currentReplicaEndpoints.get(range));
             if (logger_.isDebugEnabled())
-                logger_.debug("Range " + range + " will be responsibility of " + StringUtils.join(newReplicaEndpoints, ", "));
+                if (newReplicaEndpoints.isEmpty())
+                    logger_.debug("Range " + range + " already in all replicas");
+                else
+                    logger_.debug("Range " + range + " will be responsibility of " + StringUtils.join(newReplicaEndpoints, ", "));
             changedRanges.putAll(range, newReplicaEndpoints);
         }
 
@@ -1142,17 +1249,27 @@
         return tokens;
     }
 
+    /**
+     * Broadcast leaving status and update local tokenMetadata_ accordingly
+     */
+    private void startLeaving()
+    {
+        Gossiper.instance().addApplicationState(MOVE_STATE, new ApplicationState(STATE_LEAVING + Delimiter + getLocalToken().toString()));
+        tokenMetadata_.addLeavingEndPoint(FBUtilities.getLocalAddress());
+        calculatePendingRanges();
+    }
+
     public void decommission() throws InterruptedException
     {
         if (!tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
             throw new UnsupportedOperationException("local node is not a member of the token ring yet");
-        if (tokenMetadata_.sortedTokens().size() < 2)
-            throw new UnsupportedOperationException("no other nodes in the ring; decommission would be pointless");
+        if (tokenMetadata_.cloneAfterAllLeft().sortedTokens().size() < 2)
+            throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless");
         if (tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
             throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
 
         logger_.info("DECOMMISSIONING");
-        Gossiper.instance().addApplicationState(STATE_LEAVING, new ApplicationState(getLocalToken().toString()));
+        startLeaving();
         logger_.info("decommission sleeping " + Streaming.RING_DELAY);
         Thread.sleep(Streaming.RING_DELAY);
 
@@ -1173,10 +1290,11 @@
     {
         SystemTable.setBootstrapped(false);
         tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
+        calculatePendingRanges();
 
         if (logger_.isDebugEnabled())
             logger_.debug("");
-        Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(LEFT_NORMALLY + StateDelimiter + getLocalToken().toString()));
+        Gossiper.instance().addApplicationState(MOVE_STATE, new ApplicationState(STATE_LEFT + Delimiter + LEFT_NORMALLY + Delimiter + getLocalToken().toString()));
         try
         {
             Thread.sleep(2 * Gossiper.intervalInMillis_);
@@ -1250,7 +1368,7 @@
             throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
 
         logger_.info("starting move. leaving token " + getLocalToken());
-        Gossiper.instance().addApplicationState(STATE_LEAVING, new ApplicationState(getLocalToken().toString()));
+        startLeaving();
         logger_.info("move sleeping " + Streaming.RING_DELAY);
         Thread.sleep(Streaming.RING_DELAY);
 
@@ -1293,6 +1411,7 @@
 
             restoreReplicaCount(endPoint);
             tokenMetadata_.removeEndpoint(endPoint);
+            calculatePendingRanges();
         }
 
         // This is not the cleanest way as we're adding STATE_LEFT for
@@ -1302,7 +1421,7 @@
         // not good. REMOVE_TOKEN|LEFT_NORMALLY is used to distinguish
         // between removetoken command and normal state left, so it is
         // not so bad.
-        Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(REMOVE_TOKEN + StateDelimiter + token.toString()));
+        Gossiper.instance().addApplicationState(MOVE_STATE, new ApplicationState(STATE_LEFT + Delimiter + REMOVE_TOKEN + Delimiter + token.toString()));
     }
 
     public WriteResponseHandler getWriteResponseHandler(int blockFor, int consistency_level)
@@ -1319,4 +1438,21 @@
     {
         return isClientMode;
     }
+
+    // Test-only hook, never use in production: installs newStrategy and returns the strategy it replaced.
+    AbstractReplicationStrategy setReplicationStrategyUnsafe(AbstractReplicationStrategy newStrategy)
+    {
+        AbstractReplicationStrategy oldStrategy = replicationStrategy_;
+        replicationStrategy_ = newStrategy;
+        return oldStrategy;
+    }
+
+    // Test-only hook, never use in production: installs newPartitioner and returns the partitioner it replaced.
+    IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner)
+    {
+        IPartitioner oldPartitioner = partitioner_;
+        partitioner_ = newPartitioner;
+        return oldPartitioner;
+    }
+
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ThriftValidation.java Mon Dec 21 18:50:09 2009
@@ -280,4 +280,18 @@
             validateColumns(keyspace, cfName, scName, predicate.column_names);
         }
     }
+
+    public static void validatePredicate(String keyspace, ColumnParent column_parent, SlicePredicate predicate)
+            throws InvalidRequestException
+    {
+        if (predicate.column_names == null && predicate.slice_range == null)
+            throw new InvalidRequestException("predicate column_names and slice_range may not both be null");
+        if (predicate.column_names != null && predicate.slice_range != null)
+            throw new InvalidRequestException("predicate column_names and slice_range may not both be present");
+
+        if (predicate.getSlice_range() != null)
+            validateRange(keyspace, column_parent, predicate.slice_range);
+        else
+            validateColumns(keyspace, column_parent, predicate.column_names);
+    }
 }

Propchange: incubator/cassandra/trunk/test/unit/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Dec 21 18:50:09 2009
@@ -1,4 +1,4 @@
 /incubator/cassandra/branches/cassandra-0.3/test/unit/org:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/test/unit/org:810145-834239,834349-834350
-/incubator/cassandra/branches/cassandra-0.5/test/unit/org:888872-892352
+/incubator/cassandra/branches/cassandra-0.5/test/unit/org:888872-892891
 /incubator/cassandra/trunk/test/unit/org:749219-768583

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=892926&r1=892925&r2=892926&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Mon Dec 21 18:50:09 2009
@@ -62,7 +62,7 @@
         Range range3 = ss.getPrimaryRangeForEndPoint(three);
         Token fakeToken = ((IPartitioner)StorageService.getPartitioner()).midpoint(range3.left(), range3.right());
         assert range3.contains(fakeToken);
-        ss.onChange(myEndpoint, StorageService.STATE_BOOTSTRAPPING, new ApplicationState(ss.getPartitioner().getTokenFactory().toString(fakeToken)));
+        ss.onChange(myEndpoint, StorageService.MOVE_STATE, new ApplicationState(StorageService.STATE_BOOTSTRAPPING + StorageService.Delimiter + ss.getPartitioner().getTokenFactory().toString(fakeToken)));
         tmd = ss.getTokenMetadata();
 
         InetAddress source2 = BootStrapper.getBootstrapSource(tmd, load);