You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/25 06:01:36 UTC
svn commit: r818712 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/
src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/dht/
Author: jbellis
Date: Fri Sep 25 04:01:35 2009
New Revision: 818712
URL: http://svn.apache.org/viewvc?rev=818712&view=rev
Log:
Record gossiped endpoint/token pairs as we see them. Also clean up SystemTable code to avoid unnecessary CF fetches. Patch by Gary Dusbabek and jbellis for CASSANDRA-437
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=818712&r1=818711&r2=818712&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Fri Sep 25 04:01:35 2009
@@ -31,6 +31,7 @@
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.net.EndPoint;
public class SystemTable
{
@@ -53,25 +54,31 @@
}
}
- /*
- * This method is used to update the SystemTable on disk with the new token.
+ /**
+ * Record token being used by another node
+ */
+ public static synchronized void updateToken(EndPoint ep, Token token) throws IOException
+ {
+ IPartitioner p = StorageService.getPartitioner();
+ ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, LOCATION_CF);
+ cf.addColumn(new Column(ep.getHost().getBytes("UTF-8"), p.getTokenFactory().toByteArray(token), System.currentTimeMillis()));
+ RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
+ rm.add(cf);
+ rm.apply();
+ }
+
+ /**
+ * This method is used to update the System Table with the new token for this node
*/
public static synchronized void updateToken(Token token) throws IOException
{
assert metadata != null;
- IPartitioner p = StorageService.getPartitioner();
- Table table = Table.open(Table.SYSTEM_TABLE);
- /* Retrieve the "LocationInfo" column family */
- QueryFilter filter = new NamesQueryFilter(LOCATION_KEY, new QueryPath(LOCATION_CF), TOKEN);
- ColumnFamily cf = table.getColumnFamilyStore(LOCATION_CF).getColumnFamily(filter);
- long oldTokenColumnTimestamp = cf.getColumn(SystemTable.TOKEN).timestamp();
- /* create the "Token" whose value is the new token. */
- IColumn tokenColumn = new Column(SystemTable.TOKEN, p.getTokenFactory().toByteArray(token), oldTokenColumnTimestamp + 1);
- /* replace the old "Token" column with this new one. */
if (logger.isDebugEnabled())
- logger.debug("Replacing old token " + p.getTokenFactory().fromByteArray(cf.getColumn(SystemTable.TOKEN).value()) + " with " + token);
+ logger.debug("Setting token to " + token);
+ IPartitioner p = StorageService.getPartitioner();
+ ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, LOCATION_CF);
+ cf.addColumn(new Column(SystemTable.TOKEN, p.getTokenFactory().toByteArray(token), System.currentTimeMillis()));
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
- cf.addColumn(tokenColumn);
rm.add(cf);
rm.apply();
metadata.setToken(token);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=818712&r1=818711&r2=818712&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java Fri Sep 25 04:01:35 2009
@@ -277,14 +277,14 @@
public static void main(String[] args) throws Throwable
{
StorageService ss = StorageService.instance();
- ss.updateTokenMetadata(new BigIntegerToken("3"), new EndPoint("A", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("6"), new EndPoint("B", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("9"), new EndPoint("C", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("12"), new EndPoint("D", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("15"), new EndPoint("E", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("18"), new EndPoint("F", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("21"), new EndPoint("G", 7000));
- ss.updateTokenMetadata(new BigIntegerToken("24"), new EndPoint("H", 7000));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("3"), new EndPoint("A", 7000));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("6"), new EndPoint("B", 7000));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("9"), new EndPoint("C", 7000));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("12"), new EndPoint("D", 7000));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("15"), new EndPoint("E", 7000));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("18"), new EndPoint("F", 7000));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("21"), new EndPoint("G", 7000));
+ ss.updateTokenMetadataUnsafe(new BigIntegerToken("24"), new EndPoint("H", 7000));
Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
runnable.run();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=818712&r1=818711&r2=818712&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Sep 25 04:01:35 2009
@@ -198,7 +198,7 @@
if (bootstrapSet.isEmpty())
{
isBootstrapMode = false;
- tokenMetadata_.update(storageMetadata_.getToken(), StorageService.tcpAddr_, false);
+ updateTokenMetadata(storageMetadata_.getToken(), StorageService.tcpAddr_, false);
logger_.info("Bootstrap completed! Now serving reads.");
/* Tell others you're not bootstrapping anymore */
@@ -207,6 +207,22 @@
return isBootstrapMode;
}
+ private void updateTokenMetadata(Token token, EndPoint endpoint, boolean isBootstraping)
+ {
+ tokenMetadata_.update(token, endpoint, isBootstraping);
+ if (!isBootstraping)
+ {
+ try
+ {
+ SystemTable.updateToken(endpoint, token);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
/*
* Registers with Management Server
*/
@@ -317,7 +333,7 @@
}
/* TODO: used for testing */
- public void updateTokenMetadata(Token token, EndPoint endpoint)
+ public void updateTokenMetadataUnsafe(Token token, EndPoint endpoint)
{
tokenMetadata_.update(token, endpoint);
}
@@ -453,7 +469,7 @@
{
if (logger_.isDebugEnabled())
logger_.debug("Relocation for endpoint " + ep);
- tokenMetadata_.update(newToken, ep, bootstrapState);
+ updateTokenMetadata(newToken, ep, bootstrapState);
}
else
{
@@ -471,7 +487,7 @@
/*
* This is a new node and we just update the token map.
*/
- tokenMetadata_.update(newToken, ep, bootstrapState);
+ updateTokenMetadata(newToken, ep, bootstrapState);
}
}
else
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=818712&r1=818711&r2=818712&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Fri Sep 25 04:01:35 2009
@@ -23,7 +23,6 @@
import java.util.List;
import java.util.Map;
-import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageService;
import org.junit.Test;
@@ -41,7 +40,7 @@
/* New token needs to be part of the map for the algorithm
* to calculate the ranges correctly
*/
- StorageService.instance().updateTokenMetadata(newToken, newEndPoint);
+ StorageService.instance().updateTokenMetadataUnsafe(newToken, newEndPoint);
BootStrapper b = new BootStrapper(new EndPoint[]{newEndPoint}, newToken );
Map<Range,List<BootstrapSourceTarget>> res = b.getRangesWithSourceTarget();
@@ -74,7 +73,7 @@
{
EndPoint e = new EndPoint("127.0.0."+i, 100);
Token t = p.getDefaultToken();
- StorageService.instance().updateTokenMetadata(t, e);
+ StorageService.instance().updateTokenMetadataUnsafe(t, e);
}
return p;
}