You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/09 19:07:15 UTC
svn commit: r823616 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/
src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/service/
test/unit/org/apache/cassandra/db/ test/unit/org/apache/...
Author: jbellis
Date: Fri Oct 9 17:07:14 2009
New Revision: 823616
URL: http://svn.apache.org/viewvc?rev=823616&view=rev
Log:
move bootstrap-related code into BootStrapper
patch by jbellis; reviewed by Eric Evans for CASSANDRA-477
Added:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
- copied, changed from r823562, incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java
Removed:
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Oct 9 17:07:14 2009
@@ -340,7 +340,7 @@
return new File(DatabaseDescriptor.getDataFileLocationForTable(table_), fname).getAbsolutePath();
}
- String getTempSSTableFileName()
+ public String getTempSSTableFileName()
{
return String.format("%s-%s-%s-Data.db",
columnFamily_, SSTable.TEMPFILE_MARKER, fileIndexGenerator_.incrementAndGet());
@@ -559,7 +559,7 @@
* param @ filename - filename just flushed to disk
* param @ bf - bloom filter which indicates the keys that are in this file.
*/
- void addSSTable(SSTableReader sstable)
+ public void addSSTable(SSTableReader sstable)
{
ssTables_.add(sstable);
CompactionManager.instance().submit(this);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Oct 9 17:07:14 2009
@@ -26,14 +26,11 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.BootstrapInitiateMessage;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.SSTableWriter;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.io.IStreamComplete;
@@ -155,163 +152,6 @@
return FBUtilities.mapToString(tableMetadataMap_);
}
}
-
- /**
- * This is the callback handler that is invoked when we have
- * completely been bootstrapped for a single file by a remote host.
- *
- * TODO if we move this into CFS we could make addSSTables private, improving encapsulation.
- */
- public static class BootstrapCompletionHandler implements IStreamComplete
- {
- public void onStreamCompletion(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
- {
- /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
- if (streamContext.getTargetFile().contains("-Data.db"))
- {
- String tableName = streamContext.getTable();
- File file = new File( streamContext.getTargetFile() );
- String fileName = file.getName();
- String [] temp = fileName.split("-");
-
- //Open the file to see if all parts are now here
- SSTableReader sstable = null;
- try
- {
- sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
-
- //TODO add a sanity check that this sstable has all its parts and is ok
- Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
- logger_.info("Bootstrap added " + sstable.getFilename());
- }
- catch (IOException e)
- {
- logger_.error("Not able to bootstrap with file " + streamContext.getTargetFile(), e);
- }
- }
-
- EndPoint to = new EndPoint(host, DatabaseDescriptor.getStoragePort());
- if (logger_.isDebugEnabled())
- logger_.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + to);
- /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
- StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
- Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
- MessagingService.instance().sendOneWay(message, to);
- /* If we're done with everything for this host, remove from bootstrap sources */
- if (StreamContextManager.isDone(to.getHost()))
- StorageService.instance().removeBootstrapSource(to);
- }
- }
-
- public static class BootStrapInitiateVerbHandler implements IVerbHandler
- {
- /*
- * Here we handle the BootstrapInitiateMessage. Here we get the
- * array of StreamContexts. We get file names for the column
- * families associated with the files and replace them with the
- * file names as obtained from the column family store on the
- * receiving end.
- */
- public void doVerb(Message message)
- {
- byte[] body = message.getMessageBody();
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
-
- try
- {
- BootstrapInitiateMessage biMsg = BootstrapInitiateMessage.serializer().deserialize(bufIn);
- StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();
-
- Map<String, String> fileNames = getNewNames(streamContexts);
- /*
- * For each of stream context's in the incoming message
- * generate the new file names and store the new file names
- * in the StreamContextManager.
- */
- for (StreamContextManager.StreamContext streamContext : streamContexts )
- {
- StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
- String file = getNewFileNameFromOldContextAndNames(fileNames, streamContext);
-
- //String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName + "-Data.db";
- if (logger_.isDebugEnabled())
- logger_.debug("Received Data from : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
- streamContext.setTargetFile(file);
- addStreamContext(message.getFrom().getHost(), streamContext, streamStatus);
- }
-
- StreamContextManager.registerStreamCompletionHandler(message.getFrom().getHost(), new Table.BootstrapCompletionHandler());
- /* Send a bootstrap initiation done message to execute on default stage. */
- if (logger_.isDebugEnabled())
- logger_.debug("Sending a bootstrap initiate done message ...");
- Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
- MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
- }
- catch ( IOException ex )
- {
- logger_.info(LogUtil.throwableToString(ex));
- }
- }
-
- String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames,
- StreamContextManager.StreamContext streamContext)
- {
- File sourceFile = new File( streamContext.getTargetFile() );
- String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
- String cfName = piece[0];
- String ssTableNum = piece[1];
- String typeOfFile = piece[2];
-
- String newFileNameExpanded = fileNames.get( streamContext.getTable() + "-" + cfName + "-" + ssTableNum );
- //Drop type (Data.db) from new FileName
- String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile);
- String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName ;
- return file;
- }
-
- Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts) throws IOException
- {
- /*
- * Mapping for each file with unique CF-i ---> new file name. For eg.
- * for a file with name <CF>-<i>-Data.db there is a corresponding
- * <CF>-<i>-Index.db. We maintain a mapping from <CF>-<i> to a newly
- * generated file name.
- */
- Map<String, String> fileNames = new HashMap<String, String>();
- /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index/Filter file set */
- Set<String> distinctEntries = new HashSet<String>();
- for ( StreamContextManager.StreamContext streamContext : streamContexts )
- {
- String[] pieces = FBUtilities.strip(new File(streamContext.getTargetFile()).getName(), "-");
- distinctEntries.add(streamContext.getTable() + "-" + pieces[0] + "-" + pieces[1] );
- }
-
- /* Generate unique file names per entry */
- for ( String distinctEntry : distinctEntries )
- {
- String tableName;
- String[] peices = FBUtilities.strip(distinctEntry, "-");
- tableName = peices[0];
- Table table = Table.open( tableName );
- Map<String, ColumnFamilyStore> columnFamilyStores = table.getColumnFamilyStores();
-
- ColumnFamilyStore cfStore = columnFamilyStores.get(peices[1]);
- if (logger_.isDebugEnabled())
- logger_.debug("Generating file name for " + distinctEntry + " ...");
- fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
- }
-
- return fileNames;
- }
-
- private void addStreamContext(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Adding stream context " + streamContext + " for " + host + " ...");
- StreamContextManager.addStreamContext(host, streamContext, streamStatus);
- }
- }
/* Used to lock the factory for creation of Table instance */
private static final Lock createLock_ = new ReentrantLock();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Fri Oct 9 17:07:14 2009
@@ -26,24 +26,54 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+ import java.util.concurrent.locks.Condition;
+ import java.util.concurrent.ExecutorService;
+ import java.io.IOException;
+ import java.io.UnsupportedEncodingException;
+ import java.io.File;
import org.apache.log4j.Logger;
+ import org.apache.commons.lang.ArrayUtils;
+
import org.apache.cassandra.locator.TokenMetadata;
- import org.apache.cassandra.net.EndPoint;
+ import org.apache.cassandra.net.*;
+ import org.apache.cassandra.net.io.StreamContextManager;
+ import org.apache.cassandra.net.io.IStreamComplete;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-
-
-/**
- * This class handles the bootstrapping responsibilities for
- * any new endpoint.
-*/
+ import org.apache.cassandra.service.StorageLoadBalancer;
+ import org.apache.cassandra.service.StreamManager;
+ import org.apache.cassandra.utils.LogUtil;
+ import org.apache.cassandra.utils.SimpleCondition;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.gms.Gossiper;
+ import org.apache.cassandra.gms.ApplicationState;
+ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+ import org.apache.cassandra.io.DataInputBuffer;
+ import org.apache.cassandra.io.SSTableReader;
+ import org.apache.cassandra.io.SSTableWriter;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Table;
+
+
+ /**
+ * This class handles the bootstrapping responsibilities for the local endpoint.
+ *
+ * - bootstrapTokenVerb asks the most-loaded node what Token to use to split its Range in two.
+ * - bootstrapMetadataVerb tells source nodes to send us the necessary Ranges
+ * - source nodes send bootStrapInitiateVerb to us to say "get ready to receive data" [if there is data to send]
+ * - when we have everything set up to receive the data, we send bootStrapInitiateDoneVerb back to the source nodes and they start streaming
+ * - when streaming is complete, we send bootStrapTerminateVerb to the source so it can clean up on its end
+ */
public class BootStrapper implements Runnable
{
public static final long INITIAL_DELAY = 30 * 1000; //ms
- private static Logger logger_ = Logger.getLogger(BootStrapper.class);
+ static final Logger logger_ = Logger.getLogger(BootStrapper.class);
+
+ /* This thread pool is used to do the bootstrap for a new node */
+ private static final ExecutorService bootstrapExecutor_ = new DebuggableThreadPoolExecutor("BOOT-STRAPPER");
/* endpoints that need to be bootstrapped */
protected EndPoint[] targets_ = new EndPoint[0];
@@ -141,4 +171,312 @@
return rangesWithSourceTarget;
}
+ private static Token<?> getBootstrapTokenFrom(EndPoint maxEndpoint)
+ {
+ Message message = new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
+ BootstrapTokenCallback btc = new BootstrapTokenCallback();
+ MessagingService.instance().sendRR(message, maxEndpoint, btc);
+ return btc.getToken();
+ }
+
+ public static void startBootstrap() throws IOException
+ {
+ logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
+
+ StorageService ss = StorageService.instance();
+ StorageLoadBalancer slb = StorageLoadBalancer.instance();
+
+ slb.waitForLoadInfo();
+
+ // if initialtoken was specified, use that. otherwise, pick a token to assume half the load of the most-loaded node.
+ if (DatabaseDescriptor.getInitialToken() == null)
+ {
+ double maxLoad = 0;
+ EndPoint maxEndpoint = null;
+ for (Map.Entry<EndPoint,Double> entry : slb.getLoadInfo().entrySet())
+ {
+ if (maxEndpoint == null || entry.getValue() > maxLoad)
+ {
+ maxEndpoint = entry.getKey();
+ maxLoad = entry.getValue();
+ }
+ }
+ if (maxEndpoint == null)
+ {
+ throw new RuntimeException("No bootstrap sources found");
+ }
+
+ if (!maxEndpoint.equals(StorageService.getLocalStorageEndPoint()))
+ {
+ Token<?> t = getBootstrapTokenFrom(maxEndpoint);
+ logger_.info("Setting token to " + t + " to assume load from " + maxEndpoint.getHost());
+ ss.updateToken(t);
+ }
+ }
+
+ BootStrapper bs = new BootStrapper(new EndPoint[] {StorageService.getLocalStorageEndPoint()}, ss.getLocalToken());
+ bootstrapExecutor_.submit(bs);
+ Gossiper.instance().addApplicationState(StorageService.BOOTSTRAP_MODE, new ApplicationState(""));
+ }
+
+ public static class BootstrapTokenVerbHandler implements IVerbHandler
+ {
+ public void doVerb(Message message)
+ {
+ StorageService ss = StorageService.instance();
+ List<String> tokens = ss.getSplits(2);
+ assert tokens.size() == 3 : tokens.size();
+ Message response;
+ try
+ {
+ response = message.getReply(ss.getLocalStorageEndPoint(), tokens.get(1).getBytes("UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new AssertionError();
+ }
+ MessagingService.instance().sendOneWay(response, message.getFrom());
+ }
+ }
+
+ public static class BootstrapInitiateDoneVerbHandler implements IVerbHandler
+ {
+ public void doVerb(Message message)
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Received a bootstrap initiate done message ...");
+ /* Let the Stream Manager do his thing. */
+ StreamManager.instance(message.getFrom()).start();
+ }
+ }
+
+ private static class BootstrapTokenCallback implements IAsyncCallback
+ {
+ private volatile Token<?> token;
+ private final Condition condition = new SimpleCondition();
+
+ public Token<?> getToken()
+ {
+ try
+ {
+ condition.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return token;
+ }
+
+ public void response(Message msg)
+ {
+ try
+ {
+ token = StorageService.getPartitioner().getTokenFactory().fromString(new String(msg.getMessageBody(), "UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new AssertionError();
+ }
+ condition.signalAll();
+ }
+
+ public void attachContext(Object o)
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static class BootStrapInitiateVerbHandler implements IVerbHandler
+ {
+ /*
+ * Here we handle the BootstrapInitiateMessage. Here we get the
+ * array of StreamContexts. We get file names for the column
+ * families associated with the files and replace them with the
+ * file names as obtained from the column family store on the
+ * receiving end.
+ */
+ public void doVerb(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
+
+ try
+ {
+ BootstrapInitiateMessage biMsg = BootstrapInitiateMessage.serializer().deserialize(bufIn);
+ StreamContextManager.StreamContext[] streamContexts = biMsg.getStreamContext();
+
+ Map<String, String> fileNames = getNewNames(streamContexts);
+ /*
+ * For each of stream context's in the incoming message
+ * generate the new file names and store the new file names
+ * in the StreamContextManager.
+ */
+ for (StreamContextManager.StreamContext streamContext : streamContexts )
+ {
+ StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
+ String file = getNewFileNameFromOldContextAndNames(fileNames, streamContext);
+
+ //String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName + "-Data.db";
+ if (logger_.isDebugEnabled())
+ logger_.debug("Received Data from : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
+ streamContext.setTargetFile(file);
+ addStreamContext(message.getFrom().getHost(), streamContext, streamStatus);
+ }
+
+ StreamContextManager.registerStreamCompletionHandler(message.getFrom().getHost(), new BootstrapCompletionHandler());
+ /* Send a bootstrap initiation done message to execute on default stage. */
+ if (logger_.isDebugEnabled())
+ logger_.debug("Sending a bootstrap initiate done message ...");
+ Message doneMessage = new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
+ MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
+ }
+ catch ( IOException ex )
+ {
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+ }
+
+ String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames,
+ StreamContextManager.StreamContext streamContext)
+ {
+ File sourceFile = new File( streamContext.getTargetFile() );
+ String[] piece = FBUtilities.strip(sourceFile.getName(), "-");
+ String cfName = piece[0];
+ String ssTableNum = piece[1];
+ String typeOfFile = piece[2];
+
+ String newFileNameExpanded = fileNames.get( streamContext.getTable() + "-" + cfName + "-" + ssTableNum );
+ //Drop type (Data.db) from new FileName
+ String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile);
+ return DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName;
+ }
+
+ Map<String, String> getNewNames(StreamContextManager.StreamContext[] streamContexts) throws IOException
+ {
+ /*
+ * Mapping for each file with unique CF-i ---> new file name. For eg.
+ * for a file with name <CF>-<i>-Data.db there is a corresponding
+ * <CF>-<i>-Index.db. We maintain a mapping from <CF>-<i> to a newly
+ * generated file name.
+ */
+ Map<String, String> fileNames = new HashMap<String, String>();
+ /* Get the distinct entries from StreamContexts i.e have one entry per Data/Index/Filter file set */
+ Set<String> distinctEntries = new HashSet<String>();
+ for ( StreamContextManager.StreamContext streamContext : streamContexts )
+ {
+ String[] pieces = FBUtilities.strip(new File(streamContext.getTargetFile()).getName(), "-");
+ distinctEntries.add(streamContext.getTable() + "-" + pieces[0] + "-" + pieces[1] );
+ }
+
+ /* Generate unique file names per entry */
+ for ( String distinctEntry : distinctEntries )
+ {
+ String tableName;
+ String[] pieces = FBUtilities.strip(distinctEntry, "-");
+ tableName = pieces[0];
+ Table table = Table.open( tableName );
+
+ ColumnFamilyStore cfStore = table.getColumnFamilyStore(pieces[1]);
+ if (logger_.isDebugEnabled())
+ logger_.debug("Generating file name for " + distinctEntry + " ...");
+ fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
+ }
+
+ return fileNames;
+ }
+
+ private void addStreamContext(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Adding stream context " + streamContext + " for " + host + " ...");
+ StreamContextManager.addStreamContext(host, streamContext, streamStatus);
+ }
+ }
+
+ /**
+ * This is the callback handler that is invoked when we have
+ * completely been bootstrapped for a single file by a remote host.
+ *
+ * TODO if we move this into CFS we could make addSSTables private, improving encapsulation.
+ */
+ private static class BootstrapCompletionHandler implements IStreamComplete
+ {
+ public void onStreamCompletion(String host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus) throws IOException
+ {
+ /* Parse the stream context and the file to the list of SSTables in the associated Column Family Store. */
+ if (streamContext.getTargetFile().contains("-Data.db"))
+ {
+ String tableName = streamContext.getTable();
+ File file = new File( streamContext.getTargetFile() );
+ String fileName = file.getName();
+ String [] temp = fileName.split("-");
+
+ //Open the file to see if all parts are now here
+ SSTableReader sstable = null;
+ try
+ {
+ sstable = SSTableWriter.renameAndOpen(streamContext.getTargetFile());
+
+ //TODO add a sanity check that this sstable has all its parts and is ok
+ Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
+ logger_.info("Bootstrap added " + sstable.getFilename());
+ }
+ catch (IOException e)
+ {
+ logger_.error("Not able to bootstrap with file " + streamContext.getTargetFile(), e);
+ }
+ }
+
+ EndPoint to = new EndPoint(host, DatabaseDescriptor.getStoragePort());
+ if (logger_.isDebugEnabled())
+ logger_.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + to);
+ /* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
+ StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
+ Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
+ MessagingService.instance().sendOneWay(message, to);
+ /* If we're done with everything for this host, remove from bootstrap sources */
+ if (StreamContextManager.isDone(to.getHost()))
+ StorageService.instance().removeBootstrapSource(to);
+ }
+ }
+
+ public static class BootstrapTerminateVerbHandler implements IVerbHandler
+ {
+ private static Logger logger_ = Logger.getLogger( BootstrapTerminateVerbHandler.class );
+
+ public void doVerb(Message message)
+ {
+ byte[] body = message.getMessageBody();
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
+
+ try
+ {
+ StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(bufIn);
+ StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
+
+ switch( streamStatus.getAction() )
+ {
+ case DELETE:
+ StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
+ break;
+
+ case STREAM:
+ if (logger_.isDebugEnabled())
+ logger_.debug("Need to re-stream file " + streamStatus.getFile());
+ StreamManager.instance(message.getFrom()).repeat();
+ break;
+
+ default:
+ break;
+ }
+ }
+ catch ( IOException ex )
+ {
+ logger_.info(LogUtil.throwableToString(ex));
+ }
+ }
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataMessage.java Fri Oct 9 17:07:14 2009
@@ -53,7 +53,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream( bos );
BootstrapMetadataMessage.serializer().serialize(bsMetadataMessage, dos);
- return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bsMetadataVerbHandler_, bos.toByteArray() );
+ return new Message( StorageService.getLocalStorageEndPoint(), "", StorageService.bootstrapMetadataVerbHandler_, bos.toByteArray() );
}
protected BootstrapMetadata[] bsMetadata_ = new BootstrapMetadata[0];
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java Fri Oct 9 17:07:14 2009
@@ -164,18 +164,6 @@
sendMessagesToBootstrapSources(rangeInfo);
}
- // TODO: Once we're sure we don't need global bootstrap -- clean this code up
- // so it is easier to understand what messages are being sent. Local bootstrap should
- // look much simpler
- protected static void assignWorkForLocalBootstrap(Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget) throws IOException
- {
- Map<EndPoint, Map<EndPoint, List<Range>>> rangeInfo = getWorkMap(rangesWithSourceTarget);
- Map<EndPoint, Map<EndPoint, List<Range>>> filteredRanges = filterRangesForTargetEndPoint(rangeInfo,
- StorageService.getLocalStorageEndPoint());
- sendMessagesToBootstrapSources(filteredRanges);
- }
-
-
/**
* This method takes the Src -> (Tgt-> List of ranges) maps and retains those entries
* that are relevant to bootstrapping the target endpoint
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Fri Oct 9 17:07:14 2009
@@ -130,8 +130,7 @@
/* map where key is the endpoint and value is the state associated with the endpoint */
Map<EndPoint, EndPointState> endPointStateMap_ = new Hashtable<EndPoint, EndPointState>();
- /* private CTOR */
- Gossiper()
+ private Gossiper()
{
aVeryLongTime_ = 259200 * 1000;
/* register with the Failure Detector for receiving Failure detector events */
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Fri Oct 9 17:07:14 2009
@@ -29,6 +29,7 @@
import org.apache.cassandra.concurrent.SingleThreadedStage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.BootStrapper;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndPointState;
import org.apache.cassandra.gms.Gossiper;
@@ -48,7 +49,7 @@
* keys at an Endpoint. Monitor load information for a 5 minute
* interval and then do load balancing operations if necessary.
*/
-final class StorageLoadBalancer implements IEndPointStateChangeSubscriber
+public final class StorageLoadBalancer implements IEndPointStateChangeSubscriber
{
class LoadBalancer implements Runnable
{
@@ -70,14 +71,14 @@
/*
int threshold = (int)(StorageLoadBalancer.TOPHEAVY_RATIO * averageSystemLoad());
int myLoad = localLoad();
- EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
+ EndPoint predecessor = StorageService.instance().getPredecessor(StorageService.getLocalStorageEndPoint());
if (logger_.isDebugEnabled())
logger_.debug("Trying to relocate the predecessor " + predecessor);
boolean value = tryThisNode(myLoad, threshold, predecessor);
if ( !value )
{
loadInfo2_.remove(predecessor);
- EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+ EndPoint successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
if (logger_.isDebugEnabled())
logger_.debug("Trying to relocate the successor " + successor);
value = tryThisNode(myLoad, threshold, successor);
@@ -165,6 +166,16 @@
}
}
+ private static final long BROADCAST_INTERVAL = 5 * 60 * 1000L;
+
+ private static StorageLoadBalancer instance_;
+
+ public static synchronized StorageLoadBalancer instance()
+ {
+ return instance_ == null ? (instance_ = new StorageLoadBalancer()) : instance_;
+ }
+
+
private static final Logger logger_ = Logger.getLogger(StorageLoadBalancer.class);
private static final String lbStage_ = "LOAD-BALANCER-STAGE";
private static final String moveMessageVerbHandler_ = "MOVE-MESSAGE-VERB-HANDLER";
@@ -173,7 +184,6 @@
/* If a node's load is this factor more than the average, it is considered Heavy */
private static final double TOPHEAVY_RATIO = 1.5;
- private StorageService storageService_;
/* this indicates whether this node is already helping someone else */
private AtomicBoolean isMoveable_ = new AtomicBoolean(false);
private Map<EndPoint, Double> loadInfo_ = new HashMap<EndPoint, Double>();
@@ -184,18 +194,13 @@
/* This thread pool is used by target node to leave the ring. */
private ExecutorService lbOperations_ = new DebuggableThreadPoolExecutor("LB-TARGET");
- StorageLoadBalancer(StorageService storageService)
+ /* Timer is used to disseminate load information */
+ private Timer loadTimer_ = new Timer(false);
+
+ private StorageLoadBalancer()
{
- storageService_ = storageService;
- /* register the load balancer stage */
StageManager.registerStage(StorageLoadBalancer.lbStage_, new SingleThreadedStage(StorageLoadBalancer.lbStage_));
- /* register the load balancer verb handler */
MessagingService.instance().registerVerbHandlers(StorageLoadBalancer.moveMessageVerbHandler_, new MoveMessageVerbHandler());
- }
-
- public void start()
- {
- /* Register with the Gossiper for EndPointState notifications */
Gossiper.instance().register(this);
}
@@ -227,7 +232,7 @@
if ( !isMoveable_.get() )
return false;
int myload = localLoad();
- EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+ EndPoint successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
LoadInfo li = loadInfo2_.get(successor);
// "load" is NULL means that the successor node has not
// yet gossiped its load information. We should return
@@ -290,7 +295,7 @@
}
else
{
- EndPoint successor = storageService_.getSuccessor(target);
+ EndPoint successor = StorageService.instance().getSuccessor(target);
double sLoad = loadInfo2_.get(successor);
double targetLoad = loadInfo2_.get(target);
return (sLoad + targetLoad) <= threshold;
@@ -299,11 +304,11 @@
private boolean isANeighbour(EndPoint neighbour)
{
- EndPoint predecessor = storageService_.getPredecessor(StorageService.getLocalStorageEndPoint());
+ EndPoint predecessor = StorageService.instance().getPredecessor(StorageService.getLocalStorageEndPoint());
if ( predecessor.equals(neighbour) )
return true;
- EndPoint successor = storageService_.getSuccessor(StorageService.getLocalStorageEndPoint());
+ EndPoint successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
if ( successor.equals(neighbour) )
return true;
@@ -343,6 +348,31 @@
{
return loadInfo_;
}
+
+ public void startBroadcasting()
+ {
+ /* starts a load timer thread */
+ loadTimer_.schedule(new LoadDisseminator(), BROADCAST_INTERVAL, BROADCAST_INTERVAL);
+ }
+
+ /** wait for node information to be available. if the rest of the cluster just came up,
+ this could be up to BROADCAST_INTERVAL ms (currently 5 minutes). */
+ public void waitForLoadInfo()
+ {
+ try
+ {
+ while (loadInfo_.isEmpty())
+ {
+ Thread.sleep(100);
+ }
+ // one more sleep in case there are some stragglers
+ Thread.sleep(BootStrapper.INITIAL_DELAY);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
}
class MoveMessage implements Serializable
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Oct 9 17:07:14 2009
@@ -22,15 +22,11 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
+import javax.management.*;
import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -43,12 +39,10 @@
import org.apache.cassandra.tools.MembershipCleanerVerbHandler;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.utils.SimpleCondition;
import org.apache.cassandra.io.SSTableReader;
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
-import org.apache.commons.lang.ArrayUtils;
/*
* This abstraction contains the token/identifier of this node
@@ -60,9 +54,7 @@
{
private static Logger logger_ = Logger.getLogger(StorageService.class);
private final static String nodeId_ = "NODE-IDENTIFIER";
- private final static String BOOTSTRAP_MODE = "BOOTSTRAP-MODE";
- /* Gossip load after every 5 mins. */
- private static final long threshold_ = 5 * 60 * 1000L;
+ public final static String BOOTSTRAP_MODE = "BOOTSTRAP-MODE";
/* All stage identifiers */
public final static String mutationStage_ = "ROW-MUTATION-STAGE";
@@ -79,14 +71,16 @@
public final static String bootStrapTerminateVerbHandler_ = "BOOTSTRAP-TERMINATE-VERB-HANDLER";
public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
public final static String mbrshipCleanerVerbHandler_ = "MBRSHIP-CLEANER-VERB-HANDLER";
- public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
+ public final static String bootstrapMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
public final static String bootstrapTokenVerbHandler_ = "SPLITS-VERB-HANDLER";
- private static volatile StorageService instance_;
private static EndPoint tcpAddr_;
private static EndPoint udpAddr_;
- private static IPartitioner partitioner_;
+ private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
+
+
+ private static volatile StorageService instance_;
public static EndPoint getLocalStorageEndPoint()
{
@@ -112,25 +106,6 @@
return getPrimaryRangeForEndPoint(getLocalStorageEndPoint());
}
- static
- {
- partitioner_ = DatabaseDescriptor.getPartitioner();
- }
-
-
- public static class BootstrapInitiateDoneVerbHandler implements IVerbHandler
- {
- private static Logger logger_ = Logger.getLogger( BootstrapInitiateDoneVerbHandler.class );
-
- public void doVerb(Message message)
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Received a bootstrap initiate done message ...");
- /* Let the Stream Manager do his thing. */
- StreamManager.instance(message.getFrom()).start();
- }
- }
-
/*
* Factory method that gets an instance of the StorageService
* class.
@@ -170,17 +145,14 @@
private TokenMetadata tokenMetadata_ = new TokenMetadata();
private SystemTable.StorageMetadata storageMetadata_;
- /* Timer is used to disseminate load information */
- private Timer loadTimer_ = new Timer(false);
-
- /* This thread pool is used to do the bootstrap for a new node */
- private ExecutorService bootStrapper_ = new DebuggableThreadPoolExecutor("BOOT-STRAPPER");
-
/* This thread pool does consistency checks when the client doesn't care about consistency */
- private ExecutorService consistencyManager_;
+ private ExecutorService consistencyManager_ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(),
+ DatabaseDescriptor.getConsistencyThreads(),
+ Integer.MAX_VALUE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("CONSISTENCY-MANAGER"));
- /* This is the entity that tracks load information of all nodes in the cluster */
- private StorageLoadBalancer storageLoadBalancer_;
/* We use this interface to determine where replicas need to be placed */
private AbstractReplicationStrategy nodePicker_;
/* Are we starting this node in bootstrap mode? */
@@ -229,28 +201,19 @@
}
}
- /*
- * Registers with Management Server
- */
- private void init()
+ public StorageService()
{
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
- MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- mbs.registerMBean(this, new ObjectName(
- "org.apache.cassandra.service:type=StorageService"));
+ mbs.registerMBean(this, new ObjectName("org.apache.cassandra.service:type=StorageService"));
}
catch (Exception e)
{
- logger_.error(LogUtil.throwableToString(e));
+ throw new RuntimeException(e);
}
- }
- public StorageService()
- {
bootstrapSet = new HashSet<EndPoint>();
- init();
- storageLoadBalancer_ = new StorageLoadBalancer(this);
endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
/* register the verb handlers */
@@ -259,37 +222,15 @@
MessagingService.instance().registerVerbHandlers(mutationVerbHandler_, new RowMutationVerbHandler());
MessagingService.instance().registerVerbHandlers(readRepairVerbHandler_, new ReadRepairVerbHandler());
MessagingService.instance().registerVerbHandlers(readVerbHandler_, new ReadVerbHandler());
- MessagingService.instance().registerVerbHandlers(bootStrapInitiateVerbHandler_, new Table.BootStrapInitiateVerbHandler());
- MessagingService.instance().registerVerbHandlers(bootStrapInitiateDoneVerbHandler_, new StorageService.BootstrapInitiateDoneVerbHandler());
- MessagingService.instance().registerVerbHandlers(bootStrapTerminateVerbHandler_, new StreamManager.BootstrapTerminateVerbHandler());
MessagingService.instance().registerVerbHandlers(dataFileVerbHandler_, new DataFileVerbHandler() );
MessagingService.instance().registerVerbHandlers(mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
- MessagingService.instance().registerVerbHandlers(bsMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
MessagingService.instance().registerVerbHandlers(rangeVerbHandler_, new RangeVerbHandler());
- MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new IVerbHandler()
- {
- public void doVerb(Message message)
- {
- List<String> tokens = getSplits(2);
- assert tokens.size() == 3 : tokens.size();
- Message response;
- try
- {
- response = message.getReply(getLocalStorageEndPoint(), tokens.get(1).getBytes("UTF-8"));
- }
- catch (UnsupportedEncodingException e)
- {
- throw new AssertionError();
- }
- MessagingService.instance().sendOneWay(response, message.getFrom());
- }
- });
-
- /* register the stage for the mutations */
- consistencyManager_ = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getConsistencyThreads(),
- DatabaseDescriptor.getConsistencyThreads(),
- Integer.MAX_VALUE, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("CONSISTENCY-MANAGER"));
+ // see BootStrapper for a summary of how the bootstrap verbs interact
+ MessagingService.instance().registerVerbHandlers(bootstrapMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
+ MessagingService.instance().registerVerbHandlers(bootStrapInitiateVerbHandler_, new BootStrapper.BootStrapInitiateVerbHandler());
+ MessagingService.instance().registerVerbHandlers(bootStrapInitiateDoneVerbHandler_, new BootStrapper.BootstrapInitiateDoneVerbHandler());
+ MessagingService.instance().registerVerbHandlers(bootStrapTerminateVerbHandler_, new BootStrapper.BootstrapTerminateVerbHandler());
+ MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
StageManager.registerStage(StorageService.mutationStage_,
new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));
@@ -323,55 +264,13 @@
SelectorManager.getSelectorManager().start();
SelectorManager.getUdpSelectorManager().start();
- /* starts a load timer thread */
- loadTimer_.schedule( new LoadDisseminator(), StorageService.threshold_, StorageService.threshold_);
-
+ StorageLoadBalancer.instance().startBroadcasting();
+
if (isBootstrapMode)
{
- logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
- // wait for node information to be available. if the rest of the cluster just came up,
- // this could be up to threshold_ ms (currently 5 minutes).
- try
- {
- while (storageLoadBalancer_.getLoadInfo().isEmpty())
- {
- Thread.sleep(100);
- }
- // one more sleep in case there are some stragglers
- Thread.sleep(BootStrapper.INITIAL_DELAY);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
-
- // if initialtoken was specified, use that. otherwise, pick a token to assume half the load of the most-loaded node.
- if (DatabaseDescriptor.getInitialToken() == null)
- {
- double maxLoad = 0;
- EndPoint maxEndpoint = null;
- for (Map.Entry<EndPoint,Double> entry : storageLoadBalancer_.getLoadInfo().entrySet())
- {
- if (maxEndpoint == null || entry.getValue() > maxLoad)
- {
- maxEndpoint = entry.getKey();
- maxLoad = entry.getValue();
- }
- }
- if (!maxEndpoint.equals(getLocalStorageEndPoint()))
- {
- Token<?> t = getBootstrapTokenFrom(maxEndpoint);
- logger_.info("Setting token to " + t + " to assume load from " + maxEndpoint.getHost());
- updateToken(t);
- }
- }
-
- BootStrapper bs = new BootStrapper(new EndPoint[] {getLocalStorageEndPoint()}, storageMetadata_.getToken());
- bootStrapper_.submit(bs);
- Gossiper.instance().addApplicationState(BOOTSTRAP_MODE, new ApplicationState(""));
+ BootStrapper.startBootstrap();
}
- storageLoadBalancer_.start();
Gossiper.instance().register(this);
Gossiper.instance().start(udpAddr_, storageMetadata_.getGeneration());
/* Make sure this token gets gossiped around. */
@@ -380,14 +279,6 @@
Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
}
- private Token<?> getBootstrapTokenFrom(EndPoint maxEndpoint)
- {
- Message message = new Message(getLocalStorageEndPoint(), "", bootstrapTokenVerbHandler_, ArrayUtils.EMPTY_BYTE_ARRAY);
- BootstrapTokenCallback btc = new BootstrapTokenCallback();
- MessagingService.instance().sendRR(message, maxEndpoint, btc);
- return btc.getToken();
- }
-
public boolean isBootstrapMode()
{
return isBootstrapMode;
@@ -587,7 +478,7 @@
public Map<String, String> getLoadMap()
{
Map<String, String> map = new HashMap<String, String>();
- for (Map.Entry<EndPoint,Double> entry : storageLoadBalancer_.getLoadInfo().entrySet())
+ for (Map.Entry<EndPoint,Double> entry : StorageLoadBalancer.instance().getLoadInfo().entrySet())
{
map.put(entry.getKey().getHost(), FileUtils.stringifyFileSize(entry.getValue()));
}
@@ -665,6 +556,11 @@
HintedHandOffManager.instance().deliverHints(endpoint);
}
+ public Token getLocalToken()
+ {
+ return tokenMetadata_.getToken(tcpAddr_);
+ }
+
/* This methods belong to the MBean interface */
public String getToken(EndPoint ep)
@@ -680,7 +576,7 @@
public String getToken()
{
- return tokenMetadata_.getToken(StorageService.tcpAddr_).toString();
+ return getLocalToken().toString();
}
public Set<String> getLiveNodes()
@@ -1116,41 +1012,4 @@
tokens.add(range.right().toString());
return tokens;
}
-
- class BootstrapTokenCallback implements IAsyncCallback
- {
- private volatile Token<?> token;
- private final Condition condition = new SimpleCondition();
-
- public Token<?> getToken()
- {
- try
- {
- condition.await();
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- return token;
- }
-
- public void response(Message msg)
- {
- try
- {
- token = partitioner_.getTokenFactory().fromString(new String(msg.getMessageBody(), "UTF-8"));
- }
- catch (UnsupportedEncodingException e)
- {
- throw new AssertionError();
- }
- condition.signalAll();
- }
-
- public void attachContext(Object o)
- {
- throw new UnsupportedOperationException();
- }
- }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java Fri Oct 9 17:07:14 2009
@@ -39,45 +39,7 @@
public final class StreamManager
{
private static Logger logger_ = Logger.getLogger( StreamManager.class );
-
- public static class BootstrapTerminateVerbHandler implements IVerbHandler
- {
- private static Logger logger_ = Logger.getLogger( BootstrapTerminateVerbHandler.class );
-
- public void doVerb(Message message)
- {
- byte[] body = message.getMessageBody();
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
-
- try
- {
- StreamContextManager.StreamStatusMessage streamStatusMessage = StreamContextManager.StreamStatusMessage.serializer().deserialize(bufIn);
- StreamContextManager.StreamStatus streamStatus = streamStatusMessage.getStreamStatus();
-
- switch( streamStatus.getAction() )
- {
- case DELETE:
- StreamManager.instance(message.getFrom()).finish(streamStatus.getFile());
- break;
-
- case STREAM:
- if (logger_.isDebugEnabled())
- logger_.debug("Need to re-stream file " + streamStatus.getFile());
- StreamManager.instance(message.getFrom()).repeat();
- break;
-
- default:
- break;
- }
- }
- catch ( IOException ex )
- {
- logger_.info(LogUtil.throwableToString(ex));
- }
- }
- }
-
+
private static Map<EndPoint, StreamManager> streamManagers_ = new HashMap<EndPoint, StreamManager>();
public static StreamManager instance(EndPoint to)
@@ -111,7 +73,7 @@
}
}
- void start()
+ public void start()
{
if ( filesToStream_.size() > 0 )
{
@@ -122,13 +84,13 @@
}
}
- void repeat()
+ public void repeat()
{
if ( filesToStream_.size() > 0 )
start();
}
- void finish(String file) throws IOException
+ public void finish(String file) throws IOException
{
File f = new File(file);
if (logger_.isDebugEnabled())
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=823616&r1=823615&r2=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Fri Oct 9 17:07:14 2009
@@ -31,6 +31,10 @@
import static junit.framework.Assert.assertEquals;
import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.CollatingOrderPreservingPartitioner;
import org.apache.cassandra.db.filter.IdentityQueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.SliceQueryFilter;
@@ -95,4 +99,37 @@
TableTest.reTest(store, r);
}
+
+ /**
+ * Writes out a bunch of keys into an SSTable, then runs anticompaction on a range.
+ * Checks that anticompaction produces at least one SSTable.
+ */
+ private void testAntiCompaction(String columnFamilyName, int insertsPerTable) throws IOException, ExecutionException, InterruptedException
+ {
+ Table table = Table.open("Keyspace1");
+ ColumnFamilyStore store = table.getColumnFamilyStore(columnFamilyName);
+
+ for (int j = 0; j < insertsPerTable; j++)
+ {
+ String key = String.valueOf(j);
+ RowMutation rm = new RowMutation("Keyspace1", key);
+ rm.add(new QueryPath(columnFamilyName, null, "0".getBytes()), new byte[0], j);
+ rm.apply();
+ }
+
+ store.forceBlockingFlush();
+ List<Range> ranges = new ArrayList<Range>();
+ IPartitioner partitioner = new CollatingOrderPreservingPartitioner();
+ Range r = new Range(partitioner.getToken("0"), partitioner.getToken("zzzzzzz"));
+ ranges.add(r);
+
+ List<SSTableReader> fileList = store.forceAntiCompaction(ranges, new EndPoint("127.0.0.1", 9150));
+ assert fileList.size() >= 1;
+ }
+
+ @Test
+ public void testAntiCompaction1() throws IOException, ExecutionException, InterruptedException
+ {
+ testAntiCompaction("Standard1", 100);
+ }
}
Copied: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java (from r823562, incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java?p2=incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java&p1=incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java&r1=823562&r2=823616&rev=823616&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java Fri Oct 9 17:07:14 2009
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.cassandra.db;
+package org.apache.cassandra.dht;
import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.*;
@@ -38,39 +38,6 @@
public class BootstrapTest
{
- /**
- * Writes out a bunch of keys into an SSTable, then runs anticompaction on a range.
- * Checks to see if anticompaction returns true.
- */
- private void testAntiCompaction(String columnFamilyName, int insertsPerTable) throws IOException, ExecutionException, InterruptedException
- {
- Table table = Table.open("Keyspace1");
- ColumnFamilyStore store = table.getColumnFamilyStore(columnFamilyName);
-
- for (int j = 0; j < insertsPerTable; j++)
- {
- String key = String.valueOf(j);
- RowMutation rm = new RowMutation("Keyspace1", key);
- rm.add(new QueryPath(columnFamilyName, null, "0".getBytes()), new byte[0], j);
- rm.apply();
- }
-
- store.forceBlockingFlush();
- List<Range> ranges = new ArrayList<Range>();
- IPartitioner partitioner = new CollatingOrderPreservingPartitioner();
- Range r = new Range(partitioner.getToken("0"), partitioner.getToken("zzzzzzz"));
- ranges.add(r);
-
- List<SSTableReader> fileList = store.forceAntiCompaction(ranges, new EndPoint("127.0.0.1", 9150));
- assert fileList.size() >= 1;
- }
-
- @Test
- public void testAntiCompaction1() throws IOException, ExecutionException, InterruptedException
- {
- testAntiCompaction("Standard1", 100);
- }
-
@Test
public void testGetNewNames() throws IOException
{
@@ -78,7 +45,7 @@
streamContexts[0] = new StreamContextManager.StreamContext("/foo/Standard1-500-Data.db", 100, "Keyspace1");
streamContexts[1] = new StreamContextManager.StreamContext("/foo/Standard1-500-Index.db", 100, "Keyspace1");
streamContexts[2] = new StreamContextManager.StreamContext("/foo/Standard1-500-Filter.db", 100, "Keyspace1");
- Table.BootStrapInitiateVerbHandler bivh = new Table.BootStrapInitiateVerbHandler();
+ BootStrapper.BootStrapInitiateVerbHandler bivh = new BootStrapper.BootStrapInitiateVerbHandler();
Map<String, String> fileNames = bivh.getNewNames(streamContexts);
String result = fileNames.get("Keyspace1-Standard1-500");
assertEquals(true, result.contains("Standard1"));
@@ -89,6 +56,4 @@
assertTrue( new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[1])).getName().matches("Standard1-tmp-\\d+-Index.db"));
assertTrue( new File(bivh.getNewFileNameFromOldContextAndNames(fileNames, streamContexts[2])).getName().matches("Standard1-tmp-\\d+-Filter.db"));
}
-
-
}