You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/26 21:04:07 UTC

svn commit: r808161 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/locator/ test/unit/org/apache/cassandra/locator/

Author: jbellis
Date: Wed Aug 26 19:04:07 2009
New Revision: 808161

URL: http://svn.apache.org/viewvc?rev=808161&view=rev
Log:
1. Switch bootstrapNodes in TokenMetadata to Map<Token,EndPoint> so RackUnawareStrategy can use it.
2. Fix AbstractStrategy and RackUnawareStrategy to incorporate the bootstrapping nodes for
getHintedStorageEndPoints through StorageService.getNStorageEndPointMap (now used by insert and
insertBlocking after 383)
3. Add unit test fot RackUnawareStrategy to test if bootstrapping nodes are being returned correctly.

patch by Sandeep Tata; reviewed by jbellis for CASSANDRA-375

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java?rev=808161&r1=808160&r2=808161&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractStrategy.java Wed Aug 26 19:04:07 2009
@@ -102,9 +102,14 @@
      */
     public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token)
     {
+        EndPoint[] topN = getStorageEndPointsForWrite( token );
+        return getHintedMapForEndpoints(topN);
+    }
+    
+    private Map<EndPoint, EndPoint> getHintedMapForEndpoints(EndPoint[] topN)
+    {
         List<EndPoint> liveList = new ArrayList<EndPoint>();
         Map<EndPoint, EndPoint> map = new HashMap<EndPoint, EndPoint>();
-        EndPoint[] topN = getStorageEndPoints( token );
 
         for( int i = 0 ; i < topN.length ; i++)
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IReplicaPlacementStrategy.java?rev=808161&r1=808160&r2=808161&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IReplicaPlacementStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/IReplicaPlacementStrategy.java Wed Aug 26 19:04:07 2009
@@ -33,6 +33,7 @@
 public interface IReplicaPlacementStrategy
 {
 	public EndPoint[] getStorageEndPoints(Token token);
+	public EndPoint[] getStorageEndPointsForWrite(Token token);
     public Map<String, EndPoint[]> getStorageEndPoints(String[] keys);
     public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap);
     public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=808161&r1=808160&r2=808161&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Wed Aug 26 19:04:07 2009
@@ -139,4 +139,15 @@
     {
         throw new UnsupportedOperationException("This operation is not currently supported");
     }
+
+    public EndPoint[] getStorageEndPointsForWrite(Token token)
+    {
+        throw new UnsupportedOperationException("Rack-aware bootstrapping not supported");
+    }
+
+    
+    public Map<EndPoint, EndPoint> getHintedStorageEndPointsForWrite(Token token)
+    {
+        throw new UnsupportedOperationException("Rack-aware bootstrapping not supported");
+    }
 }
\ No newline at end of file

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=808161&r1=808160&r2=808161&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/RackUnawareStrategy.java Wed Aug 26 19:04:07 2009
@@ -46,12 +46,47 @@
         return getStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());            
     }
     
+    public EndPoint[] getStorageEndPointsForWrite(Token token)
+    {
+        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        Map<Token, EndPoint> bootstrapTokensToEndpointMap = tokenMetadata_.cloneBootstrapNodes();
+        List<Token> tokenList = getStorageTokens(token, tokenToEndPointMap, bootstrapTokensToEndpointMap);
+        List<EndPoint> list = new ArrayList<EndPoint>();
+        for (Token t: tokenList)
+        {
+            EndPoint e = tokenToEndPointMap.get(t);
+            if (e == null) 
+                e = bootstrapTokensToEndpointMap.get(t); 
+            assert e != null;
+            list.add(e);
+        }
+        retrofitPorts(list);
+        return list.toArray(new EndPoint[list.size()]);            
+    }
+    
     public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
     {
-        int startIndex;
+        List<Token> tokenList = getStorageTokens(token, tokenToEndPointMap, null);
         List<EndPoint> list = new ArrayList<EndPoint>();
+        for (Token t: tokenList)
+            list.add(tokenToEndPointMap.get(t));
+        retrofitPorts(list);
+        return list.toArray(new EndPoint[list.size()]);
+    }
+
+    private List<Token> getStorageTokens(Token token, Map<Token, EndPoint> tokenToEndPointMap, Map<Token, EndPoint> bootStrapTokenToEndPointMap)
+    {
+        int startIndex;
+        List<Token> tokenList = new ArrayList<Token>();
         int foundCount = 0;
         List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
+        List<Token> bsTokens = null;
+        
+        if (bootStrapTokenToEndPointMap != null)
+        {
+            bsTokens = new ArrayList<Token>(bootStrapTokenToEndPointMap.keySet());
+            tokens.addAll(bsTokens);
+        }
         Collections.sort(tokens);
         int index = Collections.binarySearch(tokens, token);
         if(index < 0)
@@ -61,22 +96,24 @@
                 index = 0;
         }
         int totalNodes = tokens.size();
-        // Add the node at the index by default
-        list.add(tokenToEndPointMap.get(tokens.get(index)));
-        foundCount++;
+        // Add the token at the index by default
+        tokenList.add((Token)tokens.get(index));
+        if (bsTokens == null || !bsTokens.contains(tokens.get(index)))
+            foundCount++;
         startIndex = (index + 1)%totalNodes;
         // If we found N number of nodes we are good. This loop will just exit. Otherwise just
         // loop through the list and add until we have N nodes.
         for (int i = startIndex, count = 1; count < totalNodes && foundCount < replicas_; ++count, i = (i+1)%totalNodes)
         {
-            if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
+            if(!tokenList.contains(tokens.get(i)))
             {
-                list.add(tokenToEndPointMap.get(tokens.get(i)));
-                foundCount++;
+                tokenList.add((Token)tokens.get(i));
+                //Don't count bootstrapping tokens towards the count
+                if (bsTokens==null || !bsTokens.contains(tokens.get(i)))
+                    foundCount++;
             }
         }
-        retrofitPorts(list);
-        return list.toArray(new EndPoint[list.size()]);
+        return tokenList;
     }
             
     public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
@@ -87,7 +124,6 @@
         {
             results.put(key, getStorageEndPoints(partitioner_.getToken(key)));
         }
-
         return results;
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=808161&r1=808160&r2=808161&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Wed Aug 26 19:04:07 2009
@@ -32,7 +32,7 @@
     /* Maintains a reverse index of endpoint to token in the cluster. */
     private Map<EndPoint, Token> endPointToTokenMap_ = new HashMap<EndPoint, Token>();
     /* Bootstrapping nodes and their tokens */
-    private Map<EndPoint, Token> bootstrapNodes = Collections.synchronizedMap(new HashMap<EndPoint, Token>());
+    private Map<Token, EndPoint> bootstrapNodes = Collections.synchronizedMap(new HashMap<Token, EndPoint>());
     
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -40,19 +40,19 @@
     public TokenMetadata()
     {
     }
-    
-    public TokenMetadata(Map<Token, EndPoint> tokenToEndPointMap, Map<EndPoint, Token> endPointToTokenMap, Map<EndPoint, Token> bootstrapNodes)
+
+    public TokenMetadata(Map<Token, EndPoint> tokenToEndPointMap, Map<EndPoint, Token> endPointToTokenMap, Map<Token, EndPoint> bootstrapNodes)
     {
         tokenToEndPointMap_ = tokenToEndPointMap;
         endPointToTokenMap_ = endPointToTokenMap;
-        this.bootstrapNodes = bootstrapNodes;
+        this.bootstrapNodes = bootstrapNodes; 
     }
     
     public TokenMetadata cloneMe()
     {
         return new TokenMetadata(cloneTokenEndPointMap(), cloneEndPointTokenMap(), cloneBootstrapNodes());
     }
-    
+        
     public void update(Token token, EndPoint endpoint)
     {
         this.update(token, endpoint, false);
@@ -67,12 +67,12 @@
         {
             if (bootstrapState)
             {
-                bootstrapNodes.put(endpoint, token);
+                bootstrapNodes.put(token, endpoint);
                 this.remove(endpoint);
             }
             else
             {
-                bootstrapNodes.remove(endpoint); // If this happened to be there 
+                bootstrapNodes.remove(token); // If this happened to be there 
                 Token oldToken = endPointToTokenMap_.get(endpoint);
                 if ( oldToken != null )
                     tokenToEndPointMap_.remove(oldToken);
@@ -168,12 +168,12 @@
         }
     }
     
-    public Map<EndPoint, Token> cloneBootstrapNodes()
+    public Map<Token, EndPoint> cloneBootstrapNodes()
     {
         lock_.readLock().lock();
         try
         {            
-            return new HashMap<EndPoint, Token>( bootstrapNodes );
+            return new HashMap<Token, EndPoint>( bootstrapNodes );
         }
         finally
         {

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=808161&r1=808160&r2=808161&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Wed Aug 26 19:04:07 2009
@@ -18,6 +18,9 @@
 */
 package org.apache.cassandra.locator;
 
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
 import java.util.List;
 import java.util.ArrayList;
 
@@ -79,11 +82,57 @@
         for (int i = 0; i < keyTokens.length; i++)
         {
             EndPoint[] endPoints = strategy.getStorageEndPoints(keyTokens[i]);
-            assert endPoints.length == 3;
+            assertEquals(3, endPoints.length);
             for (int j = 0; j < endPoints.length; j++)
             {
-                assert endPoints[j] == hosts.get((i + j + 1) % hosts.size());
+                assertEquals(endPoints[j], hosts.get((i + j + 1) % hosts.size()));
+            }
+        }
+    }
+    
+    @Test
+    public void testGetStorageEndPointsDuringBootstrap()
+    {
+        TokenMetadata tmd = new TokenMetadata();
+        IPartitioner partitioner = new RandomPartitioner();
+        IReplicaPlacementStrategy strategy = new RackUnawareStrategy(tmd, partitioner, 3, 7000);
+
+        Token[] endPointTokens = new Token[5]; 
+        Token[] keyTokens = new Token[5];
+        
+        for (int i = 0; i < 5; i++) 
+        {
+            endPointTokens[i] = new BigIntegerToken(String.valueOf(10 * i));
+            keyTokens[i] = new BigIntegerToken(String.valueOf(10 * i + 5));
+        }
+        
+        List<EndPoint> hosts = new ArrayList<EndPoint>();
+        for (int i = 0; i < endPointTokens.length; i++)
+        {
+            EndPoint ep = new EndPoint("127.0.0." + String.valueOf(i + 1), 7001);
+            tmd.update(endPointTokens[i], ep);
+            hosts.add(ep);
+        }
+        
+        //Add bootstrap node id=6
+        Token bsToken = new BigIntegerToken(String.valueOf(25));
+        EndPoint bootstrapEndPoint = new EndPoint("127.0.0.6", 7001);
+        tmd.update(bsToken, bootstrapEndPoint, true);
+        
+        for (int i = 0; i < keyTokens.length; i++)
+        {
+            EndPoint[] endPoints = strategy.getStorageEndPointsForWrite(keyTokens[i]);
+            assertTrue(endPoints.length >=3);
+            List<EndPoint> endPointsList = Arrays.asList(endPoints);
+
+            for (int j = 0; j < 3; j++)
+            {
+                //Check that the old nodes are definitely included
+                assertTrue(endPointsList.contains(hosts.get((i + j + 1) % hosts.size())));   
             }
+            // for 5, 15, 25 this should include bootstrap node
+            if (i < 3)
+                assertTrue(endPointsList.contains(bootstrapEndPoint));
         }
     }
 }