You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 03:44:29 UTC

svn commit: r758998 [2/2] - in /incubator/cassandra/trunk/src/org/apache/cassandra: db/ dht/ io/ locator/ service/

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Fri Mar 27 02:44:28 2009
@@ -28,7 +28,6 @@
 import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.math.BigInteger;
-import java.net.UnknownHostException;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -62,15 +61,12 @@
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.locator.EndPointSnitch;
 import org.apache.cassandra.locator.IEndPointSnitch;
 import org.apache.cassandra.locator.IReplicaPlacementStrategy;
 import org.apache.cassandra.locator.RackAwareStrategy;
 import org.apache.cassandra.locator.RackUnawareStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
@@ -84,11 +80,7 @@
 import org.apache.commons.math.linear.RealMatrix;
 import org.apache.commons.math.linear.RealMatrixImpl;
 import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.*;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.net.io.*;
-import org.apache.cassandra.gms.*;
-import org.apache.cassandra.utils.*;
+
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -96,7 +88,6 @@
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.proto.WatcherEvent;
 
 /*
  * This abstraction contains the token/identifier of this node
@@ -133,11 +124,6 @@
     public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
     public final static String mbrshipCleanerVerbHandler_ = "MBRSHIP-CLEANER-VERB-HANDLER";
     public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
-    public final static String jobConfigurationVerbHandler_ = "JOB-CONFIGURATION-VERB-HANDLER";
-    public final static String taskMetricVerbHandler_ = "TASK-METRIC-VERB-HANDLER";
-    public final static String mapAssignmentVerbHandler_ = "MAP-ASSIGNMENT-VERB-HANDLER"; 
-    public final static String reduceAssignmentVerbHandler_ = "REDUCE-ASSIGNMENT-VERB-HANDLER";
-    public final static String mapCompletionVerbHandler_ = "MAP-COMPLETION-VERB-HANDLER";
     public final static String calloutDeployVerbHandler_ = "CALLOUT-DEPLOY-VERB-HANDLER";
     public final static String touchVerbHandler_ = "TOUCH-VERB-HANDLER";
     
@@ -168,11 +154,6 @@
     {
         return "http://" + tcpAddr_.getHost() + ":" + DatabaseDescriptor.getHttpPort();
     }
-    
-    public static PartitionerType getPartitionerType()
-    {
-        return (DatabaseDescriptor.ophf_.equalsIgnoreCase(DatabaseDescriptor.getHashingStrategy())) ? PartitionerType.OPHF : PartitionerType.RANDOM;
-    }
 
     /**
      * This is a facade for the hashing 
@@ -253,8 +234,6 @@
      *
      */
     private IEndPointSnitch endPointSnitch_;
-    /* Uptime of this node - we use this to determine if a bootstrap can be performed by this node */
-    private long uptime_ = 0L;
 
     /* This abstraction maintains the token/endpoint metadata information */
     private TokenMetadata tokenMetadata_ = new TokenMetadata();
@@ -265,11 +244,6 @@
      * for a clean exit.
     */
     private Set<IComponentShutdown> components_ = new HashSet<IComponentShutdown>();
-    /*
-     * This boolean indicates if we are in loading state. If we are then we do not want any
-     * distributed algorithms w.r.t change in token state to kick in.
-    */
-    private boolean isLoadState_ = false;
     /* Timer is used to disseminate load information */
     private Timer loadTimer_ = new Timer(false);
 
@@ -316,7 +290,6 @@
     public StorageService() throws Throwable
     {
         init();
-        uptime_ = System.currentTimeMillis();        
         storageLoadBalancer_ = new StorageLoadBalancer(this);
         endPointSnitch_ = new EndPointSnitch();
         
@@ -331,8 +304,6 @@
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapInitiateDoneVerbHandler_, new StorageService.BootstrapInitiateDoneVerbHandler());
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bootStrapTerminateVerbHandler_, new StreamManager.BootstrapTerminateVerbHandler());
         MessagingService.getMessagingInstance().registerVerbHandlers(HttpConnection.httpRequestVerbHandler_, new HttpRequestVerbHandler(this) );
-        MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.tokenInfoVerbHandler_, new TokenInfoVerbHandler() );
-        MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.locationInfoVerbHandler_, new LocationInfoVerbHandler() );
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.dataFileVerbHandler_, new DataFileVerbHandler() );
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
         MessagingService.getMessagingInstance().registerVerbHandlers(StorageService.bsMetadataVerbHandler_, new BootstrapMetadataVerbHandler() );        
@@ -597,42 +568,6 @@
 		consistencyManager_.submit(consistencySentinel);
     }
 
-    /*
-     * This method displays all the ranges and the replicas
-     * that are responsible for the individual ranges. The
-     * format of this string is the following:
-     *
-     *  R1 : A B C
-     *  R2 : D E F
-     *  R3 : G H I
-    */
-    public String showTheRing()
-    {
-        StringBuilder sb = new StringBuilder();
-        /* Get the token to endpoint map. */
-        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        Set<BigInteger> tokens = tokenToEndPointMap.keySet();
-        /* All the ranges for the tokens */
-        Range[] ranges = getAllRanges(tokens);
-        Map<Range, List<EndPoint>> oldRangeToEndPointMap = constructRangeToEndPointMap(ranges);
-
-        Set<Range> rangeSet = oldRangeToEndPointMap.keySet();
-        for ( Range range : rangeSet )
-        {
-            sb.append(range);
-            sb.append(" : ");
-
-            List<EndPoint> replicas = oldRangeToEndPointMap.get(range);
-            for ( EndPoint replica : replicas )
-            {
-                sb.append(replica);
-                sb.append(" ");
-            }
-            sb.append(System.getProperty("line.separator"));
-        }
-        return sb.toString();
-    }
-
     public Map<Range, List<EndPoint>> getRangeToEndPointMap()
     {
         /* Get the token to endpoint map. */
@@ -700,77 +635,6 @@
         }
         return endPointToRangesMap;
     }
-    
-    /**
-     * Get the estimated disk space of the target endpoint in its
-     * primary range.
-     * @param target whose primary range we are interested in.
-     * @return disk space of the target in the primary range.
-     */
-    private double getDiskSpaceForPrimaryRange(EndPoint target)
-    {
-        double primaryDiskSpace = 0d;
-        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        Set<BigInteger> tokens = tokenToEndPointMap.keySet();
-        Range[] allRanges = getAllRanges(tokens);
-        Arrays.sort(allRanges);
-        /* Mapping from Range to its ordered position on the ring */
-        Map<Range, Integer> rangeIndex = new HashMap<Range, Integer>();
-        for ( int i = 0; i < allRanges.length; ++i )
-        {
-            rangeIndex.put(allRanges[i], i);
-        }
-        /* Get the coefficients for the equations */
-        List<double[]> equations = new ArrayList<double[]>();
-        /* Get the endpoint to range map */
-        Map<EndPoint, List<Range>> endPointToRangesMap = constructEndPointToRangesMap();
-        Set<EndPoint> eps = endPointToRangesMap.keySet();
-        
-        for ( EndPoint ep : eps )
-        {
-            List<Range> ranges = endPointToRangesMap.get(ep);
-            double[] equation = new double[allRanges.length];
-            for ( Range range : ranges )
-            {                
-                int index = rangeIndex.get(range);
-                equation[index] = 1;
-            }
-            equations.add(equation);
-        }
-        double[][] coefficients = equations.toArray( new double[0][0] );
-        
-        /* Get the constants which are the aggregate disk space for each endpoint */
-        double[] constants = new double[allRanges.length];
-        int index = 0;
-        for ( EndPoint ep : eps )
-        {
-            /* reset the port back to control port */
-            ep.setPort(DatabaseDescriptor.getControlPort());
-            String lInfo = null;
-            if ( ep.equals(StorageService.udpAddr_) )
-                lInfo = getLoadInfo();
-            else                
-                lInfo = getLoadInfo(ep);
-            LoadInfo li = new LoadInfo(lInfo);
-            constants[index++] = FileUtils.stringToFileSize(li.diskSpace());
-        }
-        
-        RealMatrix matrix = new RealMatrixImpl(coefficients);
-        double[] solutions = matrix.solve(constants);
-        Range primaryRange = getPrimaryRangeForEndPoint(target);
-        primaryDiskSpace = solutions[rangeIndex.get(primaryRange)];
-        return primaryDiskSpace;
-    }
-    
-    /**
-     * This is very dangerous. This is used only on the client
-     * side to set up the client library. This is then used to
-     * find the appropriate nodes to route the key to.
-    */
-    public void setTokenMetadata(TokenMetadata tokenMetadata)
-    {
-        tokenMetadata_ = tokenMetadata;
-    }
 
     /**
      *  Called when there is a change in application state. In particular
@@ -844,17 +708,6 @@
         }
     }
 
-    public static BigInteger generateRandomToken()
-    {
-	    byte[] randomBytes = new byte[24];
-	    Random random = new Random();
-	    for ( int i = 0 ; i < 24 ; i++)
-	    {
-	    randomBytes[i] = (byte)(31 + random.nextInt(256 - 31));
-	    }
-	    return hash(new String(randomBytes));
-    }
-
     /**
      * Get the count of primary keys from the sampler.
     */
@@ -874,37 +727,6 @@
         LoadInfo li = storageLoadBalancer_.getLoad(ep);
         return ( li == null ) ? "N/A" : li.toString();
     }
-    
-    /**
-     * Get the endpoint that has the largest primary count.
-     * @return
-     */
-    EndPoint getEndPointWithLargestPrimaryCount()
-    {
-        Set<EndPoint> allMbrs = Gossiper.instance().getAllMembers();
-        Map<LoadInfo, EndPoint> loadInfoToEndPointMap = new HashMap<LoadInfo, EndPoint>();
-        List<LoadInfo> lInfos = new ArrayList<LoadInfo>();
-        
-        for ( EndPoint mbr : allMbrs )
-        {
-            mbr.setPort(DatabaseDescriptor.getStoragePort());
-            LoadInfo li = null;
-            if ( mbr.equals(StorageService.tcpAddr_) )
-            {
-                li = new LoadInfo( getLoadInfo() );
-                lInfos.add( li );
-            }
-            else
-            {
-                li = storageLoadBalancer_.getLoad(mbr);
-                lInfos.add( li );
-            }
-            loadInfoToEndPointMap.put(li, mbr);
-        }
-        
-        Collections.sort(lInfos, new LoadInfo.DiskSpaceComparator());
-        return loadInfoToEndPointMap.get( lInfos.get(lInfos.size() - 1) );
-    }
 
     /*
      * This method updates the token on disk and modifies the cached
@@ -951,8 +773,7 @@
     {
     	if ( keys.length > 0 )
     	{
-	        isLoadState_ = true;
-	        BigInteger token = tokenMetadata_.getToken(StorageService.tcpAddr_);
+            BigInteger token = tokenMetadata_.getToken(StorageService.tcpAddr_);
 	        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
 	        BigInteger[] tokens = tokenToEndPointMap.keySet().toArray( new BigInteger[0] );
 	        Arrays.sort(tokens);
@@ -969,7 +790,6 @@
     */
     public void resetLoadState()
     {
-        isLoadState_ = false;
     }
     
     /**
@@ -1182,35 +1002,6 @@
     }
 
     /**
-     * This method returns the range handled by this node.
-     */
-    public Range getMyRange()
-    {
-        BigInteger myToken = tokenMetadata_.getToken(StorageService.tcpAddr_);
-        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List<BigInteger> allTokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
-        Collections.sort(allTokens);
-        int index = Collections.binarySearch(allTokens, myToken);
-        /* Calculate the lhs for the range */
-        BigInteger lhs = (index == 0) ? allTokens.get(allTokens.size() - 1) : allTokens.get( index - 1);
-        return new Range( lhs, myToken );
-    }
-    
-    /**
-     * Get the primary for the given range. Use the replica placement
-     * strategies to determine which are the replicas. The first replica
-     * in the list is the primary.
-     * 
-     * @param range on the ring.
-     * @return endpoint responsible for the range.
-     */
-    public EndPoint getPrimaryStorageEndPointForRange(Range range)
-    {
-        EndPoint[] replicas = nodePicker_.getStorageEndPoints(range.left());
-        return replicas[0];
-    }
-    
-    /**
      * Get the primary range for the specified endpoint.
      * @param ep endpoint we are interested in.
      * @return range for the specified endpoint.
@@ -1243,18 +1034,7 @@
         
         return ranges;
     }
-    
-    /**
-     * Get all ranges that span the ring as per
-     * current snapshot of the token distribution.
-     * @return all ranges in sorted order.
-     */
-    public Range[] getAllRanges()
-    {
-        Set<BigInteger> allTokens = tokenMetadata_.cloneTokenEndPointMap().keySet();
-        return getAllRanges( allTokens );
-    }
-    
+
     /**
      * Get all ranges that span the ring given a set
      * of tokens. All ranges are in sorted order of 
@@ -1278,20 +1058,6 @@
     }
 
     /**
-     * Get all ranges that span the ring given a set
-     * of endpoints.
-    */
-    public Range[] getPrimaryRangesForEndPoints(Set<EndPoint> endpoints)
-    {
-        List<Range> allRanges = new ArrayList<Range>();
-        for ( EndPoint endpoint : endpoints )
-        {
-            allRanges.add( getPrimaryRangeForEndPoint( endpoint) );
-        }
-        return allRanges.toArray(new Range[0]);
-    }
-    
-    /**
      * This method returns the endpoint that is responsible for storing the
      * specified key.
      *
@@ -1339,63 +1105,7 @@
         EndPoint endpoint = getPrimary(key);
         return StorageService.tcpAddr_.equals(endpoint);
     }
-    
-    /**
-     * This method determines whether the target endpoint is the
-     * primary for the given key.
-     * @param key
-     * @param target the target enpoint 
-     * @return true if the local endpoint is the primary replica.
-    */
-    public boolean isPrimary(String key, EndPoint target)
-    {
-        EndPoint endpoint = getPrimary(key);
-        return target.equals(endpoint);
-    }
-    
-    /**
-     * This method determines whether the local endpoint is the
-     * seondary replica for the given key.
-     * @param key
-     * @return true if the local endpoint is the secondary replica.
-     */
-    public boolean isSecondary(String key)
-    {
-        EndPoint[] topN = getNStorageEndPoint(key);
-        if ( topN.length < DatabaseDescriptor.getReplicationFactor() )
-            return false;
-        return topN[1].equals(StorageService.tcpAddr_);
-    }
-    
-    /**
-     * This method determines whether the local endpoint is the
-     * seondary replica for the given key.
-     * @param key
-     * @return true if the local endpoint is the tertiary replica.
-     */
-    public boolean isTertiary(String key)
-    {
-        EndPoint[] topN = getNStorageEndPoint(key);
-        if ( topN.length < DatabaseDescriptor.getReplicationFactor() )
-            return false;
-        return topN[2].equals(StorageService.tcpAddr_);
-    }
-    
-    /**
-     * This method determines if the local endpoint is
-     * in the topN of N nodes passed in.
-    */
-    public boolean isInTopN(String key)
-    {
-    	EndPoint[] topN = getNStorageEndPoint(key);
-        for ( EndPoint ep : topN )
-        {
-            if ( ep.equals( StorageService.tcpAddr_ ) )
-                return true;
-        }
-        return false;
-    }
-    
+
     /**
      * This method returns the N endpoints that are responsible for storing the
      * specified key i.e for replication.
@@ -1408,13 +1118,8 @@
         BigInteger token = hash(key);
         return nodePicker_.getStorageEndPoints(token);
     }
-    
-    private Map<String, EndPoint[]> getNStorageEndPoints(String[] keys)
-    {
-    	return nodePicker_.getStorageEndPoints(keys);
-    }
-    
-    
+
+
     /**
      * This method attempts to return N endpoints that are responsible for storing the
      * specified key i.e for replication.
@@ -1451,21 +1156,6 @@
 
     /**
      * This method returns the N endpoints that are responsible for storing the
-     * specified key i.e for replication. But it makes sure that the N endpoints
-     * that are returned are live as reported by the FD. It returns the hint information
-     * if some nodes in the top N are not live.
-     *
-     * param @ key - key for which we need to find the endpoint return value -
-     * the endpoint responsible for this key
-     */
-    public Map<EndPoint, EndPoint> getNHintedStorageEndPoint(String key)
-    {
-        BigInteger token = hash(key);
-        return nodePicker_.getHintedStorageEndPoints(token);
-    }
-
-    /**
-     * This method returns the N endpoints that are responsible for storing the
      * specified token i.e for replication.
      *
      * param @ token - position on the ring
@@ -1489,19 +1179,6 @@
     }
 
     /**
-     * This method returns the N endpoints that are responsible for storing the
-     * specified key i.e for replication. But it makes sure that the N endpoints
-     * that are returned are live as reported by the FD. It returns the hint information
-     * if some nodes in the top N are not live.
-     *
-     * param @ token - position on the ring
-     */
-    public Map<EndPoint, EndPoint> getNHintedStorageEndPoint(BigInteger token)
-    {
-        return nodePicker_.getHintedStorageEndPoints(token);
-    }
-    
-    /**
      * This function finds the most suitable endpoint given a key.
      * It checks for loclity and alive test.
      */
@@ -1538,58 +1215,5 @@
 		}
 		return null;
 	}
-	
-	public Map<String, EndPoint> findSuitableEndPoints(String[] keys) throws IOException
-	{
-		Map<String, EndPoint> suitableEndPoints = new HashMap<String, EndPoint>();
-		Map<String, EndPoint[]> results = getNStorageEndPoints(keys);
-		for ( String key : keys )
-		{
-			EndPoint[] endpoints = results.get(key);
-			/* indicates if we have to move on to the next key */
-			boolean moveOn = false;
-			for(EndPoint endPoint: endpoints)
-			{
-				if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
-				{
-					suitableEndPoints.put(key, endPoint);
-					moveOn = true;
-					break;
-				}
-			}
-			
-			if ( moveOn )
-				continue;
-				
-			int j = 0;
-			for ( ; j < endpoints.length; ++j )
-			{
-				if ( StorageService.instance().isInSameDataCenter(endpoints[j]) && FailureDetector.instance().isAlive(endpoints[j]) )
-				{
-					logger_.debug("EndPoint " + endpoints[j] + " is in the same data center as local storage endpoint.");
-					suitableEndPoints.put(key, endpoints[j]);
-					moveOn = true;
-					break;
-				}
-			}
-			
-			if ( moveOn )
-				continue;
-			
-			// We have tried to be really nice but looks like theer are no servers 
-			// in the local data center that are alive and can service this request so 
-			// just send it to the first alive guy and see if we get anything.
-			j = 0;
-			for ( ; j < endpoints.length; ++j )
-			{
-				if ( FailureDetector.instance().isAlive(endpoints[j]) )
-				{
-					logger_.debug("EndPoint " + endpoints[j] + " is alive so get data from it.");
-					suitableEndPoints.put(key, endpoints[j]);
-					break;
-				}
-			}
-		}
-		return suitableEndPoints;
-	}
+
 }