You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:42:00 UTC

svn commit: r759028 [2/2] - /incubator/cassandra/trunk/src/org/apache/cassandra/service/

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Fri Mar 27 05:41:59 2009
@@ -18,32 +18,20 @@
 
 package org.apache.cassandra.service;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.math.BigInteger;
+import java.net.UnknownHostException;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.log4j.Logger;
-
 import org.apache.cassandra.analytics.AnalyticsContext;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.MultiThreadedStage;
@@ -68,22 +56,21 @@
 import org.apache.cassandra.dht.BootStrapper;
 import org.apache.cassandra.dht.BootstrapInitiateMessage;
 import org.apache.cassandra.dht.BootstrapMetadataVerbHandler;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
-import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndPointState;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.locator.EndPointSnitch;
 import org.apache.cassandra.locator.IEndPointSnitch;
 import org.apache.cassandra.locator.IReplicaPlacementStrategy;
 import org.apache.cassandra.locator.RackAwareStrategy;
 import org.apache.cassandra.locator.RackUnawareStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.CompactEndPointSerializationHelper;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
@@ -94,13 +81,22 @@
 import org.apache.cassandra.tools.TokenUpdateVerbHandler;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.commons.math.linear.RealMatrix;
+import org.apache.commons.math.linear.RealMatrixImpl;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.io.*;
+import org.apache.cassandra.gms.*;
+import org.apache.cassandra.utils.*;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.WatcherEvent;
 
 /*
  * This abstraction contains the token/identifier of this node
@@ -137,10 +133,14 @@
     public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
     public final static String mbrshipCleanerVerbHandler_ = "MBRSHIP-CLEANER-VERB-HANDLER";
     public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
+    public final static String jobConfigurationVerbHandler_ = "JOB-CONFIGURATION-VERB-HANDLER";
+    public final static String taskMetricVerbHandler_ = "TASK-METRIC-VERB-HANDLER";
+    public final static String mapAssignmentVerbHandler_ = "MAP-ASSIGNMENT-VERB-HANDLER"; 
+    public final static String reduceAssignmentVerbHandler_ = "REDUCE-ASSIGNMENT-VERB-HANDLER";
+    public final static String mapCompletionVerbHandler_ = "MAP-COMPLETION-VERB-HANDLER";
     public final static String calloutDeployVerbHandler_ = "CALLOUT-DEPLOY-VERB-HANDLER";
     public final static String touchVerbHandler_ = "TOUCH-VERB-HANDLER";
-    public static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
-
+    
     public static enum ConsistencyLevel
     {
     	WEAK,
@@ -168,19 +168,20 @@
     {
         return "http://" + tcpAddr_.getHost() + ":" + DatabaseDescriptor.getHttpPort();
     }
+    
+    public static PartitionerType getPartitionerType()
+    {
+        return (DatabaseDescriptor.getHashingStrategy().equalsIgnoreCase(DatabaseDescriptor.ophf_)) ? PartitionerType.OPHF : PartitionerType.RANDOM;
+    }
 
     /**
      * This is a facade for the hashing 
      * function used by the system for
      * partitioning.
     */
-    public static Token token(String key)
+    public static BigInteger hash(String key)
     {
-        return partitioner_.getTokenForKey(key);
-    }
-
-    public static IPartitioner getPartitioner() {
-        return partitioner_;
+        return partitioner_.hash(key);
     }
     
     public static enum BootstrapMode
@@ -248,6 +249,8 @@
      *
      */
     private IEndPointSnitch endPointSnitch_;
+    /* Uptime of this node - we use this to determine if a bootstrap can be performed by this node */
+    private long uptime_ = 0L;
 
     /* This abstraction maintains the token/endpoint metadata information */
     private TokenMetadata tokenMetadata_ = new TokenMetadata();
@@ -258,6 +261,11 @@
      * for a clean exit.
     */
     private Set<IComponentShutdown> components_ = new HashSet<IComponentShutdown>();
+    /*
+     * This boolean indicates if we are in loading state. If we are then we do not want any
+     * distributed algorithms w.r.t change in token state to kick in.
+    */
+    private boolean isLoadState_ = false;
     /* Timer is used to disseminate load information */
     private Timer loadTimer_ = new Timer(false);
 
@@ -304,6 +312,7 @@
     public StorageService() throws Throwable
     {
         init();
+        uptime_ = System.currentTimeMillis();        
         storageLoadBalancer_ = new StorageLoadBalancer(this);
         endPointSnitch_ = new EndPointSnitch();
         
@@ -318,6 +327,8 @@
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapInitiateDoneVerbHandler_, new StorageService.BootstrapInitiateDoneVerbHandler());
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapTerminateVerbHandler_, new StreamManager.BootstrapTerminateVerbHandler());
         MessagingService.getMessagingInstance().registerVerbHandlers(HttpConnection.httpRequestVerbHandler_, new HttpRequestVerbHandler(this) );
+        MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.tokenInfoVerbHandler_, new TokenInfoVerbHandler() );
+        MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.locationInfoVerbHandler_, new LocationInfoVerbHandler() );
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.dataFileVerbHandler_, new DataFileVerbHandler() );
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bsMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );        
@@ -433,21 +444,23 @@
     	components_.add(component);
     }
 
-    static
+    private void initPartitioner()
     {
-        try
+        String hashingStrategy = DatabaseDescriptor.getHashingStrategy();
+        if ( hashingStrategy.equalsIgnoreCase(DatabaseDescriptor.ophf_) )
         {
-            Class cls = Class.forName(DatabaseDescriptor.getPartitionerClass());
-            partitioner_ = (IPartitioner) cls.getConstructor().newInstance();
-        }
-        catch (Exception e)
+            partitioner_ = new OrderPreservingHashPartitioner();
+        }        
+        else
         {
-            throw new RuntimeException(e);
+            partitioner_ = new RandomPartitioner();
         }
     }
     
     public void start() throws Throwable
-    {
+    {        
+    	/* Set up the partitioner */
+        initPartitioner();
         /* Start the DB */
         storageMetadata_ = DBManager.instance().start();  
         /* Set up TCP endpoint */
@@ -487,6 +500,23 @@
         Gossiper.instance().addApplicationState(StorageService.nodeId_, new ApplicationState(storageMetadata_.getStorageId().toString()));
     }
 
+    private void startMapReduceFramework()
+    {
+        // TODO: This is a null pointer exception if JobTrackerHost is not in
+        // the config file. Also, shouldn't this comparison be done by IP
+        // instead of host name?  We could have a match but not a textual
+        // match (e.g. somehost.vip vs somehost.vip.domain.com)
+        if ( DatabaseDescriptor.getJobTrackerAddress().equals( StorageService.tcpAddr_.getHost() ) ) 
+        {
+//            JobTracker.instance().start();
+//            TaskTracker.instance().start();
+        }
+        else
+        {
+//            TaskTracker.instance().start();
+        }
+    }
+    
     public void killMe() throws Throwable
     {
         isShutdown_.set(true);
@@ -539,7 +569,7 @@
     }
 
     /* TODO: remove later */
-    public void updateTokenMetadata(Token token, EndPoint endpoint)
+    public void updateTokenMetadata(BigInteger token, EndPoint endpoint)
     {
         tokenMetadata_.update(token, endpoint);
     }
@@ -582,11 +612,47 @@
 		consistencyManager_.submit(consistencySentinel);
     }
 
+    /*
+     * This method displays all the ranges and the replicas
+     * that are responsible for the individual ranges. The
+     * format of this string is the following:
+     *
+     *  R1 : A B C
+     *  R2 : D E F
+     *  R3 : G H I
+    */
+    public String showTheRing()
+    {
+        StringBuilder sb = new StringBuilder();
+        /* Get the token to endpoint map. */
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        Set<BigInteger> tokens = tokenToEndPointMap.keySet();
+        /* All the ranges for the tokens */
+        Range[] ranges = getAllRanges(tokens);
+        Map<Range, List<EndPoint>> oldRangeToEndPointMap = constructRangeToEndPointMap(ranges);
+
+        Set<Range> rangeSet = oldRangeToEndPointMap.keySet();
+        for ( Range range : rangeSet )
+        {
+            sb.append(range);
+            sb.append(" : ");
+
+            List<EndPoint> replicas = oldRangeToEndPointMap.get(range);
+            for ( EndPoint replica : replicas )
+            {
+                sb.append(replica);
+                sb.append(" ");
+            }
+            sb.append(System.getProperty("line.separator"));
+        }
+        return sb.toString();
+    }
+
     public Map<Range, List<EndPoint>> getRangeToEndPointMap()
     {
         /* Get the token to endpoint map. */
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        Set<Token> tokens = tokenToEndPointMap.keySet();
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        Set<BigInteger> tokens = tokenToEndPointMap.keySet();
         /* All the ranges for the tokens */
         Range[] ranges = getAllRanges(tokens);
         Map<Range, List<EndPoint>> oldRangeToEndPointMap = constructRangeToEndPointMap(ranges);
@@ -619,7 +685,7 @@
      * @param tokenToEndPointMap mapping of token to endpoints.
      * @return mapping of ranges to the replicas responsible for them.
     */
-    public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges, Map<Token, EndPoint> tokenToEndPointMap)
+    public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges, Map<BigInteger, EndPoint> tokenToEndPointMap)
     {
         logger_.debug("Constructing range to endpoint map ...");
         Map<Range, List<EndPoint>> rangeToEndPointMap = new HashMap<Range, List<EndPoint>>();
@@ -641,7 +707,7 @@
     public Map<EndPoint, List<Range>> constructEndPointToRangesMap()
     {
         Map<EndPoint, List<Range>> endPointToRangesMap = new HashMap<EndPoint, List<Range>>();
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
         Collection<EndPoint> mbrs = tokenToEndPointMap.values();
         for ( EndPoint mbr : mbrs )
         {
@@ -649,6 +715,77 @@
         }
         return endPointToRangesMap;
     }
+    
+    /**
+     * Get the estimated disk space of the target endpoint in its
+     * primary range.
+     * @param target whose primary range we are interested in.
+     * @return disk space of the target in the primary range.
+     */
+    private double getDiskSpaceForPrimaryRange(EndPoint target)
+    {
+        double primaryDiskSpace = 0d;
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        Set<BigInteger> tokens = tokenToEndPointMap.keySet();
+        Range[] allRanges = getAllRanges(tokens);
+        Arrays.sort(allRanges);
+        /* Mapping from Range to its ordered position on the ring */
+        Map<Range, Integer> rangeIndex = new HashMap<Range, Integer>();
+        for ( int i = 0; i < allRanges.length; ++i )
+        {
+            rangeIndex.put(allRanges[i], i);
+        }
+        /* Get the coefficients for the equations */
+        List<double[]> equations = new ArrayList<double[]>();
+        /* Get the endpoint to range map */
+        Map<EndPoint, List<Range>> endPointToRangesMap = constructEndPointToRangesMap();
+        Set<EndPoint> eps = endPointToRangesMap.keySet();
+        
+        for ( EndPoint ep : eps )
+        {
+            List<Range> ranges = endPointToRangesMap.get(ep);
+            double[] equation = new double[allRanges.length];
+            for ( Range range : ranges )
+            {                
+                int index = rangeIndex.get(range);
+                equation[index] = 1;
+            }
+            equations.add(equation);
+        }
+        double[][] coefficients = equations.toArray( new double[0][0] );
+        
+        /* Get the constants which are the aggregate disk space for each endpoint */
+        double[] constants = new double[allRanges.length];
+        int index = 0;
+        for ( EndPoint ep : eps )
+        {
+            /* reset the port back to control port */
+            ep.setPort(DatabaseDescriptor.getControlPort());
+            String lInfo = null;
+            if ( ep.equals(StorageService.udpAddr_) )
+                lInfo = getLoadInfo();
+            else                
+                lInfo = getLoadInfo(ep);
+            LoadInfo li = new LoadInfo(lInfo);
+            constants[index++] = FileUtils.stringToFileSize(li.diskSpace());
+        }
+        
+        RealMatrix matrix = new RealMatrixImpl(coefficients);
+        double[] solutions = matrix.solve(constants);
+        Range primaryRange = getPrimaryRangeForEndPoint(target);
+        primaryDiskSpace = solutions[rangeIndex.get(primaryRange)];
+        return primaryDiskSpace;
+    }
+    
+    /**
+     * This is very dangerous. This is used only on the client
+     * side to set up the client library. This is then used to
+     * find the appropriate nodes to route the key to.
+    */
+    public void setTokenMetadata(TokenMetadata tokenMetadata)
+    {
+        tokenMetadata_ = tokenMetadata;
+    }
 
     /**
      *  Called when there is a change in application state. In particular
@@ -662,9 +799,9 @@
         ApplicationState nodeIdState = epState.getApplicationState(StorageService.nodeId_);
         if (nodeIdState != null)
         {
-            Token newToken = getPartitioner().getTokenFactory().fromString(nodeIdState.getState());
+            BigInteger newToken = new BigInteger(nodeIdState.getState());
             logger_.debug("CHANGE IN STATE FOR " + endpoint + " - has token " + nodeIdState.getState());
-            Token oldToken = tokenMetadata_.getToken(ep);
+            BigInteger oldToken = tokenMetadata_.getToken(ep);
 
             if ( oldToken != null )
             {
@@ -722,6 +859,17 @@
         }
     }
 
+    public static BigInteger generateRandomToken()
+    {
+	    byte[] randomBytes = new byte[24];
+	    Random random = new Random();
+	    for ( int i = 0 ; i < 24 ; i++)
+	    {
+	    randomBytes[i] = (byte)(31 + random.nextInt(256 - 31));
+	    }
+	    return hash(new String(randomBytes));
+    }
+
     /**
      * Get the count of primary keys from the sampler.
     */
@@ -741,12 +889,43 @@
         LoadInfo li = storageLoadBalancer_.getLoad(ep);
         return ( li == null ) ? "N/A" : li.toString();
     }
+    
+    /**
+     * Get the endpoint that has the largest primary count.
+     * @return
+     */
+    EndPoint getEndPointWithLargestPrimaryCount()
+    {
+        Set<EndPoint> allMbrs = Gossiper.instance().getAllMembers();
+        Map<LoadInfo, EndPoint> loadInfoToEndPointMap = new HashMap<LoadInfo, EndPoint>();
+        List<LoadInfo> lInfos = new ArrayList<LoadInfo>();
+        
+        for ( EndPoint mbr : allMbrs )
+        {
+            mbr.setPort(DatabaseDescriptor.getStoragePort());
+            LoadInfo li = null;
+            if ( mbr.equals(StorageService.tcpAddr_) )
+            {
+                li = new LoadInfo( getLoadInfo() );
+                lInfos.add( li );
+            }
+            else
+            {
+                li = storageLoadBalancer_.getLoad(mbr);
+                lInfos.add( li );
+            }
+            loadInfoToEndPointMap.put(li, mbr);
+        }
+        
+        Collections.sort(lInfos, new LoadInfo.DiskSpaceComparator());
+        return loadInfoToEndPointMap.get( lInfos.get(lInfos.size() - 1) );
+    }
 
     /*
      * This method updates the token on disk and modifies the cached
      * StorageMetadata instance. This is only for the local endpoint.
     */
-    public void updateToken(Token token) throws IOException
+    public void updateToken(BigInteger token) throws IOException
     {
         /* update the token on disk */
         SystemTable.openSystemTable(SystemTable.name_).updateToken(token);
@@ -787,12 +966,13 @@
     {
     	if ( keys.length > 0 )
     	{
-            Token token = tokenMetadata_.getToken(StorageService.tcpAddr_);
-	        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-	        Token[] tokens = tokenToEndPointMap.keySet().toArray(new Token[tokenToEndPointMap.keySet().size()]);
+	        isLoadState_ = true;
+	        BigInteger token = tokenMetadata_.getToken(StorageService.tcpAddr_);
+	        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+	        BigInteger[] tokens = tokenToEndPointMap.keySet().toArray( new BigInteger[0] );
 	        Arrays.sort(tokens);
 	        int index = Arrays.binarySearch(tokens, token) * (keys.length/tokens.length);
-	        Token newToken = token( keys[index] );
+	        BigInteger newToken = hash( keys[index] );
 	        /* update the token */
 	        updateToken(newToken);
     	}
@@ -804,6 +984,7 @@
     */
     public void resetLoadState()
     {
+        isLoadState_ = false;
     }
     
     /**
@@ -830,7 +1011,7 @@
         }        
         String[] allNodes = nodesToLoad.split(":");
         EndPoint[] endpoints = new EndPoint[allNodes.length];
-        Token[] tokens = new Token[allNodes.length];
+        BigInteger[] tokens = new BigInteger[allNodes.length];
         
         for ( int i = 0; i < allNodes.length; ++i )
         {
@@ -866,8 +1047,8 @@
         switch ( mode )
         {
             case FULL:
-                Token token = tokenMetadata_.getToken(endpoint);
-                bootStrapper_.submit(new BootStrapper(new EndPoint[]{endpoint}, token));
+                BigInteger token = tokenMetadata_.getToken(endpoint);
+                bootStrapper_.submit( new BootStrapper(new EndPoint[]{endpoint}, new BigInteger[]{token}) );
                 break;
 
             case HINT:
@@ -885,14 +1066,26 @@
     public String getToken(EndPoint ep)
     {
         EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getStoragePort());
-        Token token = tokenMetadata_.getToken(ep2);
-        return ( token == null ) ? "" : token.toString();
+        BigInteger token = tokenMetadata_.getToken(ep2);
+        return ( token == null ) ? BigInteger.ZERO.toString() : token.toString();
     }
 
     public String getToken()
     {
         return tokenMetadata_.getToken(StorageService.tcpAddr_).toString();
     }
+    
+    public void updateToken(String token)
+    {
+        try
+        {
+            updateToken(new BigInteger(token));
+        }
+        catch ( IOException ex )
+        {
+            logger_.debug(LogUtil.throwableToString(ex));
+        }
+    }
 
     public String getLiveNodes()
     {
@@ -921,6 +1114,12 @@
         doBootstrap(nodes);
     }
     
+    public String getAppropriateToken(int count)
+    {
+        BigInteger token = BootstrapAndLbHelper.getTokenBasedOnPrimaryCount(count);
+        return token.toString();
+    }
+    
     public void doGC()
     {
         List<String> tables = DatabaseDescriptor.getTables();
@@ -975,9 +1174,9 @@
      */
     EndPoint getPredecessor(EndPoint ep)
     {
-        Token token = tokenMetadata_.getToken(ep);
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
+        BigInteger token = tokenMetadata_.getToken(ep);
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
         Collections.sort(tokens);
         int index = Collections.binarySearch(tokens, token);
         EndPoint predecessor = (index == 0) ? tokenToEndPointMap.get(tokens
@@ -992,9 +1191,9 @@
      */
     public EndPoint getSuccessor(EndPoint ep)
     {
-        Token token = tokenMetadata_.getToken(ep);
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
+        BigInteger token = tokenMetadata_.getToken(ep);
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
         Collections.sort(tokens);
         int index = Collections.binarySearch(tokens, token);
         EndPoint successor = (index == (tokens.size() - 1)) ? tokenToEndPointMap
@@ -1004,15 +1203,44 @@
     }
 
     /**
+     * This method returns the range handled by this node.
+     */
+    public Range getMyRange()
+    {
+        BigInteger myToken = tokenMetadata_.getToken(StorageService.tcpAddr_);
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> allTokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Collections.sort(allTokens);
+        int index = Collections.binarySearch(allTokens, myToken);
+        /* Calculate the lhs for the range */
+        BigInteger lhs = (index == 0) ? allTokens.get(allTokens.size() - 1) : allTokens.get( index - 1);
+        return new Range( lhs, myToken );
+    }
+    
+    /**
+     * Get the primary for the given range. Use the replica placement
+     * strategies to determine which are the replicas. The first replica
+     * in the list is the primary.
+     * 
+     * @param range on the ring.
+     * @return endpoint responsible for the range.
+     */
+    public EndPoint getPrimaryStorageEndPointForRange(Range range)
+    {
+        EndPoint[] replicas = nodePicker_.getStorageEndPoints(range.left());
+        return replicas[0];
+    }
+    
+    /**
      * Get the primary range for the specified endpoint.
      * @param ep endpoint we are interested in.
      * @return range for the specified endpoint.
      */
     public Range getPrimaryRangeForEndPoint(EndPoint ep)
     {
-        Token right = tokenMetadata_.getToken(ep);
+        BigInteger right = tokenMetadata_.getToken(ep);
         EndPoint predecessor = getPredecessor(ep);
-        Token left = tokenMetadata_.getToken(predecessor);
+        BigInteger left = tokenMetadata_.getToken(predecessor);
         return new Range(left, right);
     }
     
@@ -1036,17 +1264,28 @@
         
         return ranges;
     }
-
+    
+    /**
+     * Get all ranges that span the ring as per
+     * current snapshot of the token distribution.
+     * @return all ranges in sorted order.
+     */
+    public Range[] getAllRanges()
+    {
+        Set<BigInteger> allTokens = tokenMetadata_.cloneTokenEndPointMap().keySet();
+        return getAllRanges( allTokens );
+    }
+    
     /**
      * Get all ranges that span the ring given a set
      * of tokens. All ranges are in sorted order of 
      * ranges.
      * @return ranges in sorted order
     */
-    public Range[] getAllRanges(Set<Token> tokens)
+    public Range[] getAllRanges(Set<BigInteger> tokens)
     {
         List<Range> ranges = new ArrayList<Range>();
-        List<Token> allTokens = new ArrayList<Token>(tokens);
+        List<BigInteger> allTokens = new ArrayList<BigInteger>(tokens);
         Collections.sort(allTokens);
         int size = allTokens.size();
         for ( int i = 1; i < size; ++i )
@@ -1060,6 +1299,20 @@
     }
 
     /**
+     * Get all ranges that span the ring given a set
+     * of endpoints.
+    */
+    public Range[] getPrimaryRangesForEndPoints(Set<EndPoint> endpoints)
+    {
+        List<Range> allRanges = new ArrayList<Range>();
+        for ( EndPoint endpoint : endpoints )
+        {
+            allRanges.add( getPrimaryRangeForEndPoint( endpoint) );
+        }
+        return allRanges.toArray(new Range[0]);
+    }
+    
+    /**
      * This method returns the endpoint that is responsible for storing the
      * specified key.
      *
@@ -1069,9 +1322,9 @@
     public EndPoint getPrimary(String key)
     {
         EndPoint endpoint = StorageService.tcpAddr_;
-        Token token = token(key);
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
+        BigInteger token = hash(key);
+        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
         if (tokens.size() > 0)
         {
             Collections.sort(tokens);
@@ -1107,7 +1360,63 @@
         EndPoint endpoint = getPrimary(key);
         return StorageService.tcpAddr_.equals(endpoint);
     }
-
+    
+    /**
+     * This method determines whether the target endpoint is the
+     * primary for the given key.
+     * @param key
+     * @param target the target enpoint 
+     * @return true if the local endpoint is the primary replica.
+    */
+    public boolean isPrimary(String key, EndPoint target)
+    {
+        EndPoint endpoint = getPrimary(key);
+        return target.equals(endpoint);
+    }
+    
+    /**
+     * This method determines whether the local endpoint is the
+     * seondary replica for the given key.
+     * @param key
+     * @return true if the local endpoint is the secondary replica.
+     */
+    public boolean isSecondary(String key)
+    {
+        EndPoint[] topN = getNStorageEndPoint(key);
+        if ( topN.length < DatabaseDescriptor.getReplicationFactor() )
+            return false;
+        return topN[1].equals(StorageService.tcpAddr_);
+    }
+    
+    /**
+     * This method determines whether the local endpoint is the
+     * seondary replica for the given key.
+     * @param key
+     * @return true if the local endpoint is the tertiary replica.
+     */
+    public boolean isTertiary(String key)
+    {
+        EndPoint[] topN = getNStorageEndPoint(key);
+        if ( topN.length < DatabaseDescriptor.getReplicationFactor() )
+            return false;
+        return topN[2].equals(StorageService.tcpAddr_);
+    }
+    
+    /**
+     * This method determines if the local endpoint is
+     * in the topN of N nodes passed in.
+    */
+    public boolean isInTopN(String key)
+    {
+    	EndPoint[] topN = getNStorageEndPoint(key);
+        for ( EndPoint ep : topN )
+        {
+            if ( ep.equals( StorageService.tcpAddr_ ) )
+                return true;
+        }
+        return false;
+    }
+    
     /**
      * This method returns the N endpoints that are responsible for storing the
      * specified key i.e for replication.
@@ -1117,11 +1426,16 @@
      */
     public EndPoint[] getNStorageEndPoint(String key)
     {
-        Token token = token(key);
+        BigInteger token = hash(key);
         return nodePicker_.getStorageEndPoints(token);
     }
-
-
+    
+    private Map<String, EndPoint[]> getNStorageEndPoints(String[] keys)
+    {
+    	return nodePicker_.getStorageEndPoints(keys);
+    }
+    
+    
     /**
      * This method attempts to return N endpoints that are responsible for storing the
      * specified key i.e for replication.
@@ -1152,7 +1466,22 @@
      */
     public Map<EndPoint, EndPoint> getNStorageEndPointMap(String key)
     {
-        Token token = token(key);
+        BigInteger token = hash(key);
+        return nodePicker_.getHintedStorageEndPoints(token);
+    }
+
+    /**
+     * This method returns the N endpoints that are responsible for storing the
+     * specified key i.e for replication. But it makes sure that the N endpoints
+     * that are returned are live as reported by the FD. It returns the hint information
+     * if some nodes in the top N are not live.
+     *
+     * param @ key - key for which we need to find the endpoint return value -
+     * the endpoint responsible for this key
+     */
+    public Map<EndPoint, EndPoint> getNHintedStorageEndPoint(String key)
+    {
+        BigInteger token = hash(key);
         return nodePicker_.getHintedStorageEndPoints(token);
     }
 
@@ -1162,7 +1491,7 @@
      *
      * param @ token - position on the ring
      */
-    public EndPoint[] getNStorageEndPoint(Token token)
+    public EndPoint[] getNStorageEndPoint(BigInteger token)
     {
         return nodePicker_.getStorageEndPoints(token);
     }
@@ -1175,12 +1504,25 @@
      * param @ token - position on the ring
      * param @ tokens - w/o the following tokens in the token list
      */
-    protected EndPoint[] getNStorageEndPoint(Token token, Map<Token, EndPoint> tokenToEndPointMap)
+    protected EndPoint[] getNStorageEndPoint(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
     {
         return nodePicker_.getStorageEndPoints(token, tokenToEndPointMap);
     }
 
     /**
+     * This method returns the N endpoints that are responsible for storing the
+     * specified key i.e for replication. But it makes sure that the N endpoints
+     * that are returned are live as reported by the FD. It returns the hint information
+     * if some nodes in the top N are not live.
+     *
+     * param @ token - position on the ring
+     */
+    public Map<EndPoint, EndPoint> getNHintedStorageEndPoint(BigInteger token)
+    {
+        return nodePicker_.getHintedStorageEndPoints(token);
+    }
+    
+    /**
      * This function finds the most suitable endpoint given a key.
      * It checks for loclity and alive test.
      */
@@ -1217,5 +1559,58 @@
 		}
 		return null;
 	}
-
+	
+	public Map<String, EndPoint> findSuitableEndPoints(String[] keys) throws IOException
+	{
+		Map<String, EndPoint> suitableEndPoints = new HashMap<String, EndPoint>();
+		Map<String, EndPoint[]> results = getNStorageEndPoints(keys);
+		for ( String key : keys )
+		{
+			EndPoint[] endpoints = results.get(key);
+			/* indicates if we have to move on to the next key */
+			boolean moveOn = false;
+			for(EndPoint endPoint: endpoints)
+			{
+				if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+				{
+					suitableEndPoints.put(key, endPoint);
+					moveOn = true;
+					break;
+				}
+			}
+			
+			if ( moveOn )
+				continue;
+				
+			int j = 0;
+			for ( ; j < endpoints.length; ++j )
+			{
+				if ( StorageService.instance().isInSameDataCenter(endpoints[j]) && FailureDetector.instance().isAlive(endpoints[j]) )
+				{
+					logger_.debug("EndPoint " + endpoints[j] + " is in the same data center as local storage endpoint.");
+					suitableEndPoints.put(key, endpoints[j]);
+					moveOn = true;
+					break;
+				}
+			}
+			
+			if ( moveOn )
+				continue;
+			
+			// We have tried to be really nice but looks like theer are no servers 
+			// in the local data center that are alive and can service this request so 
+			// just send it to the first alive guy and see if we get anything.
+			j = 0;
+			for ( ; j < endpoints.length; ++j )
+			{
+				if ( FailureDetector.instance().isAlive(endpoints[j]) )
+				{
+					logger_.debug("EndPoint " + endpoints[j] + " is alive so get data from it.");
+					suitableEndPoints.put(key, endpoints[j]);
+					break;
+				}
+			}
+		}
+		return suitableEndPoints;
+	}
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java Fri Mar 27 05:41:59 2009
@@ -19,6 +19,9 @@
 package org.apache.cassandra.service;
 
 import java.io.IOException;
+import java.math.BigInteger;
+
+import org.apache.cassandra.net.EndPoint;
 
 
 /**
@@ -44,10 +47,24 @@
     public void loadAll(String nodes);
     
     /**
+     * This method is used only for debug purpose.  
+    */
+    public void updateToken(String token);    
+    
+    /**
      * 
      */
     public void doGC();
-
+    
+    /**
+     * Get the token such that the range of this node
+     * is split after <i>count</i> number of keys.
+     * @param count number of keys after which to generate
+     *              token.
+     * @return appropriate token
+     */
+    public String getAppropriateToken(int count);
+    
     /**
      * Stream the files in the bootstrap directory over to the
      * node being bootstrapped. This is used in case of normal

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java Fri Mar 27 05:41:59 2009
@@ -19,13 +19,12 @@
 package org.apache.cassandra.service;
 
 import java.io.IOException;
+import java.math.BigInteger;
 
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -38,7 +37,7 @@
     public void doVerb(Message message)
     {
     	byte[] body = (byte[])message.getMessageBody()[0];
-        Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(body);
+        BigInteger token = new BigInteger(body);
         try
         {
         	logger_.info("Updating the token to [" + token + "]");

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/WriteResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/WriteResponseResolver.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/WriteResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/WriteResponseResolver.java Fri Mar 27 05:41:59 2009
@@ -20,7 +20,7 @@
 
 import java.util.List;
 
-import org.apache.cassandra.db.WriteResponse;
+import org.apache.cassandra.db.WriteResponseMessage;
 import org.apache.cassandra.net.Message;
 import org.apache.log4j.Logger;
 
@@ -47,11 +47,11 @@
 		boolean returnValue = false;
 		for (Message response : responses) {
 			Object[] body = response.getMessageBody();
-			WriteResponse writeResponse = (WriteResponse) body[0];
-			boolean result = writeResponse.isSuccess();
+			WriteResponseMessage writeResponseMessage = (WriteResponseMessage) body[0];
+			boolean result = writeResponseMessage.isSuccess();
 			if (!result) {
 				logger_.debug("Write at " + response.getFrom()
-						+ " may have failed for the key " + writeResponse.key());
+						+ " may have failed for the key " + writeResponseMessage.key());
 			}
 			returnValue |= result;
 		}