You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/13 16:57:31 UTC
svn commit: r835890 - in /incubator/cassandra/trunk: bin/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/
src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/locator/
src/java/org/apache/cassandra/net/ src/java/org/apac...
Author: jbellis
Date: Fri Nov 13 15:57:31 2009
New Revision: 835890
URL: http://svn.apache.org/viewvc?rev=835890&view=rev
Log:
add Move command
patch by jbellis; reviewed by Jaakko Laine for CASSANDRA-541
Removed:
incubator/cassandra/trunk/bin/tokenupdater
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/TokenUpdateVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Fri Nov 13 15:57:31 2009
@@ -127,7 +127,7 @@
if (initialToken == null)
token = p.getRandomToken();
else
- token = p.getToken(initialToken);
+ token = p.getTokenFactory().fromString(initialToken);
logger.info("Saved Token not found. Using " + token);
// seconds-since-epoch isn't a foolproof new generation
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/IPartitioner.java Fri Nov 13 15:57:31 2009
@@ -72,6 +72,8 @@
/**
* @return a Token that can be used to route a given key
+ * (This is NOT a method to create a Token from its string representation;
+ * for that, use TokenFactory.fromString.)
*/
public T getToken(String key);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Fri Nov 13 15:57:31 2009
@@ -50,6 +50,7 @@
{
private static final Logger logger = Logger.getLogger(SSTable.class);
+ public static final int FILES_ON_DISK = 3; // data, index, and bloom filter
protected String path;
protected IPartitioner partitioner;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Fri Nov 13 15:57:31 2009
@@ -72,7 +72,7 @@
if (sstables.isEmpty())
return;
- StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[sstables.size()];
+ StreamContextManager.StreamContext[] streamContexts = new StreamContextManager.StreamContext[SSTable.FILES_ON_DISK * sstables.size()];
int i = 0;
for (SSTableReader sstable : sstables)
{
@@ -130,7 +130,6 @@
StreamContextManager.StreamStatus streamStatus = new StreamContextManager.StreamStatus(streamContext.getTargetFile(), streamContext.getExpectedBytes() );
String file = getNewFileNameFromOldContextAndNames(fileNames, streamContext);
- //String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName + "-Data.db";
if (logger.isDebugEnabled())
logger.debug("Received Data from : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
streamContext.setTargetFile(file);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Fri Nov 13 15:57:31 2009
@@ -88,6 +88,7 @@
lock.writeLock().lock();
try
{
+ tokenToEndPointMap.inverse().remove(endpoint);
if (!endpoint.equals(tokenToEndPointMap.put(token, endpoint)))
{
sortedTokens = sortTokens();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/TcpConnection.java Fri Nov 13 15:57:31 2009
@@ -71,7 +71,6 @@
private TcpConnection(InetAddress from, InetAddress to, TcpConnectionManager pool, boolean streaming) throws IOException
{
- logger_.debug("creating connection from " + from + " to " + to);
socketChannel_ = SocketChannel.open();
socketChannel_.socket().bind(new InetSocketAddress(from, 0));
socketChannel_.configureBlocking(false);
@@ -291,18 +290,20 @@
void closeSocket()
{
- logger_.warn("Closing down connection " + socketChannel_ + " with " + pendingWrites_.size() + " writes remaining.");
+ if (pendingWrites_.size() > 0)
+ logger_.error("Closing down connection " + socketChannel_ + " with " + pendingWrites_.size() + " writes remaining.");
cancel(key_);
pendingWrites_.clear();
}
void errorClose()
{
- logger_.warn("Closing down connection " + socketChannel_);
+ logger_.info("Closing errored connection " + socketChannel_);
pendingWrites_.clear();
cancel(key_);
pendingWrites_.clear();
- pool_.destroy(this);
+ if (pool_ != null)
+ pool_.destroy(this);
}
private void cancel(SelectionKey key)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java Fri Nov 13 15:57:31 2009
@@ -261,7 +261,7 @@
{
List<StreamContext> context = ctxBag_.get(key);
if ( context == null )
- throw new IllegalStateException("Streaming context has not been set.");
+ throw new IllegalStateException("Streaming context has not been set for " + key);
StreamContext streamContext = context.remove(0);
if ( context.isEmpty() )
ctxBag_.remove(key);
@@ -272,7 +272,7 @@
{
List<StreamStatus> status = streamStatusBag_.get(key);
if ( status == null )
- throw new IllegalStateException("Streaming status has not been set.");
+ throw new IllegalStateException("Streaming status has not been set for " + key);
StreamStatus streamStatus = status.remove(0);
if ( status.isEmpty() )
streamStatusBag_.remove(key);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Nov 13 15:57:31 2009
@@ -19,6 +19,7 @@
package org.apache.cassandra.service;
import java.io.IOException;
+import java.io.IOError;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.util.*;
@@ -71,7 +72,6 @@
/* All verb handler identifiers */
public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
- public final static String tokenVerbHandler_ = "TOKEN-VERB-HANDLER";
public final static String binaryVerbHandler_ = "BINARY-VERB-HANDLER";
public final static String readRepairVerbHandler_ = "READ-REPAIR-VERB-HANDLER";
public final static String readVerbHandler_ = "ROW-READ-VERB-HANDLER";
@@ -165,7 +165,7 @@
{
bootstrapSet.remove(s);
if (logger_.isDebugEnabled())
- logger_.debug("Removed " + s + " as a bootstrap source");
+ logger_.debug("Removed " + s + " as a bootstrap source; remaining is [" + StringUtils.join(bootstrapSet, ", ") + "]");
if (bootstrapSet.isEmpty())
{
@@ -222,7 +222,6 @@
endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
/* register the verb handlers */
- MessagingService.instance().registerVerbHandlers(tokenVerbHandler_, new TokenUpdateVerbHandler());
MessagingService.instance().registerVerbHandlers(binaryVerbHandler_, new BinaryVerbHandler());
MessagingService.instance().registerVerbHandlers(mutationVerbHandler_, new RowMutationVerbHandler());
MessagingService.instance().registerVerbHandlers(readRepairVerbHandler_, new ReadRepairVerbHandler());
@@ -258,8 +257,6 @@
public void start() throws IOException
{
storageMetadata_ = SystemTable.initMetadata();
- isBootstrapMode = DatabaseDescriptor.isAutoBootstrap()
- && !(DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) || SystemTable.isBootstrapped());
/* Listen for application messages */
MessagingService.instance().listen(FBUtilities.getLocalAddress());
@@ -277,15 +274,14 @@
Gossiper.instance().register(this);
Gossiper.instance().start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration());
- if (isBootstrapMode)
+ if (DatabaseDescriptor.isAutoBootstrap()
+ && !(DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) || SystemTable.isBootstrapped()))
{
logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
StorageLoadBalancer.instance().waitForLoadInfo();
logger_.info("... got load info");
Token token = BootStrapper.getBootstrapToken(tokenMetadata_, StorageLoadBalancer.instance().getLoadInfo());
- SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
- Gossiper.instance().addApplicationState(StorageService.STATE_BOOTSTRAPPING, new ApplicationState(partitioner_.getTokenFactory().toString(getLocalToken())));
- new BootStrapper(replicationStrategy_, FBUtilities.getLocalAddress(), getLocalToken(), tokenMetadata_).startBootstrap(); // handles token update
+ startBootstrap(token);
// don't finish startup (enabling thrift) until after bootstrap is done
while (isBootstrapMode)
{
@@ -303,13 +299,21 @@
{
SystemTable.setBootstrapped(true);
Token token = storageMetadata_.getToken();
- setToken(token);
+ tokenMetadata_.update(token, FBUtilities.getLocalAddress());
Gossiper.instance().addApplicationState(StorageService.STATE_NORMAL, new ApplicationState(partitioner_.getTokenFactory().toString(token)));
}
assert tokenMetadata_.sortedTokens().size() > 0;
}
+ private void startBootstrap(Token token) throws IOException
+ {
+ isBootstrapMode = true;
+ SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
+ Gossiper.instance().addApplicationState(StorageService.STATE_BOOTSTRAPPING, new ApplicationState(partitioner_.getTokenFactory().toString(token)));
+ new BootStrapper(replicationStrategy_, FBUtilities.getLocalAddress(), token, tokenMetadata_).startBootstrap(); // handles token update
+ }
+
public boolean isBootstrapMode()
{
return isBootstrapMode;
@@ -935,7 +939,7 @@
return tokens;
}
- public void decommission()
+ public void decommission() throws InterruptedException
{
if (!tokenMetadata_.isMember(FBUtilities.getLocalAddress()))
throw new UnsupportedOperationException("local node is not a member of the token ring yet");
@@ -947,15 +951,22 @@
logger_.info("DECOMMISSIONING");
Gossiper.instance().addApplicationState(STATE_LEAVING, new ApplicationState(getLocalToken().toString()));
logger_.info("decommission sleeping " + Streaming.RING_DELAY);
- try
- {
- Thread.sleep(Streaming.RING_DELAY);
- }
- catch (InterruptedException e)
+ Thread.sleep(Streaming.RING_DELAY);
+
+ Runnable finishLeaving = new Runnable()
{
- throw new AssertionError(e);
- }
+ public void run()
+ {
+ Gossiper.instance().stop();
+ logger_.info("DECOMMISSION FINISHED.");
+ // let op be responsible for killing the process
+ }
+ };
+ unbootstrap(finishLeaving);
+ }
+ private void unbootstrap(final Runnable onFinish)
+ {
Multimap<Range, InetAddress> rangesMM = getChangedRangesForLeaving(FBUtilities.getLocalAddress());
if (logger_.isDebugEnabled())
logger_.debug("Ranges needing transfer are [" + StringUtils.join(rangesMM.keySet(), ",") + "]");
@@ -970,7 +981,25 @@
{
pending.remove(entry);
if (pending.isEmpty())
- finishLeaving();
+ {
+ SystemTable.setBootstrapped(false);
+ tokenMetadata_.removeEndpoint(FBUtilities.getLocalAddress());
+ replicationStrategy_.removeObsoletePendingRanges();
+
+ if (logger_.isDebugEnabled())
+ logger_.debug("");
+ Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(getLocalToken().toString()));
+ try
+ {
+ Thread.sleep(2 * Gossiper.intervalInMillis_);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
+ onFinish.run();
+ }
}
};
StageManager.getStage(streamStage_).execute(new Runnable()
@@ -984,6 +1013,35 @@
}
}
+ public void move(String newToken) throws InterruptedException
+ {
+ if (tokenMetadata_.getPendingRanges(FBUtilities.getLocalAddress()).size() > 0)
+ throw new UnsupportedOperationException("data is currently moving to this node; unable to leave the ring");
+
+ final Token token = partitioner_.getTokenFactory().fromString(newToken); // make sure it's valid
+ logger_.info("moving to " + token);
+ Gossiper.instance().addApplicationState(STATE_LEAVING, new ApplicationState(getLocalToken().toString()));
+ logger_.info("move sleeping " + Streaming.RING_DELAY);
+ Thread.sleep(Streaming.RING_DELAY);
+
+ Runnable finishMoving = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ logger_.info("re-bootstrapping to new token " + token);
+ startBootstrap(token);
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
+ }
+ };
+ unbootstrap(finishMoving);
+ }
+
public <T> QuorumResponseHandler<T> getResponseHandler(IResponseResolver<T> responseResolver, int blockFor, int consistency_level)
{
return replicationStrategy_.getResponseHandler(responseResolver, blockFor, consistency_level);
@@ -994,20 +1052,4 @@
return replicationStrategy_;
}
- public void finishLeaving()
- {
- SystemTable.setBootstrapped(false);
- Gossiper.instance().addApplicationState(STATE_LEFT, new ApplicationState(getLocalToken().toString()));
- try
- {
- Thread.sleep(2 * Gossiper.intervalInMillis_);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- Gossiper.instance().stop();
- logger_.info("DECOMMISSION FINISHED.");
- // let op be responsible for killing the process
- }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Fri Nov 13 15:57:31 2009
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.FutureTask;
import org.apache.cassandra.dht.Range;
import java.net.InetAddress;
@@ -116,7 +117,13 @@
/**
* transfer this node's data to other machines and remove it from service.
*/
- public void decommission();
+ public void decommission() throws InterruptedException;
+
+ /**
+ * @param newToken token to move this node to.
+ * This node will unload its data onto its neighbors, and bootstrap to the new token.
+ */
+ public void move(String newToken) throws InterruptedException;
/** set the logging level at runtime */
public void setLog4jLevel(String classQualifier, String level);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=835890&r1=835889&r2=835890&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Fri Nov 13 15:57:31 2009
@@ -380,11 +380,16 @@
ssProxy.clearSnapshot();
}
- public void decommission()
+ public void decommission() throws InterruptedException
{
ssProxy.decommission();
}
+ public void move(String newToken) throws InterruptedException
+ {
+ ssProxy.move(newToken);
+ }
+
/**
* Print out the size of the queues in the thread pools
*
@@ -480,7 +485,7 @@
HelpFormatter hf = new HelpFormatter();
String header = String.format(
"%nAvailable commands: ring, info, cleanup, compact, cfstats, snapshot [name], clearsnapshot, " +
- "tpstats, flush, decommission, " +
+ "tpstats, flush, decommission, move, " +
" getcompactionthreshold, setcompactionthreshold [minthreshold] ([maxthreshold])");
String usage = String.format("java %s -host <arg> <command>%n", NodeProbe.class.getName());
hf.printHelp(usage, "", options, header);
@@ -489,7 +494,7 @@
/**
* @param args
*/
- public static void main(String[] args) throws IOException
+ public static void main(String[] args) throws IOException, InterruptedException
{
NodeProbe probe = null;
try
@@ -543,6 +548,14 @@
{
probe.decommission();
}
+ else if (cmdName.equals("move"))
+ {
+ if (arguments.length <= 1)
+ {
+ System.err.println("missing token argument");
+ }
+ probe.move(arguments[1]);
+ }
else if (cmdName.equals("snapshot"))
{
String snapshotName = "";