You are viewing a plain-text version of this content. The canonical (hyperlinked) version is linked from the original archive page; that link was not preserved in this text.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/07 18:23:31 UTC

svn commit: r822792 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ dht/ io/ service/

Author: jbellis
Date: Wed Oct  7 16:23:31 2009
New Revision: 822792

URL: http://svn.apache.org/viewvc?rev=822792&view=rev
Log:
Get a token on bootstrap that gives us half of the keys from the most heavily loaded node. (The "splits" approach should also be useful for #342; adding it to Thrift is trivial.)
patch by jbellis; reviewed by Eric Evans for CASSANDRA-385

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Oct  7 16:23:31 2009
@@ -725,10 +725,7 @@
     void doCleanup(SSTableReader sstable) throws IOException
     {
         assert sstable != null;
-        List<Range> myRanges;
-        Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
-        myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
-        List<SSTableReader> sstables = doFileAntiCompaction(Arrays.asList(sstable), myRanges, null);
+        List<SSTableReader> sstables = doFileAntiCompaction(Arrays.asList(sstable), StorageService.instance().getLocalRanges(), null);
         if (!sstables.isEmpty())
         {
             assert sstables.size() == 1;
@@ -764,7 +761,7 @@
         long startTime = System.currentTimeMillis();
         long totalkeysWritten = 0;
 
-        int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(), SSTableReader.getApproximateKeyCount(sstables) / 2);
+        int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
         if (logger_.isDebugEnabled())
           logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
@@ -844,7 +841,8 @@
         long startTime = System.currentTimeMillis();
         long totalkeysWritten = 0;
 
-        int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(), SSTableReader.getApproximateKeyCount(sstables));
+        // TODO the int cast here is potentially buggy
+        int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables));
         if (logger_.isDebugEnabled())
           logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java Wed Oct  7 16:23:31 2009
@@ -24,7 +24,7 @@
  * Represents a decorated key, handy for certain operations
  * where just working with strings gets slow.
  */
-public class DecoratedKey<T extends Token>
+public class DecoratedKey<T extends Token> implements Comparable<DecoratedKey>
 {
     public final T token;
     public final String key;
@@ -32,6 +32,7 @@
     public DecoratedKey(T token, String key)
     {
         super();
+        assert key != null;
         this.token = token;
         this.key = key;
     }
@@ -55,20 +56,22 @@
             return false;
         if (getClass() != obj.getClass())
             return false;
+
         DecoratedKey other = (DecoratedKey) obj;
-        if (key == null)
-        {
-            if (other.key != null)
-                return false;
-        } else if (!key.equals(other.key))
-            return false;
+        // either both should be of a class where all tokens are null, or neither
+        assert (token == null) == (other.token == null);
         if (token == null)
-        {
-            if (other.token != null)
-                return false;
-        } else if (!token.equals(other.token))
-            return false;
-        return true;
+            return key.equals(other.key);
+        return token.equals(other.token) && key.equals(other.key);
+    }
+
+    public int compareTo(DecoratedKey other)
+    {
+        assert (token == null) == (other.token == null);
+        if (token == null)
+            return key.compareTo(other.key);
+        int i = token.compareTo(other.token);
+        return i == 0 ? key.compareTo(other.key) : i;
     }
 
     @Override

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Oct  7 16:23:31 2009
@@ -492,7 +492,7 @@
     }
 
     /*
-     * Get the list of all SSTables on disk.  Not safe unless you aquire the CFS readlocks!
+     * Get the list of all SSTables on disk.
     */
     public List<SSTableReader> getAllSSTablesOnDisk()
     {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Oct  7 16:23:31 2009
@@ -41,8 +41,10 @@
 */
 public class BootStrapper implements Runnable
 {
+    public static final long INITIAL_DELAY = 30 * 1000; //ms
+
     private static Logger logger_ = Logger.getLogger(BootStrapper.class);
-    private static final long INITIAL_DELAY = 60*1000; //ms
+
     /* endpoints that need to be bootstrapped */
     protected EndPoint[] targets_ = new EndPoint[0];
     /* tokens of the nodes being bootstrapped. */
@@ -60,10 +62,6 @@
     {
         try
         {
-            /* Initial delay waiting for this node to get a stable endpoint map */
-            Thread.sleep(INITIAL_DELAY);
-            /* Clone again now so we include all discovered nodes in our calculations */
-            tokenMetadata_ = StorageService.instance().getTokenMetadata();
             // Mark as not bootstrapping to calculate ranges correctly
             for (int i=0; i< targets_.length; i++)
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Wed Oct  7 16:23:31 2009
@@ -29,6 +29,7 @@
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -86,9 +87,14 @@
         return INDEX_INTERVAL;
     }
 
-    public static int getApproximateKeyCount(Iterable<SSTableReader> sstables)
+    public static long getApproximateKeyCount()
     {
-        int count = 0;
+        return getApproximateKeyCount(openedFiles.values());
+    }
+
+    public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)
+    {
+        long count = 0;
 
         for (SSTableReader sstable : sstables)
         {
@@ -101,6 +107,30 @@
         return count;
     }
 
+    /**
+     * Get all indexed keys in any SSTable for our primary range
+     * TODO add option to include keys from one or more other ranges
+     */
+    public static List<DecoratedKey> getIndexedDecoratedKeys()
+    {
+        Range range = StorageService.instance().getLocalPrimaryRange();
+        List<DecoratedKey> indexedKeys = new ArrayList<DecoratedKey>();
+        
+        for (SSTableReader sstable : openedFiles.values())
+        {
+            for (KeyPosition kp : sstable.getIndexPositions())
+            {
+                if (range.contains(kp.key.token))
+                {
+                    indexedKeys.add(kp.key);
+                }
+            }
+        }
+        Collections.sort(indexedKeys);
+
+        return indexedKeys;
+    }
+
     public static SSTableReader open(String dataFileName) throws IOException
     {
         return open(dataFileName, StorageService.getPartitioner());

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java Wed Oct  7 16:23:31 2009
@@ -46,12 +46,12 @@
     private DecoratedKey lastWrittenKey;
     private BloomFilter bf;
 
-    public SSTableWriter(String filename, int keyCount, IPartitioner partitioner) throws IOException
+    public SSTableWriter(String filename, long keyCount, IPartitioner partitioner) throws IOException
     {
         super(filename, partitioner);
         dataFile = new BufferedRandomAccessFile(path, "rw", (int)(DatabaseDescriptor.getFlushDataBufferSizeInMB() * 1024 * 1024));
         indexFile = new BufferedRandomAccessFile(indexFilename(), "rw", (int)(DatabaseDescriptor.getFlushIndexBufferSizeInMB() * 1024 * 1024));
-        bf = new BloomFilter(keyCount, 15);
+        bf = new BloomFilter((int)keyCount, 15); // TODO fix long -> int cast
     }
 
     private long beforeAppend(DecoratedKey decoratedKey) throws IOException

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java Wed Oct  7 16:23:31 2009
@@ -77,8 +77,7 @@
         });
 
         // initialize stuff
-        Set<String> tables = DatabaseDescriptor.getTableToColumnFamilyMap().keySet();
-        for (String table : tables)
+        for (String table : DatabaseDescriptor.getTableToColumnFamilyMap().keySet())
         {
             if (logger.isDebugEnabled())
                 logger.debug("opening keyspace " + table);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Wed Oct  7 16:23:31 2009
@@ -525,10 +525,7 @@
         {
             return DatabaseDescriptor.getTables();        
         }
-        else
-        {
-            return new ArrayList<String>();
-        }
+        return Collections.emptyList();
     }
 
     public Map<String, Map<String, String>> describe_keyspace(String table) throws NotFoundException

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Oct  7 16:23:31 2009
@@ -20,6 +20,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -27,8 +28,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -43,8 +43,18 @@
 import org.apache.cassandra.tools.MembershipCleanerVerbHandler;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.TException;
+
 import org.apache.log4j.Logger;
 import org.apache.log4j.Level;
+import org.apache.commons.lang.ArrayUtils;
 
 /*
  * This abstraction contains the token/identifier of this node
@@ -77,6 +87,7 @@
     public final static String mbrshipCleanerVerbHandler_ = "MBRSHIP-CLEANER-VERB-HANDLER";
     public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
     public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
+    public final static String bootstrapTokenVerbHandler_ = "SPLITS-VERB-HANDLER";
 
     private static StorageService instance_;
     private static EndPoint tcpAddr_;
@@ -97,6 +108,16 @@
         return partitioner_;
     }
 
+    public List<Range> getLocalRanges()
+    {
+        return getRangesForEndPoint(getLocalStorageEndPoint());
+    }
+
+    public Range getLocalPrimaryRange()
+    {
+        return getPrimaryRangeForEndPoint(getLocalStorageEndPoint());
+    }
+
     static
     {
         partitioner_ = DatabaseDescriptor.getPartitioner();
@@ -254,6 +275,24 @@
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bsMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );        
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.rangeVerbHandler_, new RangeVerbHandler());
+        MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootstrapTokenVerbHandler_, new IVerbHandler()
+        {
+            public void doVerb(Message message)
+            {
+                List<String> tokens = getSplits(2);
+                assert tokens.size() == 3 : tokens.size();
+                Message response;
+                try
+                {
+                    response = message.getReply(getLocalStorageEndPoint(), tokens.get(1).getBytes("UTF-8"));
+                }
+                catch (UnsupportedEncodingException e)
+                {
+                    throw new AssertionError();
+                }
+                MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
+            }
+        });
         
         /* register the stage for the mutations */
         consistencyManager_ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(),
@@ -309,12 +348,56 @@
         Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
         if (isBootstrapMode)
         {
-            logger_.info("Starting in bootstrap mode");
+            logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
+            // wait for node information to be available.  if the rest of the cluster just came up,
+            // this could be up to threshold_ ms (currently 5 minutes).
+            try
+            {
+                while (storageLoadBalancer_.getLoadInfo().isEmpty())
+                {
+                    Thread.sleep(100);
+                }
+                // one more sleep in case there are some stragglers
+                Thread.sleep(BootStrapper.INITIAL_DELAY);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
+            // if initialtoken was specified, use that.  otherwise, pick a token to assume half the load of the most-loaded node.
+            if (DatabaseDescriptor.getInitialToken() == null)
+            {
+                double maxLoad = 0;
+                EndPoint maxEndpoint = null;
+                for (Map.Entry<EndPoint,Double> entry : storageLoadBalancer_.getLoadInfo().entrySet())
+                {
+                    if (maxEndpoint == null || entry.getValue() > maxLoad)
+                    {
+                        maxEndpoint = entry.getKey();
+                        maxLoad = entry.getValue();
+                    }
+                }
+                if (!maxEndpoint.equals(getLocalStorageEndPoint()))
+                {
+                    Token<?> t = getBootstrapTokenFrom(maxEndpoint);
+                    logger_.info("Setting token to " + t + " to assume load from " + maxEndpoint.getHost());
+                    updateToken(t);
+                }
+            }
             doBootstrap(StorageService.getLocalStorageEndPoint());
             Gossiper.instance().addApplicationState(BOOTSTRAP_MODE, new ApplicationState(""));
         }
     }
-    
+
+    private Token<?> getBootstrapTokenFrom(EndPoint maxEndpoint)
+    {
+        Message message = new Message(getLocalStorageEndPoint(), "", bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
+        BootstrapTokenCallback btc = new BootstrapTokenCallback();
+        MessagingService.getMessagingInstance().sendRR(message, maxEndpoint, btc);
+        return btc.getToken();
+    }
+
     public boolean isBootstrapMode()
     {
         return isBootstrapMode;
@@ -1058,4 +1141,68 @@
         Logger.getLogger(classQualifier).setLevel(level);
         logger_.info("set log level to " + level + " for classes under '" + classQualifier + "' (if the level doesn't look like '" + rawLevel + "' then log4j couldn't parse '" + rawLevel + "')");
     }
+
+    /**
+     * @param splits: number of ranges to break into. Minimum 2.
+     * @return list of Tokens (_not_ keys!) breaking up the data this node is responsible for into `splits` pieces.
+     * There will be 1 more token than splits requested.  So for splits of 2, tokens T1 T2 T3 will be returned,
+     * where (T1, T2] is the first range and (T2, T3] is the second.  The first token will always be the left
+     * Token of this node's primary range, and the last will always be the Right token of that range.
+     */ 
+    public List<String> getSplits(int splits)
+    {
+        assert splits > 1;
+        // we use the actual Range token for the first and last brackets of the splits to ensure correctness
+        // (we're only operating on 1/128 of the keys remember)
+        Range range = getLocalPrimaryRange();
+        List<String> tokens = new ArrayList<String>();
+        tokens.add(range.left().toString());
+
+        List<DecoratedKey> decoratedKeys = SSTableReader.getIndexedDecoratedKeys();
+        for (int i = 1; i < splits; i++)
+        {
+            int index = i * (decoratedKeys.size() / splits);
+            tokens.add(decoratedKeys.get(index).token.toString());
+        }
+
+        tokens.add(range.right().toString());
+        return tokens;
+    }
+
+    class BootstrapTokenCallback implements IAsyncCallback
+    {
+        private volatile Token<?> token;
+        private final Condition condition = new SimpleCondition();
+
+        public Token<?> getToken()
+        {
+            try
+            {
+                condition.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            return token;
+        }
+
+        public void response(Message msg)
+        {
+            try
+            {
+                token = partitioner_.getTokenFactory().fromString(new String(msg.getMessageBody(), "UTF-8"));
+            }
+            catch (UnsupportedEncodingException e)
+            {
+                throw new AssertionError();
+            }
+            condition.signalAll();
+        }
+
+        public void attachContext(Object o)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
 }