You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:42:00 UTC
svn commit: r759028 [2/2] -
/incubator/cassandra/trunk/src/org/apache/cassandra/service/
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Fri Mar 27 05:41:59 2009
@@ -18,32 +18,20 @@
package org.apache.cassandra.service;
-import java.io.File;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.math.BigInteger;
+import java.net.UnknownHostException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.log4j.Logger;
-
import org.apache.cassandra.analytics.AnalyticsContext;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.MultiThreadedStage;
@@ -68,22 +56,21 @@
import org.apache.cassandra.dht.BootStrapper;
import org.apache.cassandra.dht.BootstrapInitiateMessage;
import org.apache.cassandra.dht.BootstrapMetadataVerbHandler;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
-import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndPointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.locator.EndPointSnitch;
import org.apache.cassandra.locator.IEndPointSnitch;
import org.apache.cassandra.locator.IReplicaPlacementStrategy;
import org.apache.cassandra.locator.RackAwareStrategy;
import org.apache.cassandra.locator.RackUnawareStrategy;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
@@ -94,13 +81,22 @@
import org.apache.cassandra.tools.TokenUpdateVerbHandler;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.commons.math.linear.RealMatrix;
+import org.apache.commons.math.linear.RealMatrixImpl;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.io.*;
+import org.apache.cassandra.gms.*;
+import org.apache.cassandra.utils.*;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.WatcherEvent;
/*
* This abstraction contains the token/identifier of this node
@@ -137,10 +133,14 @@
public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
public final static String mbrshipCleanerVerbHandler_ = "MBRSHIP-CLEANER-VERB-HANDLER";
public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
+ public final static String jobConfigurationVerbHandler_ = "JOB-CONFIGURATION-VERB-HANDLER";
+ public final static String taskMetricVerbHandler_ = "TASK-METRIC-VERB-HANDLER";
+ public final static String mapAssignmentVerbHandler_ = "MAP-ASSIGNMENT-VERB-HANDLER";
+ public final static String reduceAssignmentVerbHandler_ = "REDUCE-ASSIGNMENT-VERB-HANDLER";
+ public final static String mapCompletionVerbHandler_ = "MAP-COMPLETION-VERB-HANDLER";
public final static String calloutDeployVerbHandler_ = "CALLOUT-DEPLOY-VERB-HANDLER";
public final static String touchVerbHandler_ = "TOUCH-VERB-HANDLER";
- public static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
-
+
public static enum ConsistencyLevel
{
WEAK,
@@ -168,19 +168,20 @@
{
return "http://" + tcpAddr_.getHost() + ":" + DatabaseDescriptor.getHttpPort();
}
+
+ public static PartitionerType getPartitionerType()
+ {
+ return (DatabaseDescriptor.getHashingStrategy().equalsIgnoreCase(DatabaseDescriptor.ophf_)) ? PartitionerType.OPHF : PartitionerType.RANDOM;
+ }
/**
* This is a facade for the hashing
* function used by the system for
* partitioning.
*/
- public static Token token(String key)
+ public static BigInteger hash(String key)
{
- return partitioner_.getTokenForKey(key);
- }
-
- public static IPartitioner getPartitioner() {
- return partitioner_;
+ return partitioner_.hash(key);
}
public static enum BootstrapMode
@@ -248,6 +249,8 @@
*
*/
private IEndPointSnitch endPointSnitch_;
+ /* Uptime of this node - we use this to determine if a bootstrap can be performed by this node */
+ private long uptime_ = 0L;
/* This abstraction maintains the token/endpoint metadata information */
private TokenMetadata tokenMetadata_ = new TokenMetadata();
@@ -258,6 +261,11 @@
* for a clean exit.
*/
private Set<IComponentShutdown> components_ = new HashSet<IComponentShutdown>();
+ /*
+ * This boolean indicates if we are in loading state. If we are then we do not want any
+ * distributed algorithms w.r.t change in token state to kick in.
+ */
+ private boolean isLoadState_ = false;
/* Timer is used to disseminate load information */
private Timer loadTimer_ = new Timer(false);
@@ -304,6 +312,7 @@
public StorageService() throws Throwable
{
init();
+ uptime_ = System.currentTimeMillis();
storageLoadBalancer_ = new StorageLoadBalancer(this);
endPointSnitch_ = new EndPointSnitch();
@@ -318,6 +327,8 @@
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapInitiateDoneVerbHandler_, new StorageService.BootstrapInitiateDoneVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapTerminateVerbHandler_, new StreamManager.BootstrapTerminateVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(HttpConnection.httpRequestVerbHandler_, new HttpRequestVerbHandler(this) );
+ MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.tokenInfoVerbHandler_, new TokenInfoVerbHandler() );
+ MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.locationInfoVerbHandler_, new LocationInfoVerbHandler() );
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.dataFileVerbHandler_, new DataFileVerbHandler() );
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bsMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
@@ -433,21 +444,23 @@
components_.add(component);
}
- static
+ private void initPartitioner()
{
- try
+ String hashingStrategy = DatabaseDescriptor.getHashingStrategy();
+ if ( hashingStrategy.equalsIgnoreCase(DatabaseDescriptor.ophf_) )
{
- Class cls = Class.forName(DatabaseDescriptor.getPartitionerClass());
- partitioner_ = (IPartitioner) cls.getConstructor().newInstance();
- }
- catch (Exception e)
+ partitioner_ = new OrderPreservingHashPartitioner();
+ }
+ else
{
- throw new RuntimeException(e);
+ partitioner_ = new RandomPartitioner();
}
}
public void start() throws Throwable
- {
+ {
+ /* Set up the partitioner */
+ initPartitioner();
/* Start the DB */
storageMetadata_ = DBManager.instance().start();
/* Set up TCP endpoint */
@@ -487,6 +500,23 @@
Gossiper.instance().addApplicationState(StorageService.nodeId_, new ApplicationState(storageMetadata_.getStorageId().toString()));
}
+ private void startMapReduceFramework()
+ {
+ // TODO: This is a null pointer exception if JobTrackerHost is not in
+ // the config file. Also, shouldn't this comparison be done by IP
+ // instead of host name? We could have a match but not a textual
+ // match (e.g. somehost.vip vs somehost.vip.domain.com)
+ if ( DatabaseDescriptor.getJobTrackerAddress().equals( StorageService.tcpAddr_.getHost() ) )
+ {
+// JobTracker.instance().start();
+// TaskTracker.instance().start();
+ }
+ else
+ {
+// TaskTracker.instance().start();
+ }
+ }
+
public void killMe() throws Throwable
{
isShutdown_.set(true);
@@ -539,7 +569,7 @@
}
/* TODO: remove later */
- public void updateTokenMetadata(Token token, EndPoint endpoint)
+ public void updateTokenMetadata(BigInteger token, EndPoint endpoint)
{
tokenMetadata_.update(token, endpoint);
}
@@ -582,11 +612,47 @@
consistencyManager_.submit(consistencySentinel);
}
+ /*
+ * This method displays all the ranges and the replicas
+ * that are responsible for the individual ranges. The
+ * format of this string is the following:
+ *
+ * R1 : A B C
+ * R2 : D E F
+ * R3 : G H I
+ */
+ public String showTheRing()
+ {
+ StringBuilder sb = new StringBuilder();
+ /* Get the token to endpoint map. */
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Set<BigInteger> tokens = tokenToEndPointMap.keySet();
+ /* All the ranges for the tokens */
+ Range[] ranges = getAllRanges(tokens);
+ Map<Range, List<EndPoint>> oldRangeToEndPointMap = constructRangeToEndPointMap(ranges);
+
+ Set<Range> rangeSet = oldRangeToEndPointMap.keySet();
+ for ( Range range : rangeSet )
+ {
+ sb.append(range);
+ sb.append(" : ");
+
+ List<EndPoint> replicas = oldRangeToEndPointMap.get(range);
+ for ( EndPoint replica : replicas )
+ {
+ sb.append(replica);
+ sb.append(" ");
+ }
+ sb.append(System.getProperty("line.separator"));
+ }
+ return sb.toString();
+ }
+
public Map<Range, List<EndPoint>> getRangeToEndPointMap()
{
/* Get the token to endpoint map. */
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- Set<Token> tokens = tokenToEndPointMap.keySet();
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Set<BigInteger> tokens = tokenToEndPointMap.keySet();
/* All the ranges for the tokens */
Range[] ranges = getAllRanges(tokens);
Map<Range, List<EndPoint>> oldRangeToEndPointMap = constructRangeToEndPointMap(ranges);
@@ -619,7 +685,7 @@
* @param tokenToEndPointMap mapping of token to endpoints.
* @return mapping of ranges to the replicas responsible for them.
*/
- public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges, Map<Token, EndPoint> tokenToEndPointMap)
+ public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges, Map<BigInteger, EndPoint> tokenToEndPointMap)
{
logger_.debug("Constructing range to endpoint map ...");
Map<Range, List<EndPoint>> rangeToEndPointMap = new HashMap<Range, List<EndPoint>>();
@@ -641,7 +707,7 @@
public Map<EndPoint, List<Range>> constructEndPointToRangesMap()
{
Map<EndPoint, List<Range>> endPointToRangesMap = new HashMap<EndPoint, List<Range>>();
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
Collection<EndPoint> mbrs = tokenToEndPointMap.values();
for ( EndPoint mbr : mbrs )
{
@@ -649,6 +715,77 @@
}
return endPointToRangesMap;
}
+
+ /**
+ * Get the estimated disk space of the target endpoint in its
+ * primary range.
+ * @param target whose primary range we are interested in.
+ * @return disk space of the target in the primary range.
+ */
+ private double getDiskSpaceForPrimaryRange(EndPoint target)
+ {
+ double primaryDiskSpace = 0d;
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Set<BigInteger> tokens = tokenToEndPointMap.keySet();
+ Range[] allRanges = getAllRanges(tokens);
+ Arrays.sort(allRanges);
+ /* Mapping from Range to its ordered position on the ring */
+ Map<Range, Integer> rangeIndex = new HashMap<Range, Integer>();
+ for ( int i = 0; i < allRanges.length; ++i )
+ {
+ rangeIndex.put(allRanges[i], i);
+ }
+ /* Get the coefficients for the equations */
+ List<double[]> equations = new ArrayList<double[]>();
+ /* Get the endpoint to range map */
+ Map<EndPoint, List<Range>> endPointToRangesMap = constructEndPointToRangesMap();
+ Set<EndPoint> eps = endPointToRangesMap.keySet();
+
+ for ( EndPoint ep : eps )
+ {
+ List<Range> ranges = endPointToRangesMap.get(ep);
+ double[] equation = new double[allRanges.length];
+ for ( Range range : ranges )
+ {
+ int index = rangeIndex.get(range);
+ equation[index] = 1;
+ }
+ equations.add(equation);
+ }
+ double[][] coefficients = equations.toArray( new double[0][0] );
+
+ /* Get the constants which are the aggregate disk space for each endpoint */
+ double[] constants = new double[allRanges.length];
+ int index = 0;
+ for ( EndPoint ep : eps )
+ {
+ /* reset the port back to control port */
+ ep.setPort(DatabaseDescriptor.getControlPort());
+ String lInfo = null;
+ if ( ep.equals(StorageService.udpAddr_) )
+ lInfo = getLoadInfo();
+ else
+ lInfo = getLoadInfo(ep);
+ LoadInfo li = new LoadInfo(lInfo);
+ constants[index++] = FileUtils.stringToFileSize(li.diskSpace());
+ }
+
+ RealMatrix matrix = new RealMatrixImpl(coefficients);
+ double[] solutions = matrix.solve(constants);
+ Range primaryRange = getPrimaryRangeForEndPoint(target);
+ primaryDiskSpace = solutions[rangeIndex.get(primaryRange)];
+ return primaryDiskSpace;
+ }
+
+ /**
+ * This is very dangerous. This is used only on the client
+ * side to set up the client library. This is then used to
+ * find the appropriate nodes to route the key to.
+ */
+ public void setTokenMetadata(TokenMetadata tokenMetadata)
+ {
+ tokenMetadata_ = tokenMetadata;
+ }
/**
* Called when there is a change in application state. In particular
@@ -662,9 +799,9 @@
ApplicationState nodeIdState = epState.getApplicationState(StorageService.nodeId_);
if (nodeIdState != null)
{
- Token newToken = getPartitioner().getTokenFactory().fromString(nodeIdState.getState());
+ BigInteger newToken = new BigInteger(nodeIdState.getState());
logger_.debug("CHANGE IN STATE FOR " + endpoint + " - has token " + nodeIdState.getState());
- Token oldToken = tokenMetadata_.getToken(ep);
+ BigInteger oldToken = tokenMetadata_.getToken(ep);
if ( oldToken != null )
{
@@ -722,6 +859,17 @@
}
}
+ public static BigInteger generateRandomToken()
+ {
+ byte[] randomBytes = new byte[24];
+ Random random = new Random();
+ for ( int i = 0 ; i < 24 ; i++)
+ {
+ randomBytes[i] = (byte)(31 + random.nextInt(256 - 31));
+ }
+ return hash(new String(randomBytes));
+ }
+
/**
* Get the count of primary keys from the sampler.
*/
@@ -741,12 +889,43 @@
LoadInfo li = storageLoadBalancer_.getLoad(ep);
return ( li == null ) ? "N/A" : li.toString();
}
+
+ /**
+ * Get the endpoint that has the largest primary count.
+ * @return
+ */
+ EndPoint getEndPointWithLargestPrimaryCount()
+ {
+ Set<EndPoint> allMbrs = Gossiper.instance().getAllMembers();
+ Map<LoadInfo, EndPoint> loadInfoToEndPointMap = new HashMap<LoadInfo, EndPoint>();
+ List<LoadInfo> lInfos = new ArrayList<LoadInfo>();
+
+ for ( EndPoint mbr : allMbrs )
+ {
+ mbr.setPort(DatabaseDescriptor.getStoragePort());
+ LoadInfo li = null;
+ if ( mbr.equals(StorageService.tcpAddr_) )
+ {
+ li = new LoadInfo( getLoadInfo() );
+ lInfos.add( li );
+ }
+ else
+ {
+ li = storageLoadBalancer_.getLoad(mbr);
+ lInfos.add( li );
+ }
+ loadInfoToEndPointMap.put(li, mbr);
+ }
+
+ Collections.sort(lInfos, new LoadInfo.DiskSpaceComparator());
+ return loadInfoToEndPointMap.get( lInfos.get(lInfos.size() - 1) );
+ }
/*
* This method updates the token on disk and modifies the cached
* StorageMetadata instance. This is only for the local endpoint.
*/
- public void updateToken(Token token) throws IOException
+ public void updateToken(BigInteger token) throws IOException
{
/* update the token on disk */
SystemTable.openSystemTable(SystemTable.name_).updateToken(token);
@@ -787,12 +966,13 @@
{
if ( keys.length > 0 )
{
- Token token = tokenMetadata_.getToken(StorageService.tcpAddr_);
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- Token[] tokens = tokenToEndPointMap.keySet().toArray(new Token[tokenToEndPointMap.keySet().size()]);
+ isLoadState_ = true;
+ BigInteger token = tokenMetadata_.getToken(StorageService.tcpAddr_);
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ BigInteger[] tokens = tokenToEndPointMap.keySet().toArray( new BigInteger[0] );
Arrays.sort(tokens);
int index = Arrays.binarySearch(tokens, token) * (keys.length/tokens.length);
- Token newToken = token( keys[index] );
+ BigInteger newToken = hash( keys[index] );
/* update the token */
updateToken(newToken);
}
@@ -804,6 +984,7 @@
*/
public void resetLoadState()
{
+ isLoadState_ = false;
}
/**
@@ -830,7 +1011,7 @@
}
String[] allNodes = nodesToLoad.split(":");
EndPoint[] endpoints = new EndPoint[allNodes.length];
- Token[] tokens = new Token[allNodes.length];
+ BigInteger[] tokens = new BigInteger[allNodes.length];
for ( int i = 0; i < allNodes.length; ++i )
{
@@ -866,8 +1047,8 @@
switch ( mode )
{
case FULL:
- Token token = tokenMetadata_.getToken(endpoint);
- bootStrapper_.submit(new BootStrapper(new EndPoint[]{endpoint}, token));
+ BigInteger token = tokenMetadata_.getToken(endpoint);
+ bootStrapper_.submit( new BootStrapper(new EndPoint[]{endpoint}, new BigInteger[]{token}) );
break;
case HINT:
@@ -885,14 +1066,26 @@
public String getToken(EndPoint ep)
{
EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getStoragePort());
- Token token = tokenMetadata_.getToken(ep2);
- return ( token == null ) ? "" : token.toString();
+ BigInteger token = tokenMetadata_.getToken(ep2);
+ return ( token == null ) ? BigInteger.ZERO.toString() : token.toString();
}
public String getToken()
{
return tokenMetadata_.getToken(StorageService.tcpAddr_).toString();
}
+
+ public void updateToken(String token)
+ {
+ try
+ {
+ updateToken(new BigInteger(token));
+ }
+ catch ( IOException ex )
+ {
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
+ }
public String getLiveNodes()
{
@@ -921,6 +1114,12 @@
doBootstrap(nodes);
}
+ public String getAppropriateToken(int count)
+ {
+ BigInteger token = BootstrapAndLbHelper.getTokenBasedOnPrimaryCount(count);
+ return token.toString();
+ }
+
public void doGC()
{
List<String> tables = DatabaseDescriptor.getTables();
@@ -975,9 +1174,9 @@
*/
EndPoint getPredecessor(EndPoint ep)
{
- Token token = tokenMetadata_.getToken(ep);
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
+ BigInteger token = tokenMetadata_.getToken(ep);
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
int index = Collections.binarySearch(tokens, token);
EndPoint predecessor = (index == 0) ? tokenToEndPointMap.get(tokens
@@ -992,9 +1191,9 @@
*/
public EndPoint getSuccessor(EndPoint ep)
{
- Token token = tokenMetadata_.getToken(ep);
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
+ BigInteger token = tokenMetadata_.getToken(ep);
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
int index = Collections.binarySearch(tokens, token);
EndPoint successor = (index == (tokens.size() - 1)) ? tokenToEndPointMap
@@ -1004,15 +1203,44 @@
}
/**
+ * This method returns the range handled by this node.
+ */
+ public Range getMyRange()
+ {
+ BigInteger myToken = tokenMetadata_.getToken(StorageService.tcpAddr_);
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List<BigInteger> allTokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Collections.sort(allTokens);
+ int index = Collections.binarySearch(allTokens, myToken);
+ /* Calculate the lhs for the range */
+ BigInteger lhs = (index == 0) ? allTokens.get(allTokens.size() - 1) : allTokens.get( index - 1);
+ return new Range( lhs, myToken );
+ }
+
+ /**
+ * Get the primary for the given range. Use the replica placement
+ * strategies to determine which are the replicas. The first replica
+ * in the list is the primary.
+ *
+ * @param range on the ring.
+ * @return endpoint responsible for the range.
+ */
+ public EndPoint getPrimaryStorageEndPointForRange(Range range)
+ {
+ EndPoint[] replicas = nodePicker_.getStorageEndPoints(range.left());
+ return replicas[0];
+ }
+
+ /**
* Get the primary range for the specified endpoint.
* @param ep endpoint we are interested in.
* @return range for the specified endpoint.
*/
public Range getPrimaryRangeForEndPoint(EndPoint ep)
{
- Token right = tokenMetadata_.getToken(ep);
+ BigInteger right = tokenMetadata_.getToken(ep);
EndPoint predecessor = getPredecessor(ep);
- Token left = tokenMetadata_.getToken(predecessor);
+ BigInteger left = tokenMetadata_.getToken(predecessor);
return new Range(left, right);
}
@@ -1036,17 +1264,28 @@
return ranges;
}
-
+
+ /**
+ * Get all ranges that span the ring as per
+ * current snapshot of the token distribution.
+ * @return all ranges in sorted order.
+ */
+ public Range[] getAllRanges()
+ {
+ Set<BigInteger> allTokens = tokenMetadata_.cloneTokenEndPointMap().keySet();
+ return getAllRanges( allTokens );
+ }
+
/**
* Get all ranges that span the ring given a set
* of tokens. All ranges are in sorted order of
* ranges.
* @return ranges in sorted order
*/
- public Range[] getAllRanges(Set<Token> tokens)
+ public Range[] getAllRanges(Set<BigInteger> tokens)
{
List<Range> ranges = new ArrayList<Range>();
- List<Token> allTokens = new ArrayList<Token>(tokens);
+ List<BigInteger> allTokens = new ArrayList<BigInteger>(tokens);
Collections.sort(allTokens);
int size = allTokens.size();
for ( int i = 1; i < size; ++i )
@@ -1060,6 +1299,20 @@
}
/**
+ * Get all ranges that span the ring given a set
+ * of endpoints.
+ */
+ public Range[] getPrimaryRangesForEndPoints(Set<EndPoint> endpoints)
+ {
+ List<Range> allRanges = new ArrayList<Range>();
+ for ( EndPoint endpoint : endpoints )
+ {
+ allRanges.add( getPrimaryRangeForEndPoint( endpoint) );
+ }
+ return allRanges.toArray(new Range[0]);
+ }
+
+ /**
* This method returns the endpoint that is responsible for storing the
* specified key.
*
@@ -1069,9 +1322,9 @@
public EndPoint getPrimary(String key)
{
EndPoint endpoint = StorageService.tcpAddr_;
- Token token = token(key);
- Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
+ BigInteger token = hash(key);
+ Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
if (tokens.size() > 0)
{
Collections.sort(tokens);
@@ -1107,7 +1360,63 @@
EndPoint endpoint = getPrimary(key);
return StorageService.tcpAddr_.equals(endpoint);
}
-
+
+ /**
+ * This method determines whether the target endpoint is the
+ * primary for the given key.
+ * @param key
+ * @param target the target endpoint
+ * @return true if the local endpoint is the primary replica.
+ */
+ public boolean isPrimary(String key, EndPoint target)
+ {
+ EndPoint endpoint = getPrimary(key);
+ return target.equals(endpoint);
+ }
+
+ /**
+ * This method determines whether the local endpoint is the
+ * secondary replica for the given key.
+ * @param key
+ * @return true if the local endpoint is the secondary replica.
+ */
+ public boolean isSecondary(String key)
+ {
+ EndPoint[] topN = getNStorageEndPoint(key);
+ if ( topN.length < DatabaseDescriptor.getReplicationFactor() )
+ return false;
+ return topN[1].equals(StorageService.tcpAddr_);
+ }
+
+ /**
+ * This method determines whether the local endpoint is the
+ * tertiary replica for the given key.
+ * @param key
+ * @return true if the local endpoint is the tertiary replica.
+ */
+ public boolean isTertiary(String key)
+ {
+ EndPoint[] topN = getNStorageEndPoint(key);
+ if ( topN.length < DatabaseDescriptor.getReplicationFactor() )
+ return false;
+ return topN[2].equals(StorageService.tcpAddr_);
+ }
+
+ /**
+ * This method determines if the local endpoint is
+ * in the topN of N nodes passed in.
+ */
+ public boolean isInTopN(String key)
+ {
+ EndPoint[] topN = getNStorageEndPoint(key);
+ for ( EndPoint ep : topN )
+ {
+ if ( ep.equals( StorageService.tcpAddr_ ) )
+ return true;
+ }
+ return false;
+ }
+
/**
* This method returns the N endpoints that are responsible for storing the
* specified key i.e for replication.
@@ -1117,11 +1426,16 @@
*/
public EndPoint[] getNStorageEndPoint(String key)
{
- Token token = token(key);
+ BigInteger token = hash(key);
return nodePicker_.getStorageEndPoints(token);
}
-
-
+
+ private Map<String, EndPoint[]> getNStorageEndPoints(String[] keys)
+ {
+ return nodePicker_.getStorageEndPoints(keys);
+ }
+
+
/**
* This method attempts to return N endpoints that are responsible for storing the
* specified key i.e for replication.
@@ -1152,7 +1466,22 @@
*/
public Map<EndPoint, EndPoint> getNStorageEndPointMap(String key)
{
- Token token = token(key);
+ BigInteger token = hash(key);
+ return nodePicker_.getHintedStorageEndPoints(token);
+ }
+
+ /**
+ * This method returns the N endpoints that are responsible for storing the
+ * specified key i.e for replication. But it makes sure that the N endpoints
+ * that are returned are live as reported by the FD. It returns the hint information
+ * if some nodes in the top N are not live.
+ *
+ * param @ key - key for which we need to find the endpoint return value -
+ * the endpoint responsible for this key
+ */
+ public Map<EndPoint, EndPoint> getNHintedStorageEndPoint(String key)
+ {
+ BigInteger token = hash(key);
return nodePicker_.getHintedStorageEndPoints(token);
}
@@ -1162,7 +1491,7 @@
*
* param @ token - position on the ring
*/
- public EndPoint[] getNStorageEndPoint(Token token)
+ public EndPoint[] getNStorageEndPoint(BigInteger token)
{
return nodePicker_.getStorageEndPoints(token);
}
@@ -1175,12 +1504,25 @@
* param @ token - position on the ring
* param @ tokens - w/o the following tokens in the token list
*/
- protected EndPoint[] getNStorageEndPoint(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+ protected EndPoint[] getNStorageEndPoint(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
{
return nodePicker_.getStorageEndPoints(token, tokenToEndPointMap);
}
/**
+ * This method returns the N endpoints that are responsible for storing the
+ * specified key i.e for replication. But it makes sure that the N endpoints
+ * that are returned are live as reported by the FD. It returns the hint information
+ * if some nodes in the top N are not live.
+ *
+ * param @ token - position on the ring
+ */
+ public Map<EndPoint, EndPoint> getNHintedStorageEndPoint(BigInteger token)
+ {
+ return nodePicker_.getHintedStorageEndPoints(token);
+ }
+
+ /**
* This function finds the most suitable endpoint given a key.
+ * It checks for locality and liveness.
*/
@@ -1217,5 +1559,58 @@
}
return null;
}
-
+
+ public Map<String, EndPoint> findSuitableEndPoints(String[] keys) throws IOException
+ {
+ Map<String, EndPoint> suitableEndPoints = new HashMap<String, EndPoint>();
+ Map<String, EndPoint[]> results = getNStorageEndPoints(keys);
+ for ( String key : keys )
+ {
+ EndPoint[] endpoints = results.get(key);
+ /* indicates if we have to move on to the next key */
+ boolean moveOn = false;
+ for(EndPoint endPoint: endpoints)
+ {
+ if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+ {
+ suitableEndPoints.put(key, endPoint);
+ moveOn = true;
+ break;
+ }
+ }
+
+ if ( moveOn )
+ continue;
+
+ int j = 0;
+ for ( ; j < endpoints.length; ++j )
+ {
+ if ( StorageService.instance().isInSameDataCenter(endpoints[j]) && FailureDetector.instance().isAlive(endpoints[j]) )
+ {
+ logger_.debug("EndPoint " + endpoints[j] + " is in the same data center as local storage endpoint.");
+ suitableEndPoints.put(key, endpoints[j]);
+ moveOn = true;
+ break;
+ }
+ }
+
+ if ( moveOn )
+ continue;
+
+ // We have tried to be really nice but looks like there are no servers
+ // in the local data center that are alive and can service this request so
+ // just send it to the first alive guy and see if we get anything.
+ j = 0;
+ for ( ; j < endpoints.length; ++j )
+ {
+ if ( FailureDetector.instance().isAlive(endpoints[j]) )
+ {
+ logger_.debug("EndPoint " + endpoints[j] + " is alive so get data from it.");
+ suitableEndPoints.put(key, endpoints[j]);
+ break;
+ }
+ }
+ }
+ return suitableEndPoints;
+ }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java Fri Mar 27 05:41:59 2009
@@ -19,6 +19,9 @@
package org.apache.cassandra.service;
import java.io.IOException;
+import java.math.BigInteger;
+
+import org.apache.cassandra.net.EndPoint;
/**
@@ -44,10 +47,24 @@
public void loadAll(String nodes);
/**
+ * This method is used only for debug purpose.
+ */
+ public void updateToken(String token);
+
+ /**
*
*/
public void doGC();
-
+
+ /**
+ * Get the token such that the range of this node
+ * is split after <i>count</i> number of keys.
+ * @param count number of keys after which to generate
+ * token.
+ * @return appropriate token
+ */
+ public String getAppropriateToken(int count);
+
/**
* Stream the files in the bootstrap directory over to the
* node being bootstrapped. This is used in case of normal
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java Fri Mar 27 05:41:59 2009
@@ -19,13 +19,12 @@
package org.apache.cassandra.service;
import java.io.IOException;
+import java.math.BigInteger;
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -38,7 +37,7 @@
public void doVerb(Message message)
{
byte[] body = (byte[])message.getMessageBody()[0];
- Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(body);
+ BigInteger token = new BigInteger(body);
try
{
logger_.info("Updating the token to [" + token + "]");
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/WriteResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/WriteResponseResolver.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/WriteResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/WriteResponseResolver.java Fri Mar 27 05:41:59 2009
@@ -20,7 +20,7 @@
import java.util.List;
-import org.apache.cassandra.db.WriteResponse;
+import org.apache.cassandra.db.WriteResponseMessage;
import org.apache.cassandra.net.Message;
import org.apache.log4j.Logger;
@@ -47,11 +47,11 @@
boolean returnValue = false;
for (Message response : responses) {
Object[] body = response.getMessageBody();
- WriteResponse writeResponse = (WriteResponse) body[0];
- boolean result = writeResponse.isSuccess();
+ WriteResponseMessage writeResponseMessage = (WriteResponseMessage) body[0];
+ boolean result = writeResponseMessage.isSuccess();
if (!result) {
logger_.debug("Write at " + response.getFrom()
- + " may have failed for the key " + writeResponse.key());
+ + " may have failed for the key " + writeResponseMessage.key());
}
returnValue |= result;
}