You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/09 19:07:24 UTC
svn commit: r823617 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
dht/BootStrapper.java gms/Gossiper.java
locator/AbstractReplicationStrategy.java service/StorageLoadBalancer.java
service/StorageService.java
Author: jbellis
Date: Fri Oct 9 17:07:24 2009
New Revision: 823617
URL: http://svn.apache.org/viewvc?rev=823617&view=rev
Log:
fix regressions
patch by jbellis reviewed by Eric Evans for CASSANDRA-477
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Fri Oct 9 17:07:24 2009
@@ -37,6 +37,7 @@
import org.apache.commons.lang.ArrayUtils;
import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.net.*;
import org.apache.cassandra.net.io.StreamContextManager;
import org.apache.cassandra.net.io.IStreamComplete;
@@ -208,6 +209,7 @@
if (!maxEndpoint.equals(StorageService.getLocalStorageEndPoint()))
{
+ StorageService.instance().retrofitPorts(Arrays.asList(maxEndpoint));
Token<?> t = getBootstrapTokenFrom(maxEndpoint);
logger_.info("Setting token to " + t + " to assume load from " + maxEndpoint.getHost());
ss.updateToken(t);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Fri Oct 9 17:07:24 2009
@@ -95,7 +95,7 @@
final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
/* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
- final static int intervalInMillis_ = 1000;
+ public final static int intervalInMillis_ = 1000;
private static Logger logger_ = Logger.getLogger(Gossiper.class);
static Gossiper gossiper_;
@@ -522,6 +522,8 @@
{
reqdEndPointState = new EndPointState(epState.getHeartBeatState());
}
+ if (logger_.isTraceEnabled())
+ logger_.trace("Adding state " + key + ": " + appState.getState());
reqdEndPointState.addApplicationState(key, appState);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Fri Oct 9 17:07:24 2009
@@ -73,7 +73,7 @@
* This method changes the ports of the endpoints from
* the control port to the storage ports.
*/
- protected void retrofitPorts(List<EndPoint> eps)
+ public void retrofitPorts(List<EndPoint> eps)
{
for ( EndPoint ep : eps )
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Fri Oct 9 17:07:24 2009
@@ -351,8 +351,9 @@
public void startBroadcasting()
{
- /* starts a load timer thread */
- loadTimer_.schedule(new LoadDisseminator(), BROADCAST_INTERVAL, BROADCAST_INTERVAL);
+ // send the first broadcast "right away" (i.e., in 2 gossip heartbeats, when we should have someone to talk to);
+ // after that send every BROADCAST_INTERVAL.
+ loadTimer_.schedule(new LoadDisseminator(), 2 * Gossiper.intervalInMillis_, BROADCAST_INTERVAL);
}
/** wait for node information to be available. if the rest of the cluster just came up,
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=823617&r1=823616&r2=823617&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Oct 9 17:07:24 2009
@@ -226,12 +226,12 @@
MessagingService.instance().registerVerbHandlers(mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
MessagingService.instance().registerVerbHandlers(rangeVerbHandler_, new RangeVerbHandler());
// see BootStrapper for a summary of how the bootstrap verbs interact
+ MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
MessagingService.instance().registerVerbHandlers(bootstrapMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
MessagingService.instance().registerVerbHandlers(bootStrapInitiateVerbHandler_, new BootStrapper.BootStrapInitiateVerbHandler());
MessagingService.instance().registerVerbHandlers(bootStrapInitiateDoneVerbHandler_, new BootStrapper.BootstrapInitiateDoneVerbHandler());
MessagingService.instance().registerVerbHandlers(bootStrapTerminateVerbHandler_, new BootStrapper.BootstrapTerminateVerbHandler());
- MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
-
+
StageManager.registerStage(StorageService.mutationStage_,
new MultiThreadedStage(StorageService.mutationStage_, DatabaseDescriptor.getConcurrentWriters()));
StageManager.registerStage(StorageService.readStage_,
@@ -266,15 +266,24 @@
StorageLoadBalancer.instance().startBroadcasting();
+ // have to start the gossip service before we can see any info on other nodes. this is necessary
+ // for bootstrap to get the load info it needs.
+ // (we won't be part of the storage ring though until we add a nodeId to our state, below.)
+ Gossiper.instance().register(this);
+ Gossiper.instance().start(udpAddr_, storageMetadata_.getGeneration());
+
if (isBootstrapMode)
{
- BootStrapper.startBootstrap();
+ BootStrapper.startBootstrap(); // handles token update
+ }
+ else
+ {
+ tokenMetadata_.update(storageMetadata_.getToken(), StorageService.tcpAddr_, isBootstrapMode);
}
- Gossiper.instance().register(this);
- Gossiper.instance().start(udpAddr_, storageMetadata_.getGeneration());
- /* Make sure this token gets gossiped around. */
- tokenMetadata_.update(storageMetadata_.getToken(), StorageService.tcpAddr_, isBootstrapMode);
+ // Gossip my token.
+ // note that before we do this we've (a) finalized what the token is actually going to be, and
+ // (b) when bootstrapping, added a bootstrap state (done by startBootstrap)
ApplicationState state = new ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(storageMetadata_.getToken()));
Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
}
@@ -935,6 +944,11 @@
return nodePicker_.getHintedStorageEndPoints(partitioner_.getToken(key));
}
+ public void retrofitPorts(List<EndPoint> eps)
+ {
+ nodePicker_.retrofitPorts(eps);
+ }
+
/**
* This function finds the most suitable endpoint given a key.
* It checks for locality and alive test.