You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 03:44:29 UTC
svn commit: r758998 [2/2] - in /incubator/cassandra/trunk/src/org/apache/cassandra: db/ dht/ io/ locator/ service/
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Fri Mar 27 02:44:28 2009
@@ -28,7 +28,6 @@
import java.io.*;
import java.lang.management.ManagementFactory;
import java.math.BigInteger;
-import java.net.UnknownHostException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -62,15 +61,12 @@
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.locator.EndPointSnitch;
import org.apache.cassandra.locator.IEndPointSnitch;
import org.apache.cassandra.locator.IReplicaPlacementStrategy;
import org.apache.cassandra.locator.RackAwareStrategy;
import org.apache.cassandra.locator.RackUnawareStrategy;
import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
@@ -84,11 +80,7 @@
import org.apache.commons.math.linear.RealMatrix;
import org.apache.commons.math.linear.RealMatrixImpl;
import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.*;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.net.io.*;
-import org.apache.cassandra.gms.*;
-import org.apache.cassandra.utils.*;
+
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -96,7 +88,6 @@
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.proto.WatcherEvent;
/*
* This abstraction contains the token/identifier of this node
@@ -133,11 +124,6 @@
public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
public final static String mbrshipCleanerVerbHandler_ = "MBRSHIP-CLEANER-VERB-HANDLER";
public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
- public final static String jobConfigurationVerbHandler_ = "JOB-CONFIGURATION-VERB-HANDLER";
- public final static String taskMetricVerbHandler_ = "TASK-METRIC-VERB-HANDLER";
- public final static String mapAssignmentVerbHandler_ = "MAP-ASSIGNMENT-VERB-HANDLER";
- public final static String reduceAssignmentVerbHandler_ = "REDUCE-ASSIGNMENT-VERB-HANDLER";
- public final static String mapCompletionVerbHandler_ = "MAP-COMPLETION-VERB-HANDLER";
public final static String calloutDeployVerbHandler_ = "CALLOUT-DEPLOY-VERB-HANDLER";
public final static String touchVerbHandler_ = "TOUCH-VERB-HANDLER";
@@ -168,11 +154,6 @@
{
return "http://" + tcpAddr_.getHost() + ":" + DatabaseDescriptor.getHttpPort();
}
-
- public static PartitionerType getPartitionerType()
- {
- return (DatabaseDescriptor.ophf_.equalsIgnoreCase(DatabaseDescriptor.getHashingStrategy())) ? PartitionerType.OPHF : PartitionerType.RANDOM;
- }
/**
* This is a facade for the hashing
@@ -253,8 +234,6 @@
*
*/
private IEndPointSnitch endPointSnitch_;
- /* Uptime of this node - we use this to determine if a bootstrap can be performed by this node */
- private long uptime_ = 0L;
/* This abstraction maintains the token/endpoint metadata information */
private TokenMetadata tokenMetadata_ = new TokenMetadata();
@@ -265,11 +244,6 @@
* for a clean exit.
*/
private Set<IComponentShutdown> components_ = new HashSet<IComponentShutdown>();
- /*
- * This boolean indicates if we are in loading state. If we are then we do not want any
- * distributed algorithms w.r.t change in token state to kick in.
- */
- private boolean isLoadState_ = false;
/* Timer is used to disseminate load information */
private Timer loadTimer_ = new Timer(false);
@@ -316,7 +290,6 @@
public StorageService() throws Throwable
{
init();
- uptime_ = System.currentTimeMillis();
storageLoadBalancer_ = new StorageLoadBalancer(this);
endPointSnitch_ = new EndPointSnitch();
@@ -331,8 +304,6 @@
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapInitiateDoneVerbHandler_, new StorageService.BootstrapInitiateDoneVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapTerminateVerbHandler_, new StreamManager.BootstrapTerminateVerbHandler());
MessagingService.getMessagingInstance().registerVerbHandlers(HttpConnection.httpRequestVerbHandler_, new HttpRequestVerbHandler(this) );
- MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.tokenInfoVerbHandler_, new TokenInfoVerbHandler() );
- MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.locationInfoVerbHandler_, new LocationInfoVerbHandler() );
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.dataFileVerbHandler_, new DataFileVerbHandler() );
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bsMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );
@@ -597,42 +568,6 @@
consistencyManager_.submit(consistencySentinel);
}
- /*
- * This method displays all the ranges and the replicas
- * that are responsible for the individual ranges. The
- * format of this string is the following:
- *
- * R1 : A B C
- * R2 : D E F
- * R3 : G H I
- */
- public String showTheRing()
- {
- StringBuilder sb = new StringBuilder();
- /* Get the token to endpoint map. */
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- Set<BigInteger> tokens = tokenToEndPointMap.keySet();
- /* All the ranges for the tokens */
- Range[] ranges = getAllRanges(tokens);
- Map<Range, List<EndPoint>> oldRangeToEndPointMap = constructRangeToEndPointMap(ranges);
-
- Set<Range> rangeSet = oldRangeToEndPointMap.keySet();
- for ( Range range : rangeSet )
- {
- sb.append(range);
- sb.append(" : ");
-
- List<EndPoint> replicas = oldRangeToEndPointMap.get(range);
- for ( EndPoint replica : replicas )
- {
- sb.append(replica);
- sb.append(" ");
- }
- sb.append(System.getProperty("line.separator"));
- }
- return sb.toString();
- }
-
public Map<Range, List<EndPoint>> getRangeToEndPointMap()
{
/* Get the token to endpoint map. */
@@ -700,77 +635,6 @@
}
return endPointToRangesMap;
}
-
- /**
- * Get the estimated disk space of the target endpoint in its
- * primary range.
- * @param target whose primary range we are interested in.
- * @return disk space of the target in the primary range.
- */
- private double getDiskSpaceForPrimaryRange(EndPoint target)
- {
- double primaryDiskSpace = 0d;
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- Set<BigInteger> tokens = tokenToEndPointMap.keySet();
- Range[] allRanges = getAllRanges(tokens);
- Arrays.sort(allRanges);
- /* Mapping from Range to its ordered position on the ring */
- Map<Range, Integer> rangeIndex = new HashMap<Range, Integer>();
- for ( int i = 0; i < allRanges.length; ++i )
- {
- rangeIndex.put(allRanges[i], i);
- }
- /* Get the coefficients for the equations */
- List<double[]> equations = new ArrayList<double[]>();
- /* Get the endpoint to range map */
- Map<EndPoint, List<Range>> endPointToRangesMap = constructEndPointToRangesMap();
- Set<EndPoint> eps = endPointToRangesMap.keySet();
-
- for ( EndPoint ep : eps )
- {
- List<Range> ranges = endPointToRangesMap.get(ep);
- double[] equation = new double[allRanges.length];
- for ( Range range : ranges )
- {
- int index = rangeIndex.get(range);
- equation[index] = 1;
- }
- equations.add(equation);
- }
- double[][] coefficients = equations.toArray( new double[0][0] );
-
- /* Get the constants which are the aggregate disk space for each endpoint */
- double[] constants = new double[allRanges.length];
- int index = 0;
- for ( EndPoint ep : eps )
- {
- /* reset the port back to control port */
- ep.setPort(DatabaseDescriptor.getControlPort());
- String lInfo = null;
- if ( ep.equals(StorageService.udpAddr_) )
- lInfo = getLoadInfo();
- else
- lInfo = getLoadInfo(ep);
- LoadInfo li = new LoadInfo(lInfo);
- constants[index++] = FileUtils.stringToFileSize(li.diskSpace());
- }
-
- RealMatrix matrix = new RealMatrixImpl(coefficients);
- double[] solutions = matrix.solve(constants);
- Range primaryRange = getPrimaryRangeForEndPoint(target);
- primaryDiskSpace = solutions[rangeIndex.get(primaryRange)];
- return primaryDiskSpace;
- }
-
- /**
- * This is very dangerous. This is used only on the client
- * side to set up the client library. This is then used to
- * find the appropriate nodes to route the key to.
- */
- public void setTokenMetadata(TokenMetadata tokenMetadata)
- {
- tokenMetadata_ = tokenMetadata;
- }
/**
* Called when there is a change in application state. In particular
@@ -844,17 +708,6 @@
}
}
- public static BigInteger generateRandomToken()
- {
- byte[] randomBytes = new byte[24];
- Random random = new Random();
- for ( int i = 0 ; i < 24 ; i++)
- {
- randomBytes[i] = (byte)(31 + random.nextInt(256 - 31));
- }
- return hash(new String(randomBytes));
- }
-
/**
* Get the count of primary keys from the sampler.
*/
@@ -874,37 +727,6 @@
LoadInfo li = storageLoadBalancer_.getLoad(ep);
return ( li == null ) ? "N/A" : li.toString();
}
-
- /**
- * Get the endpoint that has the largest primary count.
- * @return
- */
- EndPoint getEndPointWithLargestPrimaryCount()
- {
- Set<EndPoint> allMbrs = Gossiper.instance().getAllMembers();
- Map<LoadInfo, EndPoint> loadInfoToEndPointMap = new HashMap<LoadInfo, EndPoint>();
- List<LoadInfo> lInfos = new ArrayList<LoadInfo>();
-
- for ( EndPoint mbr : allMbrs )
- {
- mbr.setPort(DatabaseDescriptor.getStoragePort());
- LoadInfo li = null;
- if ( mbr.equals(StorageService.tcpAddr_) )
- {
- li = new LoadInfo( getLoadInfo() );
- lInfos.add( li );
- }
- else
- {
- li = storageLoadBalancer_.getLoad(mbr);
- lInfos.add( li );
- }
- loadInfoToEndPointMap.put(li, mbr);
- }
-
- Collections.sort(lInfos, new LoadInfo.DiskSpaceComparator());
- return loadInfoToEndPointMap.get( lInfos.get(lInfos.size() - 1) );
- }
/*
* This method updates the token on disk and modifies the cached
@@ -951,8 +773,7 @@
{
if ( keys.length > 0 )
{
- isLoadState_ = true;
- BigInteger token = tokenMetadata_.getToken(StorageService.tcpAddr_);
+ BigInteger token = tokenMetadata_.getToken(StorageService.tcpAddr_);
Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
BigInteger[] tokens = tokenToEndPointMap.keySet().toArray( new BigInteger[0] );
Arrays.sort(tokens);
@@ -969,7 +790,6 @@
*/
public void resetLoadState()
{
- isLoadState_ = false;
}
/**
@@ -1182,35 +1002,6 @@
}
/**
- * This method returns the range handled by this node.
- */
- public Range getMyRange()
- {
- BigInteger myToken = tokenMetadata_.getToken(StorageService.tcpAddr_);
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- List<BigInteger> allTokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
- Collections.sort(allTokens);
- int index = Collections.binarySearch(allTokens, myToken);
- /* Calculate the lhs for the range */
- BigInteger lhs = (index == 0) ? allTokens.get(allTokens.size() - 1) : allTokens.get( index - 1);
- return new Range( lhs, myToken );
- }
-
- /**
- * Get the primary for the given range. Use the replica placement
- * strategies to determine which are the replicas. The first replica
- * in the list is the primary.
- *
- * @param range on the ring.
- * @return endpoint responsible for the range.
- */
- public EndPoint getPrimaryStorageEndPointForRange(Range range)
- {
- EndPoint[] replicas = nodePicker_.getStorageEndPoints(range.left());
- return replicas[0];
- }
-
- /**
* Get the primary range for the specified endpoint.
* @param ep endpoint we are interested in.
* @return range for the specified endpoint.
@@ -1243,18 +1034,7 @@
return ranges;
}
-
- /**
- * Get all ranges that span the ring as per
- * current snapshot of the token distribution.
- * @return all ranges in sorted order.
- */
- public Range[] getAllRanges()
- {
- Set<BigInteger> allTokens = tokenMetadata_.cloneTokenEndPointMap().keySet();
- return getAllRanges( allTokens );
- }
-
+
/**
* Get all ranges that span the ring given a set
* of tokens. All ranges are in sorted order of
@@ -1278,20 +1058,6 @@
}
/**
- * Get all ranges that span the ring given a set
- * of endpoints.
- */
- public Range[] getPrimaryRangesForEndPoints(Set<EndPoint> endpoints)
- {
- List<Range> allRanges = new ArrayList<Range>();
- for ( EndPoint endpoint : endpoints )
- {
- allRanges.add( getPrimaryRangeForEndPoint( endpoint) );
- }
- return allRanges.toArray(new Range[0]);
- }
-
- /**
* This method returns the endpoint that is responsible for storing the
* specified key.
*
@@ -1339,63 +1105,7 @@
EndPoint endpoint = getPrimary(key);
return StorageService.tcpAddr_.equals(endpoint);
}
-
- /**
- * This method determines whether the target endpoint is the
- * primary for the given key.
- * @param key
- * @param target the target enpoint
- * @return true if the local endpoint is the primary replica.
- */
- public boolean isPrimary(String key, EndPoint target)
- {
- EndPoint endpoint = getPrimary(key);
- return target.equals(endpoint);
- }
-
- /**
- * This method determines whether the local endpoint is the
- * seondary replica for the given key.
- * @param key
- * @return true if the local endpoint is the secondary replica.
- */
- public boolean isSecondary(String key)
- {
- EndPoint[] topN = getNStorageEndPoint(key);
- if ( topN.length < DatabaseDescriptor.getReplicationFactor() )
- return false;
- return topN[1].equals(StorageService.tcpAddr_);
- }
-
- /**
- * This method determines whether the local endpoint is the
- * seondary replica for the given key.
- * @param key
- * @return true if the local endpoint is the tertiary replica.
- */
- public boolean isTertiary(String key)
- {
- EndPoint[] topN = getNStorageEndPoint(key);
- if ( topN.length < DatabaseDescriptor.getReplicationFactor() )
- return false;
- return topN[2].equals(StorageService.tcpAddr_);
- }
-
- /**
- * This method determines if the local endpoint is
- * in the topN of N nodes passed in.
- */
- public boolean isInTopN(String key)
- {
- EndPoint[] topN = getNStorageEndPoint(key);
- for ( EndPoint ep : topN )
- {
- if ( ep.equals( StorageService.tcpAddr_ ) )
- return true;
- }
- return false;
- }
-
+
/**
* This method returns the N endpoints that are responsible for storing the
* specified key i.e for replication.
@@ -1408,13 +1118,8 @@
BigInteger token = hash(key);
return nodePicker_.getStorageEndPoints(token);
}
-
- private Map<String, EndPoint[]> getNStorageEndPoints(String[] keys)
- {
- return nodePicker_.getStorageEndPoints(keys);
- }
-
-
+
+
/**
* This method attempts to return N endpoints that are responsible for storing the
* specified key i.e for replication.
@@ -1451,21 +1156,6 @@
/**
* This method returns the N endpoints that are responsible for storing the
- * specified key i.e for replication. But it makes sure that the N endpoints
- * that are returned are live as reported by the FD. It returns the hint information
- * if some nodes in the top N are not live.
- *
- * param @ key - key for which we need to find the endpoint return value -
- * the endpoint responsible for this key
- */
- public Map<EndPoint, EndPoint> getNHintedStorageEndPoint(String key)
- {
- BigInteger token = hash(key);
- return nodePicker_.getHintedStorageEndPoints(token);
- }
-
- /**
- * This method returns the N endpoints that are responsible for storing the
* specified token i.e for replication.
*
* param @ token - position on the ring
@@ -1489,19 +1179,6 @@
}
/**
- * This method returns the N endpoints that are responsible for storing the
- * specified key i.e for replication. But it makes sure that the N endpoints
- * that are returned are live as reported by the FD. It returns the hint information
- * if some nodes in the top N are not live.
- *
- * param @ token - position on the ring
- */
- public Map<EndPoint, EndPoint> getNHintedStorageEndPoint(BigInteger token)
- {
- return nodePicker_.getHintedStorageEndPoints(token);
- }
-
- /**
* This function finds the most suitable endpoint given a key.
* It checks for loclity and alive test.
*/
@@ -1538,58 +1215,5 @@
}
return null;
}
-
- public Map<String, EndPoint> findSuitableEndPoints(String[] keys) throws IOException
- {
- Map<String, EndPoint> suitableEndPoints = new HashMap<String, EndPoint>();
- Map<String, EndPoint[]> results = getNStorageEndPoints(keys);
- for ( String key : keys )
- {
- EndPoint[] endpoints = results.get(key);
- /* indicates if we have to move on to the next key */
- boolean moveOn = false;
- for(EndPoint endPoint: endpoints)
- {
- if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
- {
- suitableEndPoints.put(key, endPoint);
- moveOn = true;
- break;
- }
- }
-
- if ( moveOn )
- continue;
-
- int j = 0;
- for ( ; j < endpoints.length; ++j )
- {
- if ( StorageService.instance().isInSameDataCenter(endpoints[j]) && FailureDetector.instance().isAlive(endpoints[j]) )
- {
- logger_.debug("EndPoint " + endpoints[j] + " is in the same data center as local storage endpoint.");
- suitableEndPoints.put(key, endpoints[j]);
- moveOn = true;
- break;
- }
- }
-
- if ( moveOn )
- continue;
-
- // We have tried to be really nice but looks like theer are no servers
- // in the local data center that are alive and can service this request so
- // just send it to the first alive guy and see if we get anything.
- j = 0;
- for ( ; j < endpoints.length; ++j )
- {
- if ( FailureDetector.instance().isAlive(endpoints[j]) )
- {
- logger_.debug("EndPoint " + endpoints[j] + " is alive so get data from it.");
- suitableEndPoints.put(key, endpoints[j]);
- break;
- }
- }
- }
- return suitableEndPoints;
- }
+
}