You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/31 15:35:47 UTC
svn commit: r809586 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/locator/
src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/tools/
test/unit/or...
Author: jbellis
Date: Mon Aug 31 13:35:46 2009
New Revision: 809586
URL: http://svn.apache.org/viewvc?rev=809586&view=rev
Log:
Remove unnecessary IReplicaPlacementStrategy interface. Remove LoadVerbHandler and unused endpoint-related code. Rename AbstractStrategy -> AbstractReplicationStrategy and rename the getEndpoint methods.
patch by jbellis; reviewed by Sandeep Tata for CASSANDRA-393
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
- copied, changed from r809216, incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/LoadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java?rev=809586&r1=809585&r2=809586&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java Mon Aug 31 13:35:46 2009
@@ -24,7 +24,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.IReplicaPlacementStrategy;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.Cassandra;
@@ -47,7 +47,7 @@
private Set<String> seeds_ = new HashSet<String>();
final private int port_=DatabaseDescriptor.getThriftPort();
- private volatile IReplicaPlacementStrategy nodePicker_;
+ private volatile AbstractReplicationStrategy nodePicker_;
final private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
public RingCache()
@@ -84,7 +84,7 @@
Class [] parameterTypes = new Class[] { TokenMetadata.class, IPartitioner.class, int.class, int.class};
try
{
- nodePicker_ = (IReplicaPlacementStrategy) cls.getConstructor(parameterTypes).newInstance(tokenMetadata, partitioner_, DatabaseDescriptor.getReplicationFactor(), port_);
+ nodePicker_ = (AbstractReplicationStrategy) cls.getConstructor(parameterTypes).newInstance(tokenMetadata, partitioner_, DatabaseDescriptor.getReplicationFactor(), port_);
}
catch (Exception e)
{
@@ -102,6 +102,6 @@
public EndPoint[] getEndPoint(String key)
{
- return nodePicker_.getStorageEndPoints(partitioner_.getToken(key));
+ return nodePicker_.getReadStorageEndPoints(partitioner_.getToken(key));
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=809586&r1=809585&r2=809586&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Mon Aug 31 13:35:46 2009
@@ -29,8 +29,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.UnavailableException;
-import org.apache.cassandra.utils.LogUtil;
+
import org.apache.log4j.Logger;
public class ReadVerbHandler implements IVerbHandler
@@ -115,7 +114,7 @@
private void doReadRepair(Row row, ReadCommand readCommand)
{
- List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readCommand.key);
+ List<EndPoint> endpoints = StorageService.instance().getLiveReadStorageEndPoints(readCommand.key);
/* Remove the local storage endpoint from the list. */
endpoints.remove( StorageService.getLocalStorageEndPoint() );
Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (from r809216, incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java&r1=809216&r2=809586&rev=809586&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Mon Aug 31 13:35:46 2009
@@ -37,16 +37,16 @@
* all abstraction that implement the IReplicaPlacementStrategy
* interface.
*/
-public abstract class AbstractStrategy implements IReplicaPlacementStrategy
+public abstract class AbstractReplicationStrategy
{
- protected static final Logger logger_ = Logger.getLogger(AbstractStrategy.class);
+ protected static final Logger logger_ = Logger.getLogger(AbstractReplicationStrategy.class);
protected TokenMetadata tokenMetadata_;
protected IPartitioner partitioner_;
protected int replicas_;
protected int storagePort_;
- AbstractStrategy(TokenMetadata tokenMetadata, IPartitioner partitioner, int replicas, int storagePort)
+ AbstractReplicationStrategy(TokenMetadata tokenMetadata, IPartitioner partitioner, int replicas, int storagePort)
{
tokenMetadata_ = tokenMetadata;
partitioner_ = partitioner;
@@ -54,6 +54,21 @@
storagePort_ = storagePort;
}
+ public abstract EndPoint[] getWriteStorageEndPoints(Token token);
+ public abstract EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap);
+ public abstract EndPoint[] getReadStorageEndPoints(Token token);
+
+ /*
+ * This method returns the hint map. The key is the endpoint
+ * on which the data is being placed and the value is the
+ * endpoint which is in the top N.
+ * Get the map of top N to the live nodes currently.
+ */
+ public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token)
+ {
+ return getHintedMapForEndpoints(getWriteStorageEndPoints(token));
+ }
+
/*
* This method changes the ports of the endpoints from
* the control port to the storage ports.
@@ -94,18 +109,6 @@
return endPoint;
}
- /*
- * This method returns the hint map. The key is the endpoint
- * on which the data is being placed and the value is the
- * endpoint which is in the top N.
- * Get the map of top N to the live nodes currently.
- */
- public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token)
- {
- EndPoint[] topN = getStorageEndPointsForWrite( token );
- return getHintedMapForEndpoints(topN);
- }
-
private Map<EndPoint, EndPoint> getHintedMapForEndpoints(EndPoint[] topN)
{
List<EndPoint> liveList = new ArrayList<EndPoint>();
@@ -135,6 +138,4 @@
}
return map;
}
-
- public abstract EndPoint[] getStorageEndPoints(Token token);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=809586&r1=809585&r2=809586&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Mon Aug 31 13:35:46 2009
@@ -21,7 +21,6 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,14 +37,14 @@
* a node in a different rack in the same datacenter as
* the primary.
*/
-public class RackAwareStrategy extends AbstractStrategy
+public class RackAwareStrategy extends AbstractReplicationStrategy
{
public RackAwareStrategy(TokenMetadata tokenMetadata, IPartitioner partitioner, int replicas, int storagePort)
{
super(tokenMetadata, partitioner, replicas, storagePort);
}
- public EndPoint[] getStorageEndPoints(Token token)
+ public EndPoint[] getReadStorageEndPoints(Token token)
{
int startIndex;
List<EndPoint> list = new ArrayList<EndPoint>();
@@ -123,24 +122,12 @@
return list.toArray(new EndPoint[list.size()]);
}
- public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
- {
- Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
-
- for ( String key : keys )
- {
- results.put(key, getStorageEndPoints(partitioner_.getToken(key)));
- }
-
- return results;
- }
-
- public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+ public EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
{
throw new UnsupportedOperationException("This operation is not currently supported");
}
- public EndPoint[] getStorageEndPointsForWrite(Token token)
+ public EndPoint[] getWriteStorageEndPoints(Token token)
{
throw new UnsupportedOperationException("Rack-aware bootstrapping not supported");
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=809586&r1=809585&r2=809586&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Mon Aug 31 13:35:46 2009
@@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -34,19 +33,19 @@
* returns the 3 nodes that lie right next to each other
* on the ring.
*/
-public class RackUnawareStrategy extends AbstractStrategy
+public class RackUnawareStrategy extends AbstractReplicationStrategy
{
public RackUnawareStrategy(TokenMetadata tokenMetadata, IPartitioner partitioner, int replicas, int storagePort)
{
super(tokenMetadata, partitioner, replicas, storagePort);
}
- public EndPoint[] getStorageEndPoints(Token token)
+ public EndPoint[] getReadStorageEndPoints(Token token)
{
- return getStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());
+ return getReadStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());
}
- public EndPoint[] getStorageEndPointsForWrite(Token token)
+ public EndPoint[] getWriteStorageEndPoints(Token token)
{
Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
Map<Token, EndPoint> bootstrapTokensToEndpointMap = tokenMetadata_.cloneBootstrapNodes();
@@ -64,7 +63,7 @@
return list.toArray(new EndPoint[list.size()]);
}
- public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+ public EndPoint[] getReadStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
{
List<Token> tokenList = getStorageTokens(token, tokenToEndPointMap, null);
List<EndPoint> list = new ArrayList<EndPoint>();
@@ -115,15 +114,4 @@
}
return tokenList;
}
-
- public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
- {
- Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
-
- for ( String key : keys )
- {
- results.put(key, getStorageEndPoints(partitioner_.getToken(key)));
- }
- return results;
- }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=809586&r1=809585&r2=809586&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Aug 31 13:35:46 2009
@@ -113,7 +113,7 @@
try
{
// (This is the ZERO consistency level, so user doesn't care if we don't really have N destinations available.)
- Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
+ Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key());
Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
{
@@ -148,7 +148,7 @@
}
try
{
- Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
+ Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key());
int blockFor = determineBlockFor(consistency_level);
List<EndPoint> primaryNodes = getUnhintedNodes(endpointMap);
if (primaryNodes.size() < blockFor) // guarantee blockFor = W live nodes.
@@ -294,7 +294,7 @@
for (ReadCommand command: commands)
{
- EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(command.key);
+ EndPoint[] endpoints = StorageService.instance().getReadStorageEndPoints(command.key);
boolean foundLocal = Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
//TODO: Throw InvalidRequest if we're in bootstrap mode?
if (foundLocal && !StorageService.instance().isBootstrapMode())
@@ -358,7 +358,7 @@
DatabaseDescriptor.getQuorum(),
readResponseResolver);
EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
- List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getNStorageEndPoint(command.key)));
+ List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getReadStorageEndPoints(command.key)));
/* Remove the local storage endpoint from the list. */
endpointList.remove(dataPoint);
EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
@@ -466,7 +466,7 @@
{
/* This is the primary */
EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(key);
- List<EndPoint> replicas = new ArrayList<EndPoint>( StorageService.instance().getNLiveStorageEndPoint(key) );
+ List<EndPoint> replicas = new ArrayList<EndPoint>( StorageService.instance().getLiveReadStorageEndPoints(key) );
replicas.remove(dataPoint);
/* Get the messages to be sent index 0 is the data messages and index 1 is the digest message */
Message[] message = messages.get(key);
@@ -506,7 +506,7 @@
List<Row> rows = new ArrayList<Row>();
for (ReadCommand command: commands)
{
- List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(command.key);
+ List<EndPoint> endpoints = StorageService.instance().getLiveReadStorageEndPoints(command.key);
/* Remove the local storage endpoint from the list. */
endpoints.remove(StorageService.getLocalStorageEndPoint());
// TODO: throw a thrift exception if we do not have N nodes
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=809586&r1=809585&r2=809586&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Aug 31 13:35:46 2009
@@ -66,7 +66,6 @@
/* All verb handler identifiers */
public final static String mutationVerbHandler_ = "ROW-MUTATION-VERB-HANDLER";
public final static String tokenVerbHandler_ = "TOKEN-VERB-HANDLER";
- public final static String loadVerbHandler_ = "LOAD-VERB-HANDLER";
public final static String binaryVerbHandler_ = "BINARY-VERB-HANDLER";
public final static String readRepairVerbHandler_ = "READ-REPAIR-VERB-HANDLER";
public final static String readVerbHandler_ = "ROW-READ-VERB-HANDLER";
@@ -78,12 +77,6 @@
public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
- public static enum ConsistencyLevel
- {
- WEAK,
- STRONG
- }
-
private static StorageService instance_;
/* Used to lock the factory for creation of StorageService instance */
private static Lock createLock_ = new ReentrantLock();
@@ -104,7 +97,7 @@
public static IPartitioner getPartitioner() {
return partitioner_;
}
-
+
static
{
partitioner_ = DatabaseDescriptor.getPartitioner();
@@ -183,7 +176,7 @@
/* This is the entity that tracks load information of all nodes in the cluster */
private StorageLoadBalancer storageLoadBalancer_;
/* We use this interface to determine where replicas need to be placed */
- private IReplicaPlacementStrategy nodePicker_;
+ private AbstractReplicationStrategy nodePicker_;
/* Are we starting this node in bootstrap mode? */
private boolean isBootstrapMode;
private Set<EndPoint> bootstrapSet;
@@ -241,7 +234,6 @@
/* register the verb handlers */
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.tokenVerbHandler_, new TokenUpdateVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.binaryVerbHandler_, new BinaryVerbHandler());
- MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.loadVerbHandler_, new LoadVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.mutationVerbHandler_, new RowMutationVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.readRepairVerbHandler_, new ReadRepairVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.readVerbHandler_, new ReadVerbHandler());
@@ -268,7 +260,7 @@
Class [] parameterTypes = new Class[] { TokenMetadata.class, IPartitioner.class, int.class, int.class};
try
{
- nodePicker_ = (IReplicaPlacementStrategy) cls.getConstructor(parameterTypes).newInstance(tokenMetadata_, partitioner_, DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getStoragePort());
+ nodePicker_ = (AbstractReplicationStrategy) cls.getConstructor(parameterTypes).newInstance(tokenMetadata_, partitioner_, DatabaseDescriptor.getReplicationFactor(), DatabaseDescriptor.getStoragePort());
}
catch (Exception e)
{
@@ -377,7 +369,7 @@
Map<Range, List<EndPoint>> rangeToEndPointMap = new HashMap<Range, List<EndPoint>>();
for ( Range range : ranges )
{
- EndPoint[] endpoints = getNStorageEndPoint(range.right());
+ EndPoint[] endpoints = nodePicker_.getReadStorageEndPoints(range.right());
rangeToEndPointMap.put(range, new ArrayList<EndPoint>( Arrays.asList(endpoints) ) );
}
if (logger_.isDebugEnabled())
@@ -399,7 +391,7 @@
Map<Range, List<EndPoint>> rangeToEndPointMap = new HashMap<Range, List<EndPoint>>();
for ( Range range : ranges )
{
- EndPoint[] endpoints = getNStorageEndPoint(range.right(), tokenToEndPointMap);
+ EndPoint[] endpoints = nodePicker_.getReadStorageEndPoints(range.right(), tokenToEndPointMap);
rangeToEndPointMap.put(range, new ArrayList<EndPoint>( Arrays.asList(endpoints) ) );
}
if (logger_.isDebugEnabled())
@@ -417,8 +409,7 @@
{
Map<EndPoint, List<Range>> endPointToRangesMap = new HashMap<EndPoint, List<Range>>();
Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- Collection<EndPoint> mbrs = tokenToEndPointMap.values();
- for ( EndPoint mbr : mbrs )
+ for (EndPoint mbr : tokenToEndPointMap.values())
{
endPointToRangesMap.put(mbr, getRangesForEndPoint(mbr));
}
@@ -952,16 +943,10 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public EndPoint[] getNStorageEndPoint(String key)
+ public EndPoint[] getReadStorageEndPoints(String key)
{
- return nodePicker_.getStorageEndPoints(partitioner_.getToken(key));
- }
-
- private Map<String, EndPoint[]> getNStorageEndPoints(String[] keys)
- {
- return nodePicker_.getStorageEndPoints(keys);
- }
-
+ return nodePicker_.getReadStorageEndPoints(partitioner_.getToken(key));
+ }
/**
* This method attempts to return N endpoints that are responsible for storing the
@@ -970,10 +955,10 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public List<EndPoint> getNLiveStorageEndPoint(String key)
+ public List<EndPoint> getLiveReadStorageEndPoints(String key)
{
List<EndPoint> liveEps = new ArrayList<EndPoint>();
- EndPoint[] endpoints = getNStorageEndPoint(key);
+ EndPoint[] endpoints = getReadStorageEndPoints(key);
for ( EndPoint endpoint : endpoints )
{
@@ -991,42 +976,18 @@
* @param key - key for which we need to find the endpoint return value -
* the endpoint responsible for this key
*/
- public Map<EndPoint, EndPoint> getNStorageEndPointMap(String key)
+ public Map<EndPoint, EndPoint> getHintedStorageEndpointMap(String key)
{
return nodePicker_.getHintedStorageEndPoints(partitioner_.getToken(key));
}
/**
- * This method returns the N endpoints that are responsible for storing the
- * specified token i.e for replication.
- *
- * @param token - position on the ring
- */
- public EndPoint[] getNStorageEndPoint(Token token)
- {
- return nodePicker_.getStorageEndPoints(token);
- }
-
- /**
- * This method returns the N endpoints that are responsible for storing the
- * specified token i.e for replication and are based on the token to endpoint
- * mapping that is passed in.
- *
- * @param token - position on the ring
- * @param tokens - w/o the following tokens in the token list
- */
- protected EndPoint[] getNStorageEndPoint(Token token, Map<Token, EndPoint> tokenToEndPointMap)
- {
- return nodePicker_.getStorageEndPoints(token, tokenToEndPointMap);
- }
-
- /**
* This function finds the most suitable endpoint given a key.
* It checks for locality and alive test.
*/
public EndPoint findSuitableEndPoint(String key) throws IOException
{
- EndPoint[] endpoints = getNStorageEndPoint(key);
+ EndPoint[] endpoints = getReadStorageEndPoints(key);
for(EndPoint endPoint: endpoints)
{
if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
@@ -1057,68 +1018,7 @@
}
return null;
}
-
- /*
- * TODO:
- * This is used by the incomplete multiget implementation. Need to rewrite
- * this to use findSuitableEndPoint above instead of copy/paste
- */
- public Map<String, EndPoint> findSuitableEndPoints(String[] keys) throws IOException
- {
- Map<String, EndPoint> suitableEndPoints = new HashMap<String, EndPoint>();
- Map<String, EndPoint[]> results = getNStorageEndPoints(keys);
- for ( String key : keys )
- {
- EndPoint[] endpoints = results.get(key);
- /* indicates if we have to move on to the next key */
- boolean moveOn = false;
- for(EndPoint endPoint: endpoints)
- {
- if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
- {
- suitableEndPoints.put(key, endPoint);
- moveOn = true;
- break;
- }
- }
-
- if ( moveOn )
- continue;
-
- int j = 0;
- for ( ; j < endpoints.length; ++j )
- {
- if ( StorageService.instance().isInSameDataCenter(endpoints[j]) && FailureDetector.instance().isAlive(endpoints[j]) )
- {
- if (logger_.isDebugEnabled())
- logger_.debug("EndPoint " + endpoints[j] + " is in the same data center as local storage endpoint.");
- suitableEndPoints.put(key, endpoints[j]);
- moveOn = true;
- break;
- }
- }
-
- if ( moveOn )
- continue;
-
- // We have tried to be really nice but looks like there are no servers
- // in the local data center that are alive and can service this request so
- // just send it to the first alive guy and see if we get anything.
- j = 0;
- for ( ; j < endpoints.length; ++j )
- {
- if ( FailureDetector.instance().isAlive(endpoints[j]) )
- {
- if (logger_.isDebugEnabled())
- logger_.debug("EndPoint " + endpoints[j] + " is alive so get data from it.");
- suitableEndPoints.put(key, endpoints[j]);
- break;
- }
- }
- }
- return suitableEndPoints;
- }
-
+
Map<Token, EndPoint> getLiveEndPointMap()
{
return tokenMetadata_.cloneTokenEndPointMap();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java?rev=809586&r1=809585&r2=809586&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java Mon Aug 31 13:35:46 2009
@@ -40,7 +40,7 @@
*/
private static boolean checkIfProcessKey(String key)
{
- EndPoint[] endPoints = StorageService.instance().getNStorageEndPoint(key);
+ EndPoint[] endPoints = StorageService.instance().getReadStorageEndPoints(key);
EndPoint localEndPoint = StorageService.getLocalStorageEndPoint();
for(EndPoint endPoint : endPoints)
{
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=809586&r1=809585&r2=809586&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Mon Aug 31 13:35:46 2009
@@ -40,7 +40,7 @@
{
TokenMetadata tmd = new TokenMetadata();
IPartitioner partitioner = new RandomPartitioner();
- IReplicaPlacementStrategy strategy = new RackUnawareStrategy(tmd, partitioner, 3, 7000);
+ AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, partitioner, 3, 7000);
List<Token> endPointTokens = new ArrayList<Token>();
List<Token> keyTokens = new ArrayList<Token>();
@@ -56,7 +56,7 @@
{
TokenMetadata tmd = new TokenMetadata();
IPartitioner partitioner = new OrderPreservingPartitioner();
- IReplicaPlacementStrategy strategy = new RackUnawareStrategy(tmd, partitioner, 3, 7000);
+ AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, partitioner, 3, 7000);
List<Token> endPointTokens = new ArrayList<Token>();
List<Token> keyTokens = new ArrayList<Token>();
@@ -69,7 +69,7 @@
// given a list of endpoint tokens, and a set of key tokens falling between the endpoint tokens,
// make sure that the Strategy picks the right endpoints for the keys.
- private void testGetStorageEndPoints(TokenMetadata tmd, IReplicaPlacementStrategy strategy, Token[] endPointTokens, Token[] keyTokens)
+ private void testGetStorageEndPoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endPointTokens, Token[] keyTokens)
{
List<EndPoint> hosts = new ArrayList<EndPoint>();
for (int i = 0; i < endPointTokens.length; i++)
@@ -81,7 +81,7 @@
for (int i = 0; i < keyTokens.length; i++)
{
- EndPoint[] endPoints = strategy.getStorageEndPoints(keyTokens[i]);
+ EndPoint[] endPoints = strategy.getReadStorageEndPoints(keyTokens[i]);
assertEquals(3, endPoints.length);
for (int j = 0; j < endPoints.length; j++)
{
@@ -95,7 +95,7 @@
{
TokenMetadata tmd = new TokenMetadata();
IPartitioner partitioner = new RandomPartitioner();
- IReplicaPlacementStrategy strategy = new RackUnawareStrategy(tmd, partitioner, 3, 7000);
+ AbstractReplicationStrategy strategy = new RackUnawareStrategy(tmd, partitioner, 3, 7000);
Token[] endPointTokens = new Token[5];
Token[] keyTokens = new Token[5];
@@ -121,7 +121,7 @@
for (int i = 0; i < keyTokens.length; i++)
{
- EndPoint[] endPoints = strategy.getStorageEndPointsForWrite(keyTokens[i]);
+ EndPoint[] endPoints = strategy.getWriteStorageEndPoints(keyTokens[i]);
assertTrue(endPoints.length >=3);
List<EndPoint> endPointsList = Arrays.asList(endPoints);