You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/13 16:57:31 UTC

svn commit: r835890 - in /incubator/cassandra/trunk: bin/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/ src/java/org/apac...

Author: jbellis
Date: Fri Nov 13 15:57:31 2009
New Revision: 835890

URL: http://svn.apache.org/viewvc?rev=835890&view=rev
Log:
add Move command
patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-541

Removed:
    incubator/cassandra/trunk/bin/tokenupdater
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Fri Nov 13 15:57:31 2009
@@ -127,7 +127,7 @@
             if (initialToken == null)
                 token = p.getRandomToken();
             else
-                token = p.getToken(initialToken);
+                token = p.getTokenFactory().fromString(initialToken);
 
             logger.info("Saved Token not found. Using " + token);
             // seconds-since-epoch isn't a foolproof new generation

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java Fri Nov 13 15:57:31 2009
@@ -72,6 +72,8 @@
 
     /**
      * @return a Token that can be used to route a given key
+     * (This is NOT a method to create a Token from its string representation;
+     * for that, use TokenFactory.fromString.)
      */
     public T getToken(String key);
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Fri Nov 13 15:57:31 2009
@@ -50,6 +50,7 @@
 {
     private static final Logger logger = Logger.getLogger(SSTable.class);
 
+    public static final int FILES_ON_DISK = 3; // data, index, and bloom filter
 
     protected String path;
     protected IPartitioner partitioner;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Fri Nov 13 15:57:31 2009
@@ -72,7 +72,7 @@
         if (sstables.isEmpty())
             return;
 
-        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[sstables.size()];
+        StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[SSTable.FILES_ON_DISK * sstables.size()];
         int i = 0;
         for (SSTableReader sstable : sstables)
         {
@@ -130,7 +130,6 @@
                     StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
                     String file = getNewFileNameFromOldContextAndNames(fileNames, streamContext);
 
-                    //String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName + "-Data.db";
                     if (logger.isDebugEnabled())
                       logger.debug("Received Data from  : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
                     streamContext.setTargetFile(file);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Fri Nov 13 15:57:31 2009
@@ -88,6 +88,7 @@
         lock.writeLock().lock();
         try
         {
+            tokenToEndPointMap.inverse().remove(endpoint);
             if (!endpoint.equals(tokenToEndPointMap.put(token, endpoint)))
             {
                 sortedTokens = sortTokens();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Fri Nov 13 15:57:31 2009
@@ -71,7 +71,6 @@
 
     private TcpConnection(InetAddress from, InetAddress to, TcpConnectionManager pool, boolean streaming) throws IOException
     {
-        logger_.debug("creating connection from " + from + " to " + to);
         socketChannel_ = SocketChannel.open();
         socketChannel_.socket().bind(new InetSocketAddress(from, 0));
         socketChannel_.configureBlocking(false);
@@ -291,18 +290,20 @@
     
     void closeSocket()
     {
-        logger_.warn("Closing down connection " + socketChannel_ + " with " + pendingWrites_.size() + " writes remaining.");            
+        if (pendingWrites_.size() > 0)
+            logger_.error("Closing down connection " + socketChannel_ + " with " + pendingWrites_.size() + " writes remaining.");
         cancel(key_);
         pendingWrites_.clear();
     }
     
     void errorClose() 
     {        
-        logger_.warn("Closing down connection " + socketChannel_);
+        logger_.info("Closing errored connection " + socketChannel_);
         pendingWrites_.clear();
         cancel(key_);
         pendingWrites_.clear();
-        pool_.destroy(this);
+        if (pool_ != null)
+            pool_.destroy(this);
     }
     
     private void cancel(SelectionKey key)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java Fri Nov 13 15:57:31 2009
@@ -261,7 +261,7 @@
     {        
         List<StreamContext> context = ctxBag_.get(key);
         if ( context == null )
-            throw new IllegalStateException("Streaming context has not been set.");
+            throw new IllegalStateException("Streaming context has not been set for " + key);
         StreamContext streamContext = context.remove(0);        
         if ( context.isEmpty() )
             ctxBag_.remove(key);
@@ -272,7 +272,7 @@
     {
         List<StreamStatus> status = streamStatusBag_.get(key);
         if ( status == null )
-            throw new IllegalStateException("Streaming status has not been set.");
+            throw new IllegalStateException("Streaming status has not been set for " + key);
         StreamStatus streamStatus = status.remove(0);        
         if ( status.isEmpty() )
             streamStatusBag_.remove(key);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Nov 13 15:57:31 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service;
 
 import java.io.IOException;
+import java.io.IOError;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
 import java.util.*;
@@ -71,7 +72,6 @@
 
     /* All verb handler identifiers */
     public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
-    public final static String tokenVerbHandler_ = "TOKEN-VERB-HANDLER";
     public final static String binaryVerbHandler_ = "BINARY-VERB-HANDLER";
     public final static String readRepairVerbHandler_ = "READ-REPAIR-VERB-HANDLER";
     public final static String readVerbHandler_ = "ROW-READ-VERB-HANDLER";
@@ -165,7 +165,7 @@
     {
         bootstrapSet.remove(s);
         if (logger_.isDebugEnabled())
-            logger_.debug("Removed " + s + " as a bootstrap source");
+            logger_.debug("Removed " + s + " as a bootstrap source; remaining is [" + StringUtils.join(bootstrapSet, ", ") + "]");
 
         if (bootstrapSet.isEmpty())
         {
@@ -222,7 +222,6 @@
         endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
 
         /* register the verb handlers */
-        MessagingService.instance().registerVerbHandlers(tokenVerbHandler_, new TokenUpdateVerbHandler());
         MessagingService.instance().registerVerbHandlers(binaryVerbHandler_, new BinaryVerbHandler());
         MessagingService.instance().registerVerbHandlers(mutationVerbHandler_, new RowMutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(readRepairVerbHandler_, new ReadRepairVerbHandler());
@@ -258,8 +257,6 @@
     public void start() throws IOException
     {
         storageMetadata_ = SystemTable.initMetadata();
-        isBootstrapMode = DatabaseDescriptor.isAutoBootstrap()
-                          && !(DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) || SystemTable.isBootstrapped());
 
         /* Listen for application messages */
         MessagingService.instance().listen(FBUtilities.getLocalAddress());
@@ -277,15 +274,14 @@
         Gossiper.instance().register(this);
         Gossiper.instance().start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration());
 
-        if (isBootstrapMode)
+        if (DatabaseDescriptor.isAutoBootstrap()
+            && !(DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) || SystemTable.isBootstrapped()))
         {
             logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
             StorageLoadBalancer.instance().waitForLoadInfo();
             logger_.info("... got load info");
             Token token = BootStrapper.getBootstrapToken(tokenMetadata_, StorageLoadBalancer.instance().getLoadInfo());
-            SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
-            Gossiper.instance().addApplicationState(StorageService.STATE_BOOTSTRAPPING, new ApplicationState(partitioner_.getTokenFactory().toString(getLocalToken())));
-            new BootStrapper(replicationStrategy_, FBUtilities.getLocalAddress(), getLocalToken(), tokenMetadata_).startBootstrap(); // handles token update
+            startBootstrap(token);
             // don't finish startup (enabling thrift) until after bootstrap is done
             while (isBootstrapMode)
             {
@@ -303,13 +299,21 @@
         {
             SystemTable.setBootstrapped(true);
             Token token = storageMetadata_.getToken();
-            setToken(token);
+            tokenMetadata_.update(token, FBUtilities.getLocalAddress());
             Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new ApplicationState(partitioner_.getTokenFactory().toString(token)));
         }
 
         assert tokenMetadata_.sortedTokens().size() > 0;
     }
 
+    private void startBootstrap(Token token) throws IOException
+    {
+        isBootstrapMode = true;
+        SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
+        Gossiper.instance().addApplicationState(StorageService.STATE_BOOTSTRAPPING, new ApplicationState(partitioner_.getTokenFactory().toString(token)));
+        new BootStrapper(replicationStrategy_, FBUtilities.getLocalAddress(), token, tokenMetadata_).startBootstrap(); // handles token update
+    }
+
     public boolean isBootstrapMode()
     {
         return isBootstrapMode;
@@ -935,7 +939,7 @@
         return tokens;
     }
 
-    public void decommission()
+    public void decommission() throws InterruptedException
     {
         if (!tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
             throw new UnsupportedOperationException("local node is not a member of the token ring yet");
@@ -947,15 +951,22 @@
         logger_.info("DECOMMISSIONING");
         Gossiper.instance().addApplicationState(STATE_LEAVING, new ApplicationState(getLocalToken().toString()));
         logger_.info("decommission sleeping " + Streaming.RING_DELAY);
-        try
-        {
-            Thread.sleep(Streaming.RING_DELAY);
-        }
-        catch (InterruptedException e)
+        Thread.sleep(Streaming.RING_DELAY);
+
+        Runnable finishLeaving = new Runnable()
         {
-            throw new AssertionError(e);
-        }
+            public void run()
+            {
+                Gossiper.instance().stop();
+                logger_.info("DECOMMISSION FINISHED.");
+                // let op be responsible for killing the process
+            }
+        };
+        unbootstrap(finishLeaving);
+    }
 
+    private void unbootstrap(final Runnable onFinish)
+    {
         Multimap<Range, InetAddress> rangesMM = getChangedRangesForLeaving(FBUtilities.getLocalAddress());
         if (logger_.isDebugEnabled())
             logger_.debug("Ranges needing transfer are [" + StringUtils.join(rangesMM.keySet(), ",") + "]");
@@ -970,7 +981,25 @@
                 {
                     pending.remove(entry);
                     if (pending.isEmpty())
-                        finishLeaving();
+                    {
+                        SystemTable.setBootstrapped(false);
+                        tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
+                        replicationStrategy_.removeObsoletePendingRanges();
+
+                        if (logger_.isDebugEnabled())
+                            logger_.debug("");
+                        Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(getLocalToken().toString()));
+                        try
+                        {
+                            Thread.sleep(2 * Gossiper.intervalInMillis_);
+                        }
+                        catch (InterruptedException e)
+                        {
+                            throw new AssertionError(e);
+                        }
+
+                        onFinish.run();
+                    }
                 }
             };
             StageManager.getStage(streamStage_).execute(new Runnable()
@@ -984,6 +1013,35 @@
         }
     }
 
+    public void move(String newToken) throws InterruptedException
+    {
+        if (tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
+            throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
+
+        final Token token = partitioner_.getTokenFactory().fromString(newToken); // make sure it's valid
+        logger_.info("moving to " + token);
+        Gossiper.instance().addApplicationState(STATE_LEAVING, new ApplicationState(getLocalToken().toString()));
+        logger_.info("move sleeping " + Streaming.RING_DELAY);
+        Thread.sleep(Streaming.RING_DELAY);
+
+        Runnable finishMoving = new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    logger_.info("re-bootstrapping to new token " + token);
+                    startBootstrap(token);
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
+            }
+        };
+        unbootstrap(finishMoving);
+    }
+
     public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int consistency_level)
     {
         return replicationStrategy_.getResponseHandler(responseResolver, blockFor, consistency_level);
@@ -994,20 +1052,4 @@
         return replicationStrategy_;
     }
 
-    public void finishLeaving()
-    {
-        SystemTable.setBootstrapped(false);
-        Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(getLocalToken().toString()));
-        try
-        {
-            Thread.sleep(2 * Gossiper.intervalInMillis_);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        Gossiper.instance().stop();
-        logger_.info("DECOMMISSION FINISHED.");
-        // let op be responsible for killing the process
-    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Fri Nov 13 15:57:31 2009
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.FutureTask;
 
 import org.apache.cassandra.dht.Range;
 import java.net.InetAddress;
@@ -116,7 +117,13 @@
     /**
      * transfer this node's data to other machines and remove it from service.
      */
-    public void decommission();
+    public void decommission() throws InterruptedException;
+
+    /**
+     * @param newToken token to move this node to.
+     * This node will unload its data onto its neighbors, and bootstrap to the new token.
+     */
+    public void move(String newToken) throws InterruptedException;
 
     /** set the logging level at runtime */
     public void setLog4jLevel(String classQualifier, String level);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Fri Nov 13 15:57:31 2009
@@ -380,11 +380,16 @@
         ssProxy.clearSnapshot();
     }
 
-    public void decommission()
+    public void decommission() throws InterruptedException
     {
         ssProxy.decommission();
     }
 
+    public void move(String newToken) throws InterruptedException
+    {
+        ssProxy.move(newToken);
+    }
+
     /**
      * Print out the size of the queues in the thread pools
      *
@@ -480,7 +485,7 @@
         HelpFormatter hf = new HelpFormatter();
         String header = String.format(
                 "%nAvailable commands: ring, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, " +
-                "tpstats, flush, decommission, " +
+                "tpstats, flush, decommission, move, " +
                 " getcompactionthreshold, setcompactionthreshold [minthreshold] ([maxthreshold])");
         String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
         hf.printHelp(usage, "", options, header);
@@ -489,7 +494,7 @@
     /**
      * @param args
      */
-    public static void main(String[] args) throws IOException
+    public static void main(String[] args) throws IOException, InterruptedException
     {
         NodeProbe probe = null;
         try
@@ -543,6 +548,14 @@
         {
             probe.decommission();
         }
+        else if (cmdName.equals("move"))
+        {
+            if (arguments.length <= 1)
+            {
+                System.err.println("missing token argument");
+            }
+            probe.move(arguments[1]);
+        }
         else if (cmdName.equals("snapshot"))
         {
             String snapshotName = "";