You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/25 06:01:36 UTC

svn commit: r818712 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/dht/

Author: jbellis
Date: Fri Sep 25 04:01:35 2009
New Revision: 818712

URL: http://svn.apache.org/viewvc?rev=818712&view=rev
Log:
record gossipped enpoint/token pairs as we see them.  also clean up SystemTable code to avoid unnecessary CF fetches.  patch by Gary Dusbabek and jbellis for CASSANDRA-437

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=818712&r1=818711&r2=818712&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Fri Sep 25 04:01:35 2009
@@ -31,6 +31,7 @@
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.net.EndPoint;
 
 public class SystemTable
 {
@@ -53,25 +54,31 @@
         }
     }
 
-    /*
-     * This method is used to update the SystemTable on disk with the new token.
+    /**
+     * Record token being used by another node
+     */
+    public static synchronized void updateToken(EndPoint ep, Token token) throws IOException
+    {
+        IPartitioner p = StorageService.getPartitioner();
+        ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, LOCATION_CF);
+        cf.addColumn(new Column(ep.getHost().getBytes("UTF-8"), p.getTokenFactory().toByteArray(token), System.currentTimeMillis()));
+        RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
+        rm.add(cf);
+        rm.apply();
+    }
+
+    /**
+     * This method is used to update the System Table with the new token for this node
     */
     public static synchronized void updateToken(Token token) throws IOException
     {
         assert metadata != null;
-        IPartitioner p = StorageService.getPartitioner();
-        Table table = Table.open(Table.SYSTEM_TABLE);
-        /* Retrieve the "LocationInfo" column family */
-        QueryFilter filter = new NamesQueryFilter(LOCATION_KEY, new QueryPath(LOCATION_CF), TOKEN);
-        ColumnFamily cf = table.getColumnFamilyStore(LOCATION_CF).getColumnFamily(filter);
-        long oldTokenColumnTimestamp = cf.getColumn(SystemTable.TOKEN).timestamp();
-        /* create the "Token" whose value is the new token. */
-        IColumn tokenColumn = new Column(SystemTable.TOKEN, p.getTokenFactory().toByteArray(token), oldTokenColumnTimestamp + 1);
-        /* replace the old "Token" column with this new one. */
         if (logger.isDebugEnabled())
-          logger.debug("Replacing old token " + p.getTokenFactory().fromByteArray(cf.getColumn(SystemTable.TOKEN).value()) + " with " + token);
+          logger.debug("Setting token to " + token);
+        IPartitioner p = StorageService.getPartitioner();
+        ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, LOCATION_CF);
+        cf.addColumn(new Column(SystemTable.TOKEN, p.getTokenFactory().toByteArray(token), System.currentTimeMillis()));
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
-        cf.addColumn(tokenColumn);
         rm.add(cf);
         rm.apply();
         metadata.setToken(token);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=818712&r1=818711&r2=818712&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java Fri Sep 25 04:01:35 2009
@@ -277,14 +277,14 @@
     public static void main(String[] args) throws Throwable
     {
         StorageService ss = StorageService.instance();
-        ss.updateTokenMetadata(new BigIntegerToken("3"), new EndPoint("A", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("6"), new EndPoint("B", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("9"), new EndPoint("C", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("12"), new EndPoint("D", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("15"), new EndPoint("E", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("18"), new EndPoint("F", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("21"), new EndPoint("G", 7000));
-        ss.updateTokenMetadata(new BigIntegerToken("24"), new EndPoint("H", 7000));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("3"), new EndPoint("A", 7000));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("6"), new EndPoint("B", 7000));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("9"), new EndPoint("C", 7000));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("12"), new EndPoint("D", 7000));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("15"), new EndPoint("E", 7000));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("18"), new EndPoint("F", 7000));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("21"), new EndPoint("G", 7000));
+        ss.updateTokenMetadataUnsafe(new BigIntegerToken("24"), new EndPoint("H", 7000));
         
         Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
         runnable.run();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=818712&r1=818711&r2=818712&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Sep 25 04:01:35 2009
@@ -198,7 +198,7 @@
         if (bootstrapSet.isEmpty())
         {
             isBootstrapMode = false;
-            tokenMetadata_.update(storageMetadata_.getToken(), StorageService.tcpAddr_, false);
+            updateTokenMetadata(storageMetadata_.getToken(), StorageService.tcpAddr_, false);
 
             logger_.info("Bootstrap completed! Now serving reads.");
             /* Tell others you're not bootstrapping anymore */
@@ -207,6 +207,22 @@
         return isBootstrapMode;
     }
 
+    private void updateTokenMetadata(Token token, EndPoint endpoint, boolean isBootstraping)
+    {
+        tokenMetadata_.update(token, endpoint, isBootstraping);
+        if (!isBootstraping)
+        {
+            try
+            {
+                SystemTable.updateToken(endpoint, token);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     /*
      * Registers with Management Server
      */
@@ -317,7 +333,7 @@
     }
 
     /* TODO: used for testing */
-    public void updateTokenMetadata(Token token, EndPoint endpoint)
+    public void updateTokenMetadataUnsafe(Token token, EndPoint endpoint)
     {
         tokenMetadata_.update(token, endpoint);
     }
@@ -453,7 +469,7 @@
                 {
                     if (logger_.isDebugEnabled())
                       logger_.debug("Relocation for endpoint " + ep);
-                    tokenMetadata_.update(newToken, ep, bootstrapState);                    
+                    updateTokenMetadata(newToken, ep, bootstrapState);
                 }
                 else
                 {
@@ -471,7 +487,7 @@
                 /*
                  * This is a new node and we just update the token map.
                 */
-                tokenMetadata_.update(newToken, ep, bootstrapState);
+                updateTokenMetadata(newToken, ep, bootstrapState);
             }
         }
         else

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=818712&r1=818711&r2=818712&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Fri Sep 25 04:01:35 2009
@@ -23,7 +23,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.service.StorageService;
 import org.junit.Test;
@@ -41,7 +40,7 @@
         /* New token needs to be part of the map for the algorithm
          * to calculate the ranges correctly
          */
-        StorageService.instance().updateTokenMetadata(newToken, newEndPoint);
+        StorageService.instance().updateTokenMetadataUnsafe(newToken, newEndPoint);
 
         BootStrapper b = new BootStrapper(new EndPoint[]{newEndPoint}, newToken );
         Map<Range,List<BootstrapSourceTarget>> res = b.getRangesWithSourceTarget();
@@ -74,7 +73,7 @@
         {
             EndPoint e  = new EndPoint("127.0.0."+i, 100);
             Token t = p.getDefaultToken();
-            StorageService.instance().updateTokenMetadata(t, e);
+            StorageService.instance().updateTokenMetadataUnsafe(t, e);
         }
         return p;
     }