You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/09 17:37:04 UTC

svn commit: r888866 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: service/StorageService.java service/StorageServiceMBean.java tools/NodeProbe.java

Author: jbellis
Date: Wed Dec  9 16:37:03 2009
New Revision: 888866

URL: http://svn.apache.org/viewvc?rev=888866&view=rev
Log:
Add removeToken command to StorageService and nodeprobe.  patch by Jaakko Laine; reviewed by jbellis for CASSANDRA-564

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=888866&r1=888865&r2=888866&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Dec  9 16:37:03 2009
@@ -68,6 +68,10 @@
     public final static String STATE_LEAVING = "LEAVING";
     public final static String STATE_LEFT = "LEFT";
 
+    private final static char StateDelimiter = ','; // separates the type prefix from the token in STATE_LEFT values
+    private final static String REMOVE_TOKEN = "remove"; // STATE_LEFT type: token removed via removeToken (owner not alive)
+    private final static String LEFT_NORMALLY = "left"; // STATE_LEFT type: node left through the normal leave sequence
+
     /* All verb handler identifiers */
     public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
     public final static String binaryVerbHandler_ = "BINARY-VERB-HANDLER";
@@ -433,13 +437,107 @@
         }
         else if (STATE_LEFT.equals(stateName))
         {
-            Token token = getPartitioner().getTokenFactory().fromString(state.getValue());
-            assert tokenMetadata_.getToken(endpoint).equals(token);
-            tokenMetadata_.removeEndpoint(endpoint);
+            // STATE_LEFT value has the form <type><StateDelimiter><token>, type being REMOVE_TOKEN or LEFT_NORMALLY
+            String stateValue = state.getValue();
+            int index = stateValue.indexOf(StateDelimiter);
+            assert (index != -1); // NOTE(review): with assertions off, a value lacking the delimiter (e.g. the bare-token value this commit replaces) makes substring(0, -1) throw
+            String typeOfState = stateValue.substring(0, index);
+            Token token = getPartitioner().getTokenFactory().fromString(stateValue.substring(index + 1));
+
+            if (typeOfState.equals(LEFT_NORMALLY))
+            {
+                if (tokenMetadata_.isMember(endpoint)) // skip if no longer a member (presumably already removed, e.g. via removetoken)
+                {
+                    if (logger_.isDebugEnabled())
+                        logger_.debug(endpoint + " state left, token " + token);
+                    assert tokenMetadata_.getToken(endpoint).equals(token);
+                    tokenMetadata_.removeEndpoint(endpoint);
+                }
+            }
+            else
+            {
+                assert (typeOfState.equals(REMOVE_TOKEN)); // NOTE(review): with assertions off, any unknown type is handled as REMOVE_TOKEN
+                InetAddress endPointThatLeft = tokenMetadata_.getEndPoint(token); // null when this node does not know the token's owner
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Token " + token + " removed manually (endpoint was " + ((endPointThatLeft == null) ? "unknown" : endPointThatLeft) + ")");
+                if (endPointThatLeft != null)
+                {
+                    restoreReplicaCount(endPointThatLeft); // called before removeEndpoint so the leaving node's ranges can still be computed
+                    tokenMetadata_.removeEndpoint(endPointThatLeft);
+                }
+            }
+
             replicationStrategy_.removeObsoletePendingRanges();
         }
     }
 
+    /**
+     * Called when endPoint is removed from the ring without the proper
+     * STATE_LEAVING -> STATE_LEFT sequence. Checks whether this node
+     * becomes responsible for new ranges as a consequence and, if so,
+     * requests each one from the nearest live replica other than endPoint.
+     *
+     * This is rather inefficient, but that matters little since it is
+     * called very seldom. NOTE(review): ranges with no live source are skipped silently.
+     *
+     * @param endPoint node that has left; both callers pass it before removing it from tokenMetadata_
+     */
+    private void restoreReplicaCount(InetAddress endPoint)
+    {
+        InetAddress myAddress = FBUtilities.getLocalAddress();
+
+        // ranges that change ownership, i.e. some node must take
+        // responsibility for a range it did not replicate before
+        Multimap<Range, InetAddress> changedRanges = getChangedRangesForLeaving(endPoint);
+
+        // check if any of these ranges are coming our way
+        Set<Range> myNewRanges = new HashSet<Range>();
+        for (Map.Entry<Range, InetAddress> entry : changedRanges.entries())
+        {
+            if (entry.getValue().equals(myAddress))
+                myNewRanges.add(entry.getKey());
+        }
+
+        if (!myNewRanges.isEmpty())
+        {
+            if (logger_.isDebugEnabled())
+                logger_.debug(endPoint + " was removed, my added ranges: " + StringUtils.join(myNewRanges, ", "));
+
+            Multimap<Range, InetAddress> rangeAddresses = replicationStrategy_.getRangeAddresses(tokenMetadata_); // endPoint is still in tokenMetadata_ here
+            Multimap<InetAddress, Range> sourceRanges = HashMultimap.create();
+            IFailureDetector failureDetector = FailureDetector.instance();
+
+            // find alive sources for our new ranges
+            for (Range myNewRange : myNewRanges)
+            {
+                List<InetAddress> sources = DatabaseDescriptor.getEndPointSnitch().sortByProximity(myAddress, rangeAddresses.get(myNewRange));
+
+                assert (!sources.contains(myAddress));
+
+                for (InetAddress source : sources)
+                {
+                    if (source.equals(endPoint))
+                        continue; // never stream from the removed node
+
+                    if (failureDetector.isAlive(source))
+                    {
+                        sourceRanges.put(source, myNewRange);
+                        break; // nearest live source wins; one per range is enough
+                    }
+                }
+            }
+
+            // issue one streaming request per source; sourceRanges
+            // groups the ranges by the address they come from
+            for (Map.Entry<InetAddress, Collection<Range>> entry : sourceRanges.asMap().entrySet())
+            {
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Requesting from " + entry.getKey() + " ranges " + StringUtils.join(entry.getValue(), ", "));
+                Streaming.requestRanges(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
     private Multimap<Range, InetAddress> getChangedRangesForLeaving(InetAddress endpoint)
     {
         // First get all ranges the leaving endpoint is responsible for
@@ -1033,7 +1131,7 @@
 
         if (logger_.isDebugEnabled())
             logger_.debug("");
-        Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(getLocalToken().toString()));
+        Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(LEFT_NORMALLY + StateDelimiter + getLocalToken().toString())); // value: "left,<token>"
         try
         {
             Thread.sleep(2 * Gossiper.intervalInMillis_);
@@ -1132,6 +1230,37 @@
         unbootstrap(finishMoving);
     }
 
+    /**
+     * Removes a token (whose owner must not be alive) from the ring and gossips the removal.
+     * @param tokenString string form of the token to remove
+     * @throws UnsupportedOperationException if the owning node is still alive
+     */
+    public void removeToken(String tokenString)
+    {
+        Token deadToken = partitioner_.getTokenFactory().fromString(tokenString);
+        InetAddress owner = tokenMetadata_.getEndPoint(deadToken);
+
+        // A missing owner is tolerated on purpose: the node running this
+        // command may never have seen the failed node, yet it must still
+        // be able to announce the removal to the rest of the cluster.
+        if (owner != null)
+        {
+            // Never evict a live member; it has to leave via decommission.
+            if (Gossiper.instance().getLiveMembers().contains(owner))
+                throw new UnsupportedOperationException("Node " + owner + " is alive and owns this token. Use decommission command to remove it from the ring");
+
+            restoreReplicaCount(owner);
+            tokenMetadata_.removeEndpoint(owner);
+            replicationStrategy_.removeObsoletePendingRanges();
+        }
+
+        // Piggyback on our own STATE_LEFT instead of a dedicated application
+        // state, which would enlarge every gossip message. Receivers tell
+        // this apart from a normal departure by the REMOVE_TOKEN prefix.
+        String removalState = REMOVE_TOKEN + StateDelimiter + deadToken.toString();
+        Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(removalState));
+    }
+
     public WriteResponseHandler getWriteResponseHandler(int blockFor, int consistency_level)
     {
         return replicationStrategy_.getWriteResponseHandler(blockFor, consistency_level);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=888866&r1=888865&r2=888866&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Dec  9 16:37:03 2009
@@ -147,6 +147,12 @@
      */
     public void cancelPendingRanges();
 
+    /** Remove a dead node's token (and the endpoint that owned it) from the
+     * ring; nodes taking over its ranges stream them from live replicas.
+     * @param token string form of the token to remove
+     */
+    public void removeToken(String token);
+
     /** set the logging level at runtime */
     public void setLog4jLevel(String classQualifier, String level);
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=888866&r1=888865&r2=888866&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Dec  9 16:37:03 2009
@@ -403,6 +403,11 @@
         ssProxy.cancelPendingRanges();
     }
 
+    public void removeToken(String token) // removes a dead node's token from the ring; see StorageServiceMBean.removeToken
+    {
+        ssProxy.removeToken(token); // NOTE(review): ssProxy is presumably the remote StorageServiceMBean proxy
+    }
+
     /**
      * Print out the size of the queues in the thread pools
      *
@@ -498,7 +503,7 @@
         HelpFormatter hf = new HelpFormatter();
         String header = String.format(
                 "%nAvailable commands: ring, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, " +
-                "tpstats, flush, repair, decommission, move, loadbalance, cancelpending, " +
+                "tpstats, flush, repair, decommission, move, loadbalance, cancelpending, removetoken [token], " +
                 " getcompactionthreshold, setcompactionthreshold [minthreshold] ([maxthreshold])");
         String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
         hf.printHelp(usage, "", options, header);
@@ -577,6 +582,15 @@
         {
             probe.cancelPendingRanges();
         }
+        else if (cmdName.equals("removetoken"))
+        {
+            if (arguments.length <= 1)
+            {
+                System.err.println("missing token argument");
+                System.exit(1); // don't fall through to arguments[1] (ArrayIndexOutOfBoundsException)
+            }
+            probe.removeToken(arguments[1]);
+        }
         else if (cmdName.equals("snapshot"))
         {
             String snapshotName = "";