You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/27 15:40:07 UTC
svn commit: r830211 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
dht/BootStrapper.java locator/TokenMetadata.java service/StorageService.java
Author: jbellis
Date: Tue Oct 27 14:40:06 2009
New Revision: 830211
URL: http://svn.apache.org/viewvc?rev=830211&view=rev
Log:
rename away underscores
patch by jbellis; reviewed by goffinet for CASSANDRA-483
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=830211&r1=830210&r2=830211&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Oct 27 14:40:06 2009
@@ -19,7 +19,6 @@
package org.apache.cassandra.dht;
import java.util.ArrayList;
- import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -27,7 +26,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Condition;
- import java.util.concurrent.ExecutorService;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.io.File;
@@ -51,7 +49,6 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.ApplicationState;
- import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.SSTableWriter;
@@ -72,40 +69,40 @@
{
public static final long INITIAL_DELAY = 30 * 1000; //ms
- static final Logger logger_ = Logger.getLogger(BootStrapper.class);
+ static final Logger logger = Logger.getLogger(BootStrapper.class);
/* endpoints that need to be bootstrapped */
- protected final List<InetAddress> targets_;
+ protected final List<InetAddress> targets;
/* tokens of the nodes being bootstrapped. */
- protected final Token[] tokens_;
- protected final TokenMetadata tokenMetadata_;
+ protected final Token[] tokens;
+ protected final TokenMetadata tokenMetadata;
public BootStrapper(List<InetAddress> targets, Token... token)
{
- targets_ = targets;
- tokens_ = token;
- tokenMetadata_ = StorageService.instance().getTokenMetadata();
+ this.targets = targets;
+ tokens = token;
+ tokenMetadata = StorageService.instance().getTokenMetadata();
}
Map<Range, List<BootstrapSourceTarget>> getRangesWithSourceTarget()
{
/* copy the token to endpoint map */
- Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata.cloneTokenEndPointMap();
/* remove the tokens associated with the endpoints being bootstrapped */
- for (Token token : tokens_)
+ for (Token token : tokens)
{
tokenToEndPointMap.remove(token);
}
Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
- if (logger_.isDebugEnabled())
- logger_.debug("Total number of old ranges " + oldRanges.length);
+ if (logger.isDebugEnabled())
+ logger.debug("Total number of old ranges " + oldRanges.length);
/*
* Find the ranges that are split. Maintain a mapping between
* the range being split and the list of subranges.
*/
- Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRanges, tokens_);
+ Map<Range, List<Range>> splitRanges = LeaveJoinProtocolHelper.getRangeSplitRangeMapping(oldRanges, tokens);
/* Calculate the list of nodes that handle the old ranges */
Map<Range, List<InetAddress>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges, tokenToEndPointMap);
/* Mapping of split ranges to the list of endpoints responsible for the range */
@@ -134,11 +131,11 @@
}
/* Add the new token and re-calculate the range assignments */
- Collections.addAll( oldTokens, tokens_ );
+ Collections.addAll( oldTokens, tokens);
Range[] newRanges = StorageService.instance().getAllRanges(oldTokens);
- if (logger_.isDebugEnabled())
- logger_.debug("Total number of new ranges " + newRanges.length);
+ if (logger.isDebugEnabled())
+ logger.debug("Total number of new ranges " + newRanges.length);
/* Calculate the list of nodes that handle the new ranges */
Map<Range, List<InetAddress>> newRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(newRanges);
/* Calculate ranges that need to be sent and from whom to where */
@@ -156,7 +153,7 @@
public void startBootstrap() throws IOException
{
- logger_.info("Starting in bootstrap mode (first, sleeping to get load information)");
+ logger.info("Starting in bootstrap mode (first, sleeping to get load information)");
StorageService ss = StorageService.instance();
StorageLoadBalancer slb = StorageLoadBalancer.instance();
@@ -184,7 +181,7 @@
if (!maxEndpoint.equals(FBUtilities.getLocalAddress()))
{
Token<?> t = getBootstrapTokenFrom(maxEndpoint);
- logger_.info("Setting token to " + t + " to assume load from " + maxEndpoint);
+ logger.info("Setting token to " + t + " to assume load from " + maxEndpoint);
ss.updateToken(t);
}
}
@@ -194,14 +191,14 @@
public void run()
{
// Mark as not bootstrapping to calculate ranges correctly
- for (int i=0; i< targets_.size(); i++)
+ for (int i=0; i< targets.size(); i++)
{
- tokenMetadata_.update(tokens_[i], targets_.get(i), false);
+ tokenMetadata.update(tokens[i], targets.get(i), false);
}
Map<Range, List<BootstrapSourceTarget>> rangesWithSourceTarget = getRangesWithSourceTarget();
- if (logger_.isDebugEnabled())
- logger_.debug("Beginning bootstrap process for [" + StringUtils.join(targets_, ", ") + "] ...");
+ if (logger.isDebugEnabled())
+ logger.debug("Beginning bootstrap process for [" + StringUtils.join(targets, ", ") + "] ...");
/* Send messages to respective folks to stream data over to the new nodes being bootstrapped */
try
{
@@ -240,8 +237,8 @@
{
public void doVerb(Message message)
{
- if (logger_.isDebugEnabled())
- logger_.debug("Received a bootstrap initiate done message ...");
+ if (logger.isDebugEnabled())
+ logger.debug("Received a bootstrap initiate done message ...");
/* Let the Stream Manager do his thing. */
StreamManager.instance(message.getFrom()).start();
}
@@ -311,22 +308,22 @@
String file = getNewFileNameFromOldContextAndNames(fileNames, streamContext);
//String file = DatabaseDescriptor.getDataFileLocationForTable(streamContext.getTable()) + File.separator + newFileName + "-Data.db";
- if (logger_.isDebugEnabled())
- logger_.debug("Received Data from : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
+ if (logger.isDebugEnabled())
+ logger.debug("Received Data from : " + message.getFrom() + " " + streamContext.getTargetFile() + " " + file);
streamContext.setTargetFile(file);
addStreamContext(message.getFrom(), streamContext, streamStatus);
}
StreamContextManager.registerStreamCompletionHandler(message.getFrom(), new BootstrapCompletionHandler());
/* Send a bootstrap initiation done message to execute on default stage. */
- if (logger_.isDebugEnabled())
- logger_.debug("Sending a bootstrap initiate done message ...");
+ if (logger.isDebugEnabled())
+ logger.debug("Sending a bootstrap initiate done message ...");
Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapInitiateDoneVerbHandler_, new byte[0] );
MessagingService.instance().sendOneWay(doneMessage, message.getFrom());
}
catch ( IOException ex )
{
- logger_.info(LogUtil.throwableToString(ex));
+ logger.info(LogUtil.throwableToString(ex));
}
}
@@ -371,8 +368,8 @@
Table table = Table.open( tableName );
ColumnFamilyStore cfStore = table.getColumnFamilyStore(pieces[1]);
- if (logger_.isDebugEnabled())
- logger_.debug("Generating file name for " + distinctEntry + " ...");
+ if (logger.isDebugEnabled())
+ logger.debug("Generating file name for " + distinctEntry + " ...");
fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
}
@@ -381,8 +378,8 @@
private void addStreamContext(InetAddress host, StreamContextManager.StreamContext streamContext, StreamContextManager.StreamStatus streamStatus)
{
- if (logger_.isDebugEnabled())
- logger_.debug("Adding stream context " + streamContext + " for " + host + " ...");
+ if (logger.isDebugEnabled())
+ logger.debug("Adding stream context " + streamContext + " for " + host + " ...");
StreamContextManager.addStreamContext(host, streamContext, streamStatus);
}
}
@@ -413,16 +410,16 @@
//TODO add a sanity check that this sstable has all its parts and is ok
Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
- logger_.info("Bootstrap added " + sstable.getFilename());
+ logger.info("Bootstrap added " + sstable.getFilename());
}
catch (IOException e)
{
- logger_.error("Not able to bootstrap with file " + streamContext.getTargetFile(), e);
+ logger.error("Not able to bootstrap with file " + streamContext.getTargetFile(), e);
}
}
- if (logger_.isDebugEnabled())
- logger_.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + host);
+ if (logger.isDebugEnabled())
+ logger.debug("Sending a bootstrap terminate message with " + streamStatus + " to " + host);
/* Send a StreamStatusMessage object which may require the source node to re-stream certain files. */
StreamContextManager.StreamStatusMessage streamStatusMessage = new StreamContextManager.StreamStatusMessage(streamStatus);
Message message = StreamContextManager.StreamStatusMessage.makeStreamStatusMessage(streamStatusMessage);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=830211&r1=830210&r2=830211&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Tue Oct 27 14:40:06 2009
@@ -31,26 +31,26 @@
public class TokenMetadata
{
/* Maintains token to endpoint map of every node in the cluster. */
- private Map<Token, InetAddress> tokenToEndPointMap_;
+ private Map<Token, InetAddress> tokenToEndPointMap;
/* Maintains a reverse index of endpoint to token in the cluster. */
- private Map<InetAddress, Token> endPointToTokenMap_;
+ private Map<InetAddress, Token> endPointToTokenMap;
/* Bootstrapping nodes and their tokens */
private Map<Token, InetAddress> bootstrapNodes;
/* Use this lock for manipulating the token map */
- private final ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
+ private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
public TokenMetadata()
{
- tokenToEndPointMap_ = new HashMap<Token, InetAddress>();
- endPointToTokenMap_ = new HashMap<InetAddress, Token>();
+ tokenToEndPointMap = new HashMap<Token, InetAddress>();
+ endPointToTokenMap = new HashMap<InetAddress, Token>();
this.bootstrapNodes = Collections.synchronizedMap(new HashMap<Token, InetAddress>());
}
public TokenMetadata(Map<Token, InetAddress> tokenToEndPointMap, Map<InetAddress, Token> endPointToTokenMap, Map<Token, InetAddress> bootstrapNodes)
{
- tokenToEndPointMap_ = tokenToEndPointMap;
- endPointToTokenMap_ = endPointToTokenMap;
+ this.tokenToEndPointMap = tokenToEndPointMap;
+ this.endPointToTokenMap = endPointToTokenMap;
this.bootstrapNodes = bootstrapNodes;
}
@@ -68,7 +68,7 @@
*/
public void update(Token token, InetAddress endpoint, boolean bootstrapState)
{
- lock_.writeLock().lock();
+ lock.writeLock().lock();
try
{
if (bootstrapState)
@@ -79,16 +79,16 @@
else
{
bootstrapNodes.remove(token); // If this happened to be there
- Token oldToken = endPointToTokenMap_.get(endpoint);
+ Token oldToken = endPointToTokenMap.get(endpoint);
if ( oldToken != null )
- tokenToEndPointMap_.remove(oldToken);
- tokenToEndPointMap_.put(token, endpoint);
- endPointToTokenMap_.put(endpoint, token);
+ tokenToEndPointMap.remove(oldToken);
+ tokenToEndPointMap.put(token, endpoint);
+ endPointToTokenMap.put(endpoint, token);
}
}
finally
{
- lock_.writeLock().unlock();
+ lock.writeLock().unlock();
}
}
@@ -98,77 +98,77 @@
*/
public void remove(InetAddress endpoint)
{
- lock_.writeLock().lock();
+ lock.writeLock().lock();
try
{
- Token oldToken = endPointToTokenMap_.get(endpoint);
+ Token oldToken = endPointToTokenMap.get(endpoint);
if ( oldToken != null )
- tokenToEndPointMap_.remove(oldToken);
- endPointToTokenMap_.remove(endpoint);
+ tokenToEndPointMap.remove(oldToken);
+ endPointToTokenMap.remove(endpoint);
}
finally
{
- lock_.writeLock().unlock();
+ lock.writeLock().unlock();
}
}
public Token getToken(InetAddress endpoint)
{
- lock_.readLock().lock();
+ lock.readLock().lock();
try
{
- return endPointToTokenMap_.get(endpoint);
+ return endPointToTokenMap.get(endpoint);
}
finally
{
- lock_.readLock().unlock();
+ lock.readLock().unlock();
}
}
public boolean isKnownEndPoint(InetAddress ep)
{
- lock_.readLock().lock();
+ lock.readLock().lock();
try
{
- return endPointToTokenMap_.containsKey(ep);
+ return endPointToTokenMap.containsKey(ep);
}
finally
{
- lock_.readLock().unlock();
+ lock.readLock().unlock();
}
}
public InetAddress getFirstEndpoint()
{
- lock_.readLock().lock();
+ lock.readLock().lock();
try
{
- ArrayList<Token> tokens = new ArrayList<Token>(tokenToEndPointMap_.keySet());
+ ArrayList<Token> tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
if (tokens.isEmpty())
return null;
Collections.sort(tokens);
- return tokenToEndPointMap_.get(tokens.get(0));
+ return tokenToEndPointMap.get(tokens.get(0));
}
finally
{
- lock_.readLock().unlock();
+ lock.readLock().unlock();
}
}
public InetAddress getNextEndpoint(InetAddress endPoint) throws UnavailableException
{
- lock_.readLock().lock();
+ lock.readLock().lock();
try
{
- ArrayList<Token> tokens = new ArrayList<Token>(tokenToEndPointMap_.keySet());
+ ArrayList<Token> tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
if (tokens.isEmpty())
return null;
Collections.sort(tokens);
- int i = tokens.indexOf(endPointToTokenMap_.get(endPoint)); // TODO binary search
+ int i = tokens.indexOf(endPointToTokenMap.get(endPoint)); // TODO binary search
int j = 1;
InetAddress ep;
- while (!FailureDetector.instance().isAlive((ep = tokenToEndPointMap_.get(tokens.get((i + j) % tokens.size())))))
+ while (!FailureDetector.instance().isAlive((ep = tokenToEndPointMap.get(tokens.get((i + j) % tokens.size())))))
{
if (++j > DatabaseDescriptor.getReplicationFactor())
{
@@ -179,20 +179,20 @@
}
finally
{
- lock_.readLock().unlock();
+ lock.readLock().unlock();
}
}
public Map<Token, InetAddress> cloneBootstrapNodes()
{
- lock_.readLock().lock();
+ lock.readLock().lock();
try
{
return new HashMap<Token, InetAddress>( bootstrapNodes );
}
finally
{
- lock_.readLock().unlock();
+ lock.readLock().unlock();
}
}
@@ -202,14 +202,14 @@
*/
public Map<Token, InetAddress> cloneTokenEndPointMap()
{
- lock_.readLock().lock();
+ lock.readLock().lock();
try
{
- return new HashMap<Token, InetAddress>( tokenToEndPointMap_ );
+ return new HashMap<Token, InetAddress>(tokenToEndPointMap);
}
finally
{
- lock_.readLock().unlock();
+ lock.readLock().unlock();
}
}
@@ -218,27 +218,27 @@
*/
public Map<InetAddress, Token> cloneEndPointTokenMap()
{
- lock_.readLock().lock();
+ lock.readLock().lock();
try
{
- return new HashMap<InetAddress, Token>( endPointToTokenMap_ );
+ return new HashMap<InetAddress, Token>(endPointToTokenMap);
}
finally
{
- lock_.readLock().unlock();
+ lock.readLock().unlock();
}
}
public String toString()
{
StringBuilder sb = new StringBuilder();
- Set<InetAddress> eps = endPointToTokenMap_.keySet();
+ Set<InetAddress> eps = endPointToTokenMap.keySet();
for ( InetAddress ep : eps )
{
sb.append(ep);
sb.append(":");
- sb.append(endPointToTokenMap_.get(ep));
+ sb.append(endPointToTokenMap.get(ep));
sb.append(System.getProperty("line.separator"));
}
@@ -247,14 +247,14 @@
public InetAddress getEndPoint(Token token)
{
- lock_.readLock().lock();
+ lock.readLock().lock();
try
{
- return tokenToEndPointMap_.get(token);
+ return tokenToEndPointMap.get(token);
}
finally
{
- lock_.readLock().unlock();
+ lock.readLock().unlock();
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=830211&r1=830210&r2=830211&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Oct 27 14:40:06 2009
@@ -51,7 +51,7 @@
public final class StorageService implements IEndPointStateChangeSubscriber, StorageServiceMBean
{
private static Logger logger_ = Logger.getLogger(StorageService.class);
- private final static String nodeId_ = "NODE-IDENTIFIER";
+ private final static String NODE_ID = "NODE-IDENTIFIER";
public final static String BOOTSTRAP_MODE = "BOOTSTRAP-MODE";
/* All stage identifiers */
@@ -268,7 +268,7 @@
// note that before we do this we've (a) finalized what the token is actually going to be, and
// (b) added a bootstrap state (done by startBootstrap)
ApplicationState state = new ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(storageMetadata_.getToken()));
- Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
+ Gossiper.instance().addApplicationState(StorageService.NODE_ID, state);
}
public boolean isBootstrapMode()
@@ -372,7 +372,7 @@
public void onChange(InetAddress endpoint, EndPointState epState)
{
/* node identifier for this endpoint on the identifier space */
- ApplicationState nodeIdState = epState.getApplicationState(StorageService.nodeId_);
+ ApplicationState nodeIdState = epState.getApplicationState(StorageService.NODE_ID);
/* Check if this has a bootstrapping state message */
boolean bootstrapState = epState.getApplicationState(StorageService.BOOTSTRAP_MODE) != null;
if (bootstrapState)
@@ -476,7 +476,7 @@
tokenMetadata_.update(token, FBUtilities.getLocalAddress());
/* Gossip this new token for the local storage instance */
ApplicationState state = new ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(token));
- Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
+ Gossiper.instance().addApplicationState(StorageService.NODE_ID, state);
}
/*