You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/07 18:23:31 UTC
svn commit: r822792 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ dht/ io/ service/
Author: jbellis
Date: Wed Oct 7 16:23:31 2009
New Revision: 822792
URL: http://svn.apache.org/viewvc?rev=822792&view=rev
Log:
Get a token on bootstrap that gives us half of the keys from the most heavily loaded node. (The "splits" approach should also be useful for #342; adding it to Thrift is trivial.)
patch by jbellis; reviewed by Eric Evans for CASSANDRA-385
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Oct 7 16:23:31 2009
@@ -725,10 +725,7 @@
void doCleanup(SSTableReader sstable) throws IOException
{
assert sstable != null;
- List<Range> myRanges;
- Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
- myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
- List<SSTableReader> sstables = doFileAntiCompaction(Arrays.asList(sstable), myRanges, null);
+ List<SSTableReader> sstables = doFileAntiCompaction(Arrays.asList(sstable), StorageService.instance().getLocalRanges(), null);
if (!sstables.isEmpty())
{
assert sstables.size() == 1;
@@ -764,7 +761,7 @@
long startTime = System.currentTimeMillis();
long totalkeysWritten = 0;
- int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(), SSTableReader.getApproximateKeyCount(sstables) / 2);
+ int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
if (logger_.isDebugEnabled())
logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
@@ -844,7 +841,8 @@
long startTime = System.currentTimeMillis();
long totalkeysWritten = 0;
- int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(), SSTableReader.getApproximateKeyCount(sstables));
+ // TODO the int cast here is potentially buggy
+ int expectedBloomFilterSize = Math.max(SSTableReader.indexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables));
if (logger_.isDebugEnabled())
logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DecoratedKey.java Wed Oct 7 16:23:31 2009
@@ -24,7 +24,7 @@
* Represents a decorated key, handy for certain operations
* where just working with strings gets slow.
*/
-public class DecoratedKey<T extends Token>
+public class DecoratedKey<T extends Token> implements Comparable<DecoratedKey>
{
public final T token;
public final String key;
@@ -32,6 +32,7 @@
public DecoratedKey(T token, String key)
{
super();
+ assert key != null;
this.token = token;
this.key = key;
}
@@ -55,20 +56,22 @@
return false;
if (getClass() != obj.getClass())
return false;
+
DecoratedKey other = (DecoratedKey) obj;
- if (key == null)
- {
- if (other.key != null)
- return false;
- } else if (!key.equals(other.key))
- return false;
+ // either both should be of a class where all tokens are null, or neither
+ assert (token == null) == (other.token == null);
if (token == null)
- {
- if (other.token != null)
- return false;
- } else if (!token.equals(other.token))
- return false;
- return true;
+ return key.equals(other.key);
+ return token.equals(other.token) && key.equals(other.key);
+ }
+
+ public int compareTo(DecoratedKey other)
+ {
+ assert (token == null) == (other.token == null);
+ if (token == null)
+ return key.compareTo(other.key);
+ int i = token.compareTo(other.token);
+ return i == 0 ? key.compareTo(other.key) : i;
}
@Override
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Oct 7 16:23:31 2009
@@ -492,7 +492,7 @@
}
/*
- * Get the list of all SSTables on disk. Not safe unless you aquire the CFS readlocks!
+ * Get the list of all SSTables on disk.
*/
public List<SSTableReader> getAllSSTablesOnDisk()
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Oct 7 16:23:31 2009
@@ -41,8 +41,10 @@
*/
public class BootStrapper implements Runnable
{
+ public static final long INITIAL_DELAY = 30 * 1000; //ms
+
private static Logger logger_ = Logger.getLogger(BootStrapper.class);
- private static final long INITIAL_DELAY = 60*1000; //ms
+
/* endpoints that need to be bootstrapped */
protected EndPoint[] targets_ = new EndPoint[0];
/* tokens of the nodes being bootstrapped. */
@@ -60,10 +62,6 @@
{
try
{
- /* Initial delay waiting for this node to get a stable endpoint map */
- Thread.sleep(INITIAL_DELAY);
- /* Clone again now so we include all discovered nodes in our calculations */
- tokenMetadata_ = StorageService.instance().getTokenMetadata();
// Mark as not bootstrapping to calculate ranges correctly
for (int i=0; i< targets_.length; i++)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Wed Oct 7 16:23:31 2009
@@ -29,6 +29,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -86,9 +87,14 @@
return INDEX_INTERVAL;
}
- public static int getApproximateKeyCount(Iterable<SSTableReader> sstables)
+ public static long getApproximateKeyCount()
{
- int count = 0;
+ return getApproximateKeyCount(openedFiles.values());
+ }
+
+ public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)
+ {
+ long count = 0;
for (SSTableReader sstable : sstables)
{
@@ -101,6 +107,30 @@
return count;
}
+ /**
+ * Get all indexed keys in any SSTable for our primary range
+ * TODO add option to include keys from one or more other ranges
+ */
+ public static List<DecoratedKey> getIndexedDecoratedKeys()
+ {
+ Range range = StorageService.instance().getLocalPrimaryRange();
+ List<DecoratedKey> indexedKeys = new ArrayList<DecoratedKey>();
+
+ for (SSTableReader sstable : openedFiles.values())
+ {
+ for (KeyPosition kp : sstable.getIndexPositions())
+ {
+ if (range.contains(kp.key.token))
+ {
+ indexedKeys.add(kp.key);
+ }
+ }
+ }
+ Collections.sort(indexedKeys);
+
+ return indexedKeys;
+ }
+
public static SSTableReader open(String dataFileName) throws IOException
{
return open(dataFileName, StorageService.getPartitioner());
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableWriter.java Wed Oct 7 16:23:31 2009
@@ -46,12 +46,12 @@
private DecoratedKey lastWrittenKey;
private BloomFilter bf;
- public SSTableWriter(String filename, int keyCount, IPartitioner partitioner) throws IOException
+ public SSTableWriter(String filename, long keyCount, IPartitioner partitioner) throws IOException
{
super(filename, partitioner);
dataFile = new BufferedRandomAccessFile(path, "rw", (int)(DatabaseDescriptor.getFlushDataBufferSizeInMB() * 1024 * 1024));
indexFile = new BufferedRandomAccessFile(indexFilename(), "rw", (int)(DatabaseDescriptor.getFlushIndexBufferSizeInMB() * 1024 * 1024));
- bf = new BloomFilter(keyCount, 15);
+ bf = new BloomFilter((int)keyCount, 15); // TODO fix long -> int cast
}
private long beforeAppend(DecoratedKey decoratedKey) throws IOException
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java Wed Oct 7 16:23:31 2009
@@ -77,8 +77,7 @@
});
// initialize stuff
- Set<String> tables = DatabaseDescriptor.getTableToColumnFamilyMap().keySet();
- for (String table : tables)
+ for (String table : DatabaseDescriptor.getTableToColumnFamilyMap().keySet())
{
if (logger.isDebugEnabled())
logger.debug("opening keyspace " + table);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Wed Oct 7 16:23:31 2009
@@ -525,10 +525,7 @@
{
return DatabaseDescriptor.getTables();
}
- else
- {
- return new ArrayList<String>();
- }
+ return Collections.emptyList();
}
public Map<String, Map<String, String>> describe_keyspace(String table) throws NotFoundException
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=822792&r1=822791&r2=822792&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Oct 7 16:23:31 2009
@@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -27,8 +28,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -43,8 +43,18 @@
import org.apache.cassandra.tools.MembershipCleanerVerbHandler;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.TException;
+
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
+import org.apache.commons.lang.ArrayUtils;
/*
* This abstraction contains the token/identifier of this node
@@ -77,6 +87,7 @@
public final static String mbrshipCleanerVerbHandler_ = "MBRSHIP-CLEANER-VERB-HANDLER";
public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
+ public final static String bootstrapTokenVerbHandler_ = "SPLITS-VERB-HANDLER";
private static StorageService instance_;
private static EndPoint tcpAddr_;
@@ -97,6 +108,16 @@
return partitioner_;
}
+ public List<Range> getLocalRanges()
+ {
+ return getRangesForEndPoint(getLocalStorageEndPoint());
+ }
+
+ public Range getLocalPrimaryRange()
+ {
+ return getPrimaryRangeForEndPoint(getLocalStorageEndPoint());
+ }
+
static
{
partitioner_ = DatabaseDescriptor.getPartitioner();
@@ -254,6 +275,24 @@
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bsMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.rangeVerbHandler_, new RangeVerbHandler());
+ MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootstrapTokenVerbHandler_, new IVerbHandler()
+ {
+ public void doVerb(Message message)
+ {
+ List<String> tokens = getSplits(2);
+ assert tokens.size() == 3 : tokens.size();
+ Message response;
+ try
+ {
+ response = message.getReply(getLocalStorageEndPoint(), tokens.get(1).getBytes("UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new AssertionError();
+ }
+ MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
+ }
+ });
/* register the stage for the mutations */
consistencyManager_ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(),
@@ -309,12 +348,56 @@
Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
if (isBootstrapMode)
{
- logger_.info("Starting in bootstrap mode");
+ logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
+ // wait for node information to be available. if the rest of the cluster just came up,
+ // this could be up to threshold_ ms (currently 5 minutes).
+ try
+ {
+ while (storageLoadBalancer_.getLoadInfo().isEmpty())
+ {
+ Thread.sleep(100);
+ }
+ // one more sleep in case there are some stragglers
+ Thread.sleep(BootStrapper.INITIAL_DELAY);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ // if initialtoken was specified, use that. otherwise, pick a token to assume half the load of the most-loaded node.
+ if (DatabaseDescriptor.getInitialToken() == null)
+ {
+ double maxLoad = 0;
+ EndPoint maxEndpoint = null;
+ for (Map.Entry<EndPoint,Double> entry : storageLoadBalancer_.getLoadInfo().entrySet())
+ {
+ if (maxEndpoint == null || entry.getValue() > maxLoad)
+ {
+ maxEndpoint = entry.getKey();
+ maxLoad = entry.getValue();
+ }
+ }
+ if (!maxEndpoint.equals(getLocalStorageEndPoint()))
+ {
+ Token<?> t = getBootstrapTokenFrom(maxEndpoint);
+ logger_.info("Setting token to " + t + " to assume load from " + maxEndpoint.getHost());
+ updateToken(t);
+ }
+ }
doBootstrap(StorageService.getLocalStorageEndPoint());
Gossiper.instance().addApplicationState(BOOTSTRAP_MODE, new ApplicationState(""));
}
}
-
+
+ private Token<?> getBootstrapTokenFrom(EndPoint maxEndpoint)
+ {
+ Message message = new Message(getLocalStorageEndPoint(), "", bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
+ BootstrapTokenCallback btc = new BootstrapTokenCallback();
+ MessagingService.getMessagingInstance().sendRR(message, maxEndpoint, btc);
+ return btc.getToken();
+ }
+
public boolean isBootstrapMode()
{
return isBootstrapMode;
@@ -1058,4 +1141,68 @@
Logger.getLogger(classQualifier).setLevel(level);
logger_.info("set log level to " + level + " for classes under '" + classQualifier + "' (if the level doesn't look like '" + rawLevel + "' then log4j couldn't parse '" + rawLevel + "')");
}
+
+ /**
+ * @param splits: number of ranges to break into. Minimum 2.
+ * @return list of Tokens (_not_ keys!) breaking up the data this node is responsible for into `splits` pieces.
+ * There will be 1 more token than splits requested. So for splits of 2, tokens T1 T2 T3 will be returned,
+ * where (T1, T2] is the first range and (T2, T3] is the second. The first token will always be the left
+ * Token of this node's primary range, and the last will always be the Right token of that range.
+ */
+ public List<String> getSplits(int splits)
+ {
+ assert splits > 1;
+ // we use the actual Range token for the first and last brackets of the splits to ensure correctness
+ // (we're only operating on 1/128 of the keys remember)
+ Range range = getLocalPrimaryRange();
+ List<String> tokens = new ArrayList<String>();
+ tokens.add(range.left().toString());
+
+ List<DecoratedKey> decoratedKeys = SSTableReader.getIndexedDecoratedKeys();
+ for (int i = 1; i < splits; i++)
+ {
+ int index = i * (decoratedKeys.size() / splits);
+ tokens.add(decoratedKeys.get(index).token.toString());
+ }
+
+ tokens.add(range.right().toString());
+ return tokens;
+ }
+
+ class BootstrapTokenCallback implements IAsyncCallback
+ {
+ private volatile Token<?> token;
+ private final Condition condition = new SimpleCondition();
+
+ public Token<?> getToken()
+ {
+ try
+ {
+ condition.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return token;
+ }
+
+ public void response(Message msg)
+ {
+ try
+ {
+ token = partitioner_.getTokenFactory().fromString(new String(msg.getMessageBody(), "UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new AssertionError();
+ }
+ condition.signalAll();
+ }
+
+ public void attachContext(Object o)
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
}