You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 03:44:45 UTC
svn commit: r758999 [1/2] - in /incubator/cassandra/trunk:
src/org/apache/cassandra/db/ src/org/apache/cassandra/dht/
src/org/apache/cassandra/io/ src/org/apache/cassandra/locator/
src/org/apache/cassandra/service/ src/org/apache/cassandra/tools/ src/o...
Author: jbellis
Date: Fri Mar 27 02:44:44 2009
New Revision: 758999
URL: http://svn.apache.org/viewvc?rev=758999&view=rev
Log:
migrate from BigInteger to abstract Token, with BigIntegerToken and StringToken subclasses controlled by Random and OrderPreserving partitioners, respectively
Added:
incubator/cassandra/trunk/src/org/apache/cassandra/dht/BigIntegerToken.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/StringToken.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/Token.java
Removed:
incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java
incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java
incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java
incubator/cassandra/trunk/test/org/apache/cassandra/db/SystemTableTest.java
incubator/cassandra/trunk/test/org/apache/cassandra/dht/RangeTest.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 02:44:44 2009
@@ -1118,7 +1118,7 @@
continue;
}
}
- if ( Range.isKeyInRanges(ranges, lastkey) )
+ if ( Range.isKeyInRanges(lastkey, ranges) )
{
if(ssTableRange == null )
{
@@ -1148,7 +1148,7 @@
continue;
}
/* keep on looping until we find a key in the range */
- while ( !Range.isKeyInRanges(ranges, filestruct.key_ ) )
+ while ( !Range.isKeyInRanges(filestruct.key_, ranges) )
{
filestruct = getNextKey ( filestruct );
if(filestruct == null)
@@ -1156,7 +1156,7 @@
break;
}
/* check if we need to continue , if we are done with ranges empty the queue and close all file handles and exit */
- //if( !isLoop && StorageService.hash(filestruct.key).compareTo(maxRange.right()) > 0 && !filestruct.key.equals(""))
+ //if( !isLoop && StorageService.token(filestruct.key).compareTo(maxRange.right()) > 0 && !filestruct.key.equals(""))
//{
//filestruct.reader.close();
//filestruct = null;
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java Fri Mar 27 02:44:44 2009
@@ -19,20 +19,17 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.math.BigInteger;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BasicUtilities;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.GuidGenerator;
/**
@@ -64,23 +61,23 @@
public static class StorageMetadata
{
- private BigInteger storageId_;
+ private Token myToken;
private int generation_;
- StorageMetadata(BigInteger storageId, int generation)
+ StorageMetadata(Token storageId, int generation)
{
- storageId_ = storageId;
+ myToken = storageId;
generation_ = generation;
}
- public BigInteger getStorageId()
+ public Token getStorageId()
{
- return storageId_;
+ return myToken;
}
- public void setStorageId(BigInteger storageId)
+ public void setStorageId(Token storageId)
{
- storageId_ = storageId;
+ myToken = storageId;
}
public int getGeneration()
@@ -117,22 +114,17 @@
SystemTable sysTable = SystemTable.openSystemTable(SystemTable.name_);
Row row = sysTable.get(FBUtilities.getHostName());
- Random random = new Random();
+ IPartitioner p = StorageService.getPartitioner();
if ( row == null )
{
- /* Generate a token for this Storage node */
- String guid = GuidGenerator.guid();
- BigInteger token = StorageService.hash(guid);
- if ( token.signum() == -1 )
- token = token.multiply(BigInteger.valueOf(-1L));
-
+ Token token = p.getDefaultToken();
int generation = 1;
String key = FBUtilities.getHostName();
row = new Row(key);
ColumnFamily cf = new ColumnFamily(SystemTable.cfName_);
- cf.addColumn(new Column(SystemTable.token_, token.toByteArray()));
- cf.addColumn(new Column(SystemTable.generation_, BasicUtilities.intToByteArray(generation)));
+ cf.addColumn(new Column(SystemTable.token_, p.getTokenFactory().toByteArray(token)));
+ cf.addColumn(new Column(SystemTable.generation_, BasicUtilities.intToByteArray(generation)) );
row.addColumnFamily(cf);
sysTable.apply(row);
storageMetadata = new StorageMetadata( token, generation);
@@ -147,14 +139,15 @@
{
ColumnFamily columnFamily = columnFamilies.get(cfName);
- IColumn token = columnFamily.getColumn(SystemTable.token_);
- BigInteger bi = new BigInteger( token.value() );
+ IColumn tokenColumn = columnFamily.getColumn(SystemTable.token_);
+ Token token = p.getTokenFactory().fromByteArray(tokenColumn.value());
IColumn generation = columnFamily.getColumn(SystemTable.generation_);
int gen = BasicUtilities.byteArrayToInt(generation.value()) + 1;
- columnFamily.addColumn(new Column("Generation", BasicUtilities.intToByteArray(gen), generation.timestamp() + 1));
- storageMetadata = new StorageMetadata( bi, gen );
+ Column generation2 = new Column("Generation", BasicUtilities.intToByteArray(gen), generation.timestamp() + 1);
+ columnFamily.addColumn(generation2);
+ storageMetadata = new StorageMetadata(token, gen);
break;
}
sysTable.reset(row);
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java Fri Mar 27 02:44:44 2009
@@ -19,15 +19,12 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.math.BigInteger;
-import org.apache.cassandra.continuations.Suspendable;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.IFileReader;
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.io.SequenceFile;
-import org.apache.cassandra.service.PartitionerType;
import org.apache.cassandra.service.StorageService;
@@ -101,7 +98,7 @@
public int compareTo(FileStruct f)
{
- return -StorageService.getPartitioner().getReverseDecoratedKeyComparator().compare(key_, f.key_);
+ return StorageService.getPartitioner().getDecoratedKeyComparator().compare(key_, f.key_);
}
public void close() throws IOException
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java Fri Mar 27 02:44:44 2009
@@ -19,10 +19,11 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
+import org.apache.log4j.Logger;
+
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
@@ -31,7 +32,8 @@
import org.apache.cassandra.io.SequenceFile;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.IPartitioner;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -148,8 +150,9 @@
* This method is used to update the SystemTable with the
* new token.
*/
- public void updateToken(BigInteger token) throws IOException
+ public void updateToken(Token token) throws IOException
{
+ IPartitioner p = StorageService.getPartitioner();
if ( systemRow_ != null )
{
Map<String, ColumnFamily> columnFamilies = systemRow_.getColumnFamilyMap();
@@ -157,9 +160,9 @@
ColumnFamily columnFamily = columnFamilies.get(SystemTable.cfName_);
long oldTokenColumnTimestamp = columnFamily.getColumn(SystemTable.token_).timestamp();
/* create the "Token" whose value is the new token. */
- IColumn tokenColumn = new Column(SystemTable.token_, token.toByteArray(), oldTokenColumnTimestamp + 1);
+ IColumn tokenColumn = new Column(SystemTable.token_, p.getTokenFactory().toByteArray(token), oldTokenColumnTimestamp + 1);
/* replace the old "Token" column with this new one. */
- logger_.debug("Replacing old token " + new BigInteger( columnFamily.getColumn(SystemTable.token_).value() ).toString() + " with token " + token.toString());
+ logger_.debug("Replacing old token " + p.getTokenFactory().fromByteArray(columnFamily.getColumn(SystemTable.token_).value()) + " with " + token);
columnFamily.addColumn(tokenColumn);
reset(systemRow_);
}
@@ -180,17 +183,7 @@
{
LogUtil.init();
StorageService.instance().start();
- SystemTable.openSystemTable(SystemTable.cfName_).updateToken( StorageService.hash("503545744:0") );
+ SystemTable.openSystemTable(SystemTable.cfName_).updateToken(StorageService.token("503545744:0"));
System.out.println("Done");
-
- /*
- BigInteger hash = StorageService.hash("304700067:0");
- List<Range> ranges = new ArrayList<Range>();
- ranges.add( new Range(new BigInteger("1218069462158869448693347920504606362273788442553"), new BigInteger("1092770595533781724218060956188429069")) );
- if ( Range.isKeyInRanges(ranges, "304700067:0") )
- {
- System.out.println("Done");
- }
- */
}
}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BigIntegerToken.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BigIntegerToken.java?rev=758999&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BigIntegerToken.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BigIntegerToken.java Fri Mar 27 02:44:44 2009
@@ -0,0 +1,16 @@
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+
+public class BigIntegerToken extends Token<BigInteger>
+{
+ public BigIntegerToken(BigInteger token)
+ {
+ super(token);
+ }
+
+ // convenience method for testing
+ public BigIntegerToken(String token) {
+ this(new BigInteger(token));
+ }
+}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java Fri Mar 27 02:44:44 2009
@@ -18,24 +18,20 @@
package org.apache.cassandra.dht;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.log4j.Logger;
+
+ import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.net.EndPoint;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.LogUtil;
/**
@@ -48,18 +44,18 @@
/* endpoints that need to be bootstrapped */
protected EndPoint[] targets_ = new EndPoint[0];
/* tokens of the nodes being bootstapped. */
- protected BigInteger[] tokens_ = new BigInteger[0];
+ protected final Token[] tokens_;
protected TokenMetadata tokenMetadata_ = null;
private List<EndPoint> filters_ = new ArrayList<EndPoint>();
- public BootStrapper(EndPoint[] target, BigInteger[] token)
+ public BootStrapper(EndPoint[] target, Token... token)
{
targets_ = target;
tokens_ = token;
tokenMetadata_ = StorageService.instance().getTokenMetadata();
}
- public BootStrapper(EndPoint[] target, BigInteger[] token, EndPoint[] filters)
+ public BootStrapper(EndPoint[] target, Token[] token, EndPoint[] filters)
{
this(target, token);
Collections.addAll(filters_, filters);
@@ -71,14 +67,14 @@
{
logger_.debug("Beginning bootstrap process for " + targets_ + " ...");
/* copy the token to endpoint map */
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
/* remove the tokens associated with the endpoints being bootstrapped */
- for ( BigInteger token : tokens_ )
+ for (Token token : tokens_)
{
tokenToEndPointMap.remove(token);
}
- Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
+ Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
logger_.debug("Total number of old ranges " + oldRanges.length);
/*
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java Fri Mar 27 02:44:44 2009
@@ -28,6 +28,8 @@
public String undecorateKey(String decoratedKey);
+ public Comparator<String> getDecoratedKeyComparator();
+
public Comparator<String> getReverseDecoratedKeyComparator();
public Token getTokenForKey(String key);
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java Fri Mar 27 02:44:44 2009
@@ -18,19 +18,19 @@
package org.apache.cassandra.dht;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.log4j.Logger;
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.log4j.Logger;
+
+ import org.apache.cassandra.net.EndPoint;
+ import org.apache.cassandra.net.Message;
+ import org.apache.cassandra.net.MessagingService;
class LeaveJoinProtocolHelper
@@ -42,20 +42,20 @@
* a-----x-----y-----b then we want a mapping from
* (a, b] --> (a, x], (x, y], (y, b]
*/
- protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, BigInteger[] allTokens)
+ protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, Token[] allTokens)
{
Map<Range, List<Range>> splitRanges = new HashMap<Range, List<Range>>();
- BigInteger[] tokens = new BigInteger[allTokens.length];
+ Token[] tokens = new Token[allTokens.length];
System.arraycopy(allTokens, 0, tokens, 0, tokens.length);
Arrays.sort(tokens);
Range prevRange = null;
- BigInteger prevToken = null;
+ Token prevToken = null;
boolean bVal = false;
for ( Range oldRange : oldRanges )
{
- if ( bVal && prevRange != null )
+ if (bVal)
{
bVal = false;
List<Range> subRanges = splitRanges.get(prevRange);
@@ -65,7 +65,7 @@
prevRange = oldRange;
prevToken = oldRange.left();
- for ( BigInteger token : tokens )
+ for (Token token : tokens)
{
List<Range> subRanges = splitRanges.get(oldRange);
if ( oldRange.contains(token) )
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java Fri Mar 27 02:44:44 2009
@@ -18,21 +18,20 @@
package org.apache.cassandra.dht;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
+ import org.apache.log4j.Logger;
+
+ import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.net.EndPoint;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.LogUtil;
/**
@@ -48,11 +47,11 @@
/* endpoints that are to be moved. */
protected EndPoint[] targets_ = new EndPoint[0];
/* position where they need to be moved */
- protected BigInteger[] tokens_ = new BigInteger[0];
+ protected final Token[] tokens_;
/* token metadata information */
protected TokenMetadata tokenMetadata_ = null;
- public LeaveJoinProtocolImpl(EndPoint[] targets, BigInteger[] tokens)
+ public LeaveJoinProtocolImpl(EndPoint[] targets, Token[] tokens)
{
targets_ = targets;
tokens_ = tokens;
@@ -65,24 +64,24 @@
{
logger_.debug("Beginning leave/join process for ...");
/* copy the token to endpoint map */
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
/* copy the endpoint to token map */
- Map<EndPoint, BigInteger> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+ Map<EndPoint, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
- Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
+ Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
logger_.debug("Total number of old ranges " + oldRanges.length);
/* Calculate the list of nodes that handle the old ranges */
Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
/* Remove the tokens of the nodes leaving the ring */
- Set<BigInteger> tokens = getTokensForLeavingNodes();
+ Set<Token> tokens = getTokensForLeavingNodes();
oldTokens.removeAll(tokens);
Range[] rangesAfterNodesLeave = StorageService.instance().getAllRanges(oldTokens);
/* Get expanded range to initial range mapping */
Map<Range, List<Range>> expandedRangeToOldRangeMap = getExpandedRangeToOldRangeMapping(oldRanges, rangesAfterNodesLeave);
/* add the new token positions to the old tokens set */
- for ( BigInteger token : tokens_ )
+ for (Token token : tokens_)
oldTokens.add(token);
Range[] rangesAfterNodesJoin = StorageService.instance().getAllRanges(oldTokens);
/* replace the ranges that were split with the split ranges in the old configuration */
@@ -196,12 +195,12 @@
}
}
- private Set<BigInteger> getTokensForLeavingNodes()
+ private Set<Token> getTokensForLeavingNodes()
{
- Set<BigInteger> tokens = new HashSet<BigInteger>();
+ Set<Token> tokens = new HashSet<Token>();
for ( EndPoint target : targets_ )
{
- tokens.add( tokenMetadata_.getToken(target) );
+ tokens.add(tokenMetadata_.getToken(target));
}
return tokens;
}
@@ -276,16 +275,16 @@
public static void main(String[] args) throws Throwable
{
StorageService ss = StorageService.instance();
- ss.updateTokenMetadata(BigInteger.valueOf(3), new EndPoint("A", 7000));
- ss.updateTokenMetadata(BigInteger.valueOf(6), new EndPoint("B", 7000));
- ss.updateTokenMetadata(BigInteger.valueOf(9), new EndPoint("C", 7000));
- ss.updateTokenMetadata(BigInteger.valueOf(12), new EndPoint("D", 7000));
- ss.updateTokenMetadata(BigInteger.valueOf(15), new EndPoint("E", 7000));
- ss.updateTokenMetadata(BigInteger.valueOf(18), new EndPoint("F", 7000));
- ss.updateTokenMetadata(BigInteger.valueOf(21), new EndPoint("G", 7000));
- ss.updateTokenMetadata(BigInteger.valueOf(24), new EndPoint("H", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("3"), new EndPoint("A", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("6"), new EndPoint("B", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("9"), new EndPoint("C", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("12"), new EndPoint("D", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("15"), new EndPoint("E", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("18"), new EndPoint("F", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("21"), new EndPoint("G", 7000));
+ ss.updateTokenMetadata(new BigIntegerToken("24"), new EndPoint("H", 7000));
- Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new BigInteger[]{BigInteger.valueOf(22), BigInteger.valueOf(23)} );
+ Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
runnable.run();
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java Fri Mar 27 02:44:44 2009
@@ -19,15 +19,27 @@
package org.apache.cassandra.dht;
import java.io.UnsupportedEncodingException;
+import java.text.Collator;
import java.util.Comparator;
+import java.util.Locale;
import java.util.Random;
public class OrderPreservingPartitioner implements IPartitioner
{
+ // TODO make locale configurable. But don't just leave it up to the OS or you could really screw
+ // people over if they deploy on nodes with different OS locales.
+ static final Collator collator = Collator.getInstance(new Locale("en", "US"));
+
private static final Comparator<String> comparator = new Comparator<String>() {
public int compare(String o1, String o2)
{
- return o2.compareTo(o1);
+ return collator.compare(o1, o2);
+ }
+ };
+ private static final Comparator<String> reverseComparator = new Comparator<String>() {
+ public int compare(String o1, String o2)
+ {
+ return -comparator.compare(o1, o2);
}
};
@@ -41,11 +53,16 @@
return decoratedKey;
}
- public Comparator<String> getReverseDecoratedKeyComparator()
+ public Comparator<String> getDecoratedKeyComparator()
{
return comparator;
}
+ public Comparator<String> getReverseDecoratedKeyComparator()
+ {
+ return reverseComparator;
+ }
+
public StringToken getDefaultToken()
{
String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java Fri Mar 27 02:44:44 2009
@@ -25,7 +25,6 @@
import org.apache.cassandra.utils.GuidGenerator;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.dht.BigIntegerToken;
-import org.apache.cassandra.service.StorageService;
/**
* This class generates a MD5 hash of the key. It uses the standard technique
@@ -41,7 +40,13 @@
{
BigInteger i1 = new BigInteger(o1.split(":")[0]);
BigInteger i2 = new BigInteger(o2.split(":")[0]);
- return i2.compareTo(i1);
+ return i1.compareTo(i2);
+ }
+ };
+ private static final Comparator<String> reverseComparator = new Comparator<String>() {
+ public int compare(String o1, String o2)
+ {
+ return -comparator.compare(o1, o2);
}
};
@@ -60,11 +65,16 @@
return decoratedKey.split(":")[1];
}
- public Comparator<String> getReverseDecoratedKeyComparator()
+ public Comparator<String> getDecoratedKeyComparator()
{
return comparator;
}
+ public Comparator<String> getReverseDecoratedKeyComparator()
+ {
+ return reverseComparator;
+ }
+
public BigIntegerToken getDefaultToken()
{
String guid = GuidGenerator.guid();
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java Fri Mar 27 02:44:44 2009
@@ -21,17 +21,9 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import org.apache.cassandra.gms.GossipDigest;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.IFileReader;
-import org.apache.cassandra.io.IFileWriter;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageService;
@@ -51,28 +43,12 @@
public static ICompactSerializer<Range> serializer()
{
return serializer_;
- }
-
- public static boolean isKeyInRanges(List<Range> ranges, String key)
- {
- if(ranges == null )
- return false;
-
- for ( Range range : ranges)
- {
- if(range.contains(StorageService.hash(key)))
- {
- return true ;
- }
- }
- return false;
- }
-
-
- private BigInteger left_;
- private BigInteger right_;
+ }
+
+ private Token left_;
+ private Token right_;
- public Range(BigInteger left, BigInteger right)
+ public Range(Token left, Token right)
{
left_ = left;
right_ = right;
@@ -82,7 +58,7 @@
* Returns the left endpoint of a range.
* @return left endpoint
*/
- public BigInteger left()
+ public Token left()
{
return left_;
}
@@ -91,7 +67,7 @@
* Returns the right endpoint of a range.
* @return right endpoint
*/
- public BigInteger right()
+ public Token right()
{
return right_;
}
@@ -102,9 +78,9 @@
* @param bi point in question
* @return true if the point contains within the range else false.
*/
- public boolean contains(BigInteger bi)
+ public boolean contains(Token bi)
{
- if ( left_.subtract(right_).signum() > 0 )
+ if ( left_.compareTo(right_) > 0 )
{
/*
* left is greater than right we are wrapping around.
@@ -114,16 +90,16 @@
* (2) k < b -- return true
* (3) b < k < a -- return false
*/
- if ( bi.subtract(left_).signum() >= 0 )
+ if ( bi.compareTo(left_) >= 0 )
return true;
- else return right_.subtract(bi).signum() > 0;
+ else return right_.compareTo(bi) > 0;
}
- else if ( left_.subtract(right_).signum() < 0 )
+ else if ( left_.compareTo(right_) < 0 )
{
/*
* This is the range [a, b) where a < b.
*/
- return ( bi.subtract(left_).signum() >= 0 && right_.subtract(bi).signum() >=0 );
+ return ( bi.compareTo(left_) >= 0 && right_.compareTo(bi) >=0 );
}
else
{
@@ -136,9 +112,9 @@
* @param range
* @return
*/
- private boolean isWrapAround(Range range)
+ private static boolean isWrapAround(Range range)
{
- return range.left_.subtract(range.right_).signum() > 0;
+ return range.left_.compareTo(range.right_) > 0;
}
public int compareTo(Range rhs)
@@ -156,6 +132,22 @@
return right_.compareTo(rhs.right_);
}
+
+ public static boolean isKeyInRanges(String key, List<Range> ranges) // true if the key's token is contained in at least one of the given ranges
+ {
+ assert ranges != null; // only checked when assertions are enabled; the removed (ranges, key) overload returned false for a null list instead
+
+ Token token = StorageService.token(key); // computed once and reused for every range
+ for (Range range : ranges)
+ {
+ if(range.contains(token))
+ {
+ return true;
+ }
+ }
+ return false; // no range matched (also the result for an empty list)
+ }
+
public boolean equals(Object o)
{
if ( !(o instanceof Range) )
@@ -178,15 +170,13 @@
class RangeSerializer implements ICompactSerializer<Range>
{
public void serialize(Range range, DataOutputStream dos) throws IOException
- {
- dos.writeUTF(range.left().toString());
- dos.writeUTF(range.right().toString());
+ {
+ Token.serializer().serialize(range.left(), dos);
+ Token.serializer().serialize(range.right(), dos);
}
public Range deserialize(DataInputStream dis) throws IOException
{
- BigInteger left = new BigInteger(dis.readUTF());
- BigInteger right = new BigInteger(dis.readUTF());
- return new Range(left, right);
+ return new Range(Token.serializer().deserialize(dis), Token.serializer().deserialize(dis));
}
}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/StringToken.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/StringToken.java?rev=758999&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/StringToken.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/StringToken.java Fri Mar 27 02:44:44 2009
@@ -0,0 +1,14 @@
+package org.apache.cassandra.dht;
+
+public class StringToken extends Token<String> // Token whose wrapped value is a String
+{
+ protected StringToken(String token)
+ {
+ super(token);
+ }
+
+ public int compareTo(Token<String> o) // replaces Token.compareTo: orders by the partitioner's collator instead of String.compareTo
+ {
+ return OrderPreservingPartitioner.collator.compare(this.token, o.token); // NOTE(review): equals()/hashCode() are inherited and use String.equals - confirm the collator never ranks two distinct strings as equal
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/Token.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/Token.java?rev=758999&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/Token.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/Token.java Fri Mar 27 02:44:44 2009
@@ -0,0 +1,77 @@
+package org.apache.cassandra.dht;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.service.StorageService;
+
+public abstract class Token<T extends Comparable> implements Comparable<Token<T>> // ring position wrapping a Comparable value of type T; concrete subclasses fix T
+{
+ private static final TokenSerializer serializer = new TokenSerializer(); // single shared instance, handed out by serializer()
+ public static TokenSerializer serializer()
+ {
+ return serializer;
+ }
+
+ T token; // wrapped value; package-private, so same-package code such as StringToken.compareTo reads it directly
+
+ protected Token(T token)
+ {
+ this.token = token;
+ }
+
+ /**
+ * This determines the comparison for node destination purposes. By default it delegates to the wrapped value's own compareTo; a subclass may override it.
+ */
+ public int compareTo(Token<T> o)
+ {
+ return token.compareTo(o.token);
+ }
+
+ public String toString()
+ {
+ return "Token(" + token + ")";
+ }
+
+ public boolean equals(Object obj)
+ {
+ if (!(obj instanceof Token)) { // also rejects null
+ return false;
+ }
+ return token.equals(((Token)obj).token); // equality is by wrapped value only; the concrete Token subclass is not compared
+ }
+
+ public int hashCode()
+ {
+ return token.hashCode(); // hashes the same field that equals() compares
+ }
+
+ public static abstract class TokenFactory<T extends Comparable> // conversions between a token and its byte[] / String forms
+ {
+ public abstract byte[] toByteArray(Token<T> token);
+ public abstract Token<T> fromByteArray(byte[] bytes);
+ public abstract Token<T> fromString(String string);
+ }
+
+ public static class TokenSerializer implements ICompactSerializer<Token> // stream layout: int length (writeInt), then that many bytes produced by the token factory
+ {
+ public void serialize(Token token, DataOutputStream dos) throws IOException
+ {
+ IPartitioner p = StorageService.getPartitioner(); // the byte encoding comes from whichever partitioner StorageService returns
+ byte[] b = p.getTokenFactory().toByteArray(token);
+ dos.writeInt(b.length);
+ dos.write(b);
+ }
+
+ public Token deserialize(DataInputStream dis) throws IOException
+ {
+ IPartitioner p = StorageService.getPartitioner();
+ int size = dis.readInt(); // NOTE(review): size is not validated before the allocation below - confirm the stream is always trusted
+ byte[] bytes = new byte[size];
+ dis.readFully(bytes); // readFully throws EOFException if the stream ends before size bytes are read
+ return p.getTokenFactory().fromByteArray(bytes);
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java Fri Mar 27 02:44:44 2009
@@ -18,20 +18,30 @@
package org.apache.cassandra.io;
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Hashtable;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.log4j.Logger;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.IPartitioner;
-import org.apache.cassandra.service.PartitionerType;
import org.apache.cassandra.utils.BasicUtilities;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.db.RowMutation;
-
-import org.apache.log4j.Logger;
+import org.apache.cassandra.dht.IPartitioner;
/**
* This class is built on top of the SequenceFile. It stores
@@ -162,7 +172,7 @@
public int compareTo(KeyPositionInfo kPosInfo)
{
IPartitioner p = StorageService.getPartitioner();
- return -p.getReverseDecoratedKeyComparator().compare(decoratedKey, kPosInfo.decoratedKey);
+ return p.getDecoratedKeyComparator().compare(decoratedKey, kPosInfo.decoratedKey);
}
public String toString()
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java Fri Mar 27 02:44:44 2009
@@ -658,33 +658,6 @@
}
/**
- * This is useful in figuring out the key in system. If an OPHF
- * is used then the "key" is the application supplied key. If a random
- * partitioning mechanism is used then the key is of the form
- * hash:key where hash is used internally as the key.
- *
- * @param in the DataInput stream from which the key needs to be read
- * @return the appropriate key based on partitioning type
- * @throws IOException
- */
- protected String readKeyFromDisk(DataInput in) throws IOException
- {
- String keyInDisk = null;
- PartitionerType pType = StorageService.getPartitionerType();
- switch( pType )
- {
- case OPHF:
- keyInDisk = in.readUTF();
- break;
-
- default:
- keyInDisk = in.readUTF().split(":")[0];
- break;
- }
- return keyInDisk;
- }
-
- /**
* This method dumps the next key/value into the DataOutputStream
* passed in. Always use this method to query for application
* specific data as it will have indexes.
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java Fri Mar 27 02:44:44 2009
@@ -1,6 +1,5 @@
package org.apache.cassandra.locator;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -8,11 +7,12 @@
import java.util.List;
import java.util.Map;
+import org.apache.log4j.Logger;
+
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
-import org.apache.log4j.Logger;
/**
* This class contains a helper method that will be used by
@@ -45,10 +45,10 @@
protected EndPoint getNextAvailableEndPoint(EndPoint startPoint, List<EndPoint> topN, List<EndPoint> liveNodes)
{
EndPoint endPoint = null;
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List tokens = new ArrayList(tokenToEndPointMap.keySet());
Collections.sort(tokens);
- BigInteger token = tokenMetadata_.getToken(startPoint);
+ Token token = tokenMetadata_.getToken(startPoint);
int index = Collections.binarySearch(tokens, token);
if(index < 0)
{
@@ -76,7 +76,7 @@
* endpoint which is in the top N.
* Get the map of top N to the live nodes currently.
*/
- public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token)
+ public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token)
{
List<EndPoint> liveList = new ArrayList<EndPoint>();
Map<EndPoint, EndPoint> map = new HashMap<EndPoint, EndPoint>();
@@ -107,6 +107,6 @@
return map;
}
- public abstract EndPoint[] getStorageEndPoints(BigInteger token);
+ public abstract EndPoint[] getStorageEndPoints(Token token);
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java Fri Mar 27 02:44:44 2009
@@ -18,9 +18,9 @@
package org.apache.cassandra.locator;
-import java.math.BigInteger;
import java.util.Map;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.EndPoint;
@@ -32,7 +32,7 @@
*/
public interface IReplicaPlacementStrategy
{
- public EndPoint[] getStorageEndPoints(BigInteger token);
- public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap);
- public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token);
+ public EndPoint[] getStorageEndPoints(Token token);
+ public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap);
+ public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token);
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java Fri Mar 27 02:44:44 2009
@@ -1,6 +1,5 @@
package org.apache.cassandra.locator;
-import java.math.BigInteger;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
@@ -8,6 +7,7 @@
import java.util.Map;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
@@ -27,7 +27,7 @@
super(tokenMetadata);
}
- public EndPoint[] getStorageEndPoints(BigInteger token)
+ public EndPoint[] getStorageEndPoints(Token token)
{
int startIndex = 0 ;
List<EndPoint> list = new ArrayList<EndPoint>();
@@ -35,8 +35,8 @@
boolean bOtherRack = false;
int foundCount = 0;
int N = DatabaseDescriptor.getReplicationFactor();
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List tokens = new ArrayList(tokenToEndPointMap.keySet());
Collections.sort(tokens);
int index = Collections.binarySearch(tokens, token);
if(index < 0)
@@ -107,7 +107,7 @@
return list.toArray(new EndPoint[0]);
}
- public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+ public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
{
throw new UnsupportedOperationException("This operation is not currently supported");
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java Fri Mar 27 02:44:44 2009
@@ -1,6 +1,5 @@
package org.apache.cassandra.locator;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -8,6 +7,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.EndPoint;
@@ -27,18 +27,18 @@
super(tokenMetadata);
}
- public EndPoint[] getStorageEndPoints(BigInteger token)
+ public EndPoint[] getStorageEndPoints(Token token)
{
return getStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());
}
- public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+ public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
{
int startIndex = 0 ;
List<EndPoint> list = new ArrayList<EndPoint>();
int foundCount = 0;
int N = DatabaseDescriptor.getReplicationFactor();
- List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
int index = Collections.binarySearch(tokens, token);
if(index < 0)
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java Fri Mar 27 02:44:44 2009
@@ -18,18 +18,13 @@
package org.apache.cassandra.locator;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.EndPoint;
@@ -40,9 +35,9 @@
public class TokenMetadata
{
/* Maintains token to endpoint map of every node in the cluster. */
- private Map<BigInteger, EndPoint> tokenToEndPointMap_ = new HashMap<BigInteger, EndPoint>();
+ private Map<Token, EndPoint> tokenToEndPointMap_ = new HashMap<Token, EndPoint>();
/* Maintains a reverse index of endpoint to token in the cluster. */
- private Map<EndPoint, BigInteger> endPointToTokenMap_ = new HashMap<EndPoint, BigInteger>();
+ private Map<EndPoint, Token> endPointToTokenMap_ = new HashMap<EndPoint, Token>();
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -51,7 +46,7 @@
{
}
- private TokenMetadata(Map<BigInteger, EndPoint> tokenToEndPointMap, Map<EndPoint, BigInteger> endPointToTokenMap)
+ private TokenMetadata(Map<Token, EndPoint> tokenToEndPointMap, Map<EndPoint, Token> endPointToTokenMap)
{
tokenToEndPointMap_ = tokenToEndPointMap;
endPointToTokenMap_ = endPointToTokenMap;
@@ -59,20 +54,18 @@
public TokenMetadata cloneMe()
{
- Map<BigInteger, EndPoint> tokenToEndPointMap = cloneTokenEndPointMap();
- Map<EndPoint, BigInteger> endPointToTokenMap = cloneEndPointTokenMap();
- return new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
+ return new TokenMetadata(cloneTokenEndPointMap(), cloneEndPointTokenMap());
}
/**
* Update the two maps in a safe manner.
*/
- public void update(BigInteger token, EndPoint endpoint)
+ public void update(Token token, EndPoint endpoint)
{
lock_.writeLock().lock();
try
{
- BigInteger oldToken = endPointToTokenMap_.get(endpoint);
+ Token oldToken = endPointToTokenMap_.get(endpoint);
if ( oldToken != null )
tokenToEndPointMap_.remove(oldToken);
tokenToEndPointMap_.put(token, endpoint);
@@ -93,7 +86,7 @@
lock_.writeLock().lock();
try
{
- BigInteger oldToken = endPointToTokenMap_.get(endpoint);
+ Token oldToken = endPointToTokenMap_.get(endpoint);
if ( oldToken != null )
tokenToEndPointMap_.remove(oldToken);
endPointToTokenMap_.remove(endpoint);
@@ -104,7 +97,7 @@
}
}
- public BigInteger getToken(EndPoint endpoint)
+ public Token getToken(EndPoint endpoint)
{
lock_.readLock().lock();
try
@@ -133,12 +126,12 @@
/*
* Returns a safe clone of tokenToEndPointMap_.
*/
- public Map<BigInteger, EndPoint> cloneTokenEndPointMap()
+ public Map<Token, EndPoint> cloneTokenEndPointMap()
{
lock_.readLock().lock();
try
{
- return new HashMap<BigInteger, EndPoint>( tokenToEndPointMap_ );
+ return new HashMap<Token, EndPoint>( tokenToEndPointMap_ );
}
finally
{
@@ -149,12 +142,12 @@
/*
* Returns a safe clone of endPointTokenMap_.
*/
- public Map<EndPoint, BigInteger> cloneEndPointTokenMap()
+ public Map<EndPoint, Token> cloneEndPointTokenMap()
{
lock_.readLock().lock();
try
{
- return new HashMap<EndPoint, BigInteger>( endPointToTokenMap_ );
+ return new HashMap<EndPoint, Token>( endPointToTokenMap_ );
}
finally
{
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java Fri Mar 27 02:44:44 2009
@@ -19,33 +19,30 @@
package org.apache.cassandra.service;
import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.log4j.Logger;
+
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.SingleThreadedStage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.dht.LeaveJoinProtocolImpl;
-import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndPointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
-import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
-import org.apache.log4j.Logger;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.utils.*;
/*
* The load balancing algorithm here is an implementation of
@@ -164,7 +161,6 @@
if ( isMoveable_.get() )
{
MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
- BigInteger targetToken = moveMessage.getTargetToken();
/* Start the leave operation and join the ring at the position specified */
isMoveable_.set(false);
}
@@ -396,18 +392,18 @@
class MoveMessage implements Serializable
{
- private BigInteger targetToken_;
+ private Token targetToken_;
private MoveMessage()
{
}
- MoveMessage(BigInteger targetToken)
+ MoveMessage(Token targetToken)
{
targetToken_ = targetToken;
}
- BigInteger getTargetToken()
+ Token getTargetToken()
{
return targetToken_;
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Fri Mar 27 02:44:44 2009
@@ -18,19 +18,31 @@
package org.apache.cassandra.service;
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.io.*;
-import java.lang.management.ManagementFactory;
-import java.math.BigInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.apache.log4j.Logger;
+
import org.apache.cassandra.analytics.AnalyticsContext;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.MultiThreadedStage;
@@ -55,7 +67,11 @@
import org.apache.cassandra.dht.BootStrapper;
import org.apache.cassandra.dht.BootstrapInitiateMessage;
import org.apache.cassandra.dht.BootstrapMetadataVerbHandler;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.OrderPreservingPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndPointState;
import org.apache.cassandra.gms.FailureDetector;
@@ -77,16 +93,12 @@
import org.apache.cassandra.tools.TokenUpdateVerbHandler;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.commons.math.linear.RealMatrix;
-import org.apache.commons.math.linear.RealMatrixImpl;
-import org.apache.log4j.Logger;
-
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/*
@@ -126,7 +138,7 @@
public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
public final static String calloutDeployVerbHandler_ = "CALLOUT-DEPLOY-VERB-HANDLER";
public final static String touchVerbHandler_ = "TOUCH-VERB-HANDLER";
-
+
public static enum ConsistencyLevel
{
WEAK,
@@ -160,9 +172,9 @@
* function used by the system for
* partitioning.
*/
- public static BigInteger hash(String key)
+ public static Token token(String key)
{
- return partitioner_.hash(key);
+ return partitioner_.getTokenForKey(key);
}
public static IPartitioner getPartitioner() {
@@ -424,7 +436,7 @@
String hashingStrategy = DatabaseDescriptor.getHashingStrategy();
if (DatabaseDescriptor.ophf_.equalsIgnoreCase(hashingStrategy))
{
- partitioner_ = new OrderPreservingHashPartitioner();
+ partitioner_ = new OrderPreservingPartitioner();
}
else
{
@@ -525,7 +537,7 @@
}
/* TODO: remove later */
- public void updateTokenMetadata(BigInteger token, EndPoint endpoint)
+ public void updateTokenMetadata(Token token, EndPoint endpoint)
{
tokenMetadata_.update(token, endpoint);
}
@@ -571,8 +583,8 @@
public Map<Range, List<EndPoint>> getRangeToEndPointMap()
{
/* Get the token to endpoint map. */
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- Set<BigInteger> tokens = tokenToEndPointMap.keySet();
+ Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Set<Token> tokens = tokenToEndPointMap.keySet();
/* All the ranges for the tokens */
Range[] ranges = getAllRanges(tokens);
Map<Range, List<EndPoint>> oldRangeToEndPointMap = constructRangeToEndPointMap(ranges);
@@ -605,7 +617,7 @@
* @param tokenToEndPointMap mapping of token to endpoints.
* @return mapping of ranges to the replicas responsible for them.
*/
- public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges, Map<BigInteger, EndPoint> tokenToEndPointMap)
+ public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges, Map<Token, EndPoint> tokenToEndPointMap)
{
logger_.debug("Constructing range to endpoint map ...");
Map<Range, List<EndPoint>> rangeToEndPointMap = new HashMap<Range, List<EndPoint>>();
@@ -627,7 +639,7 @@
public Map<EndPoint, List<Range>> constructEndPointToRangesMap()
{
Map<EndPoint, List<Range>> endPointToRangesMap = new HashMap<EndPoint, List<Range>>();
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
Collection<EndPoint> mbrs = tokenToEndPointMap.values();
for ( EndPoint mbr : mbrs )
{
@@ -648,9 +660,9 @@
ApplicationState nodeIdState = epState.getApplicationState(StorageService.nodeId_);
if (nodeIdState != null)
{
- BigInteger newToken = new BigInteger(nodeIdState.getState());
+ Token newToken = getPartitioner().getTokenFactory().fromString(nodeIdState.getState());
logger_.debug("CHANGE IN STATE FOR " + endpoint + " - has token " + nodeIdState.getState());
- BigInteger oldToken = tokenMetadata_.getToken(ep);
+ Token oldToken = tokenMetadata_.getToken(ep);
if ( oldToken != null )
{
@@ -732,7 +744,7 @@
* This method updates the token on disk and modifies the cached
* StorageMetadata instance. This is only for the local endpoint.
*/
- public void updateToken(BigInteger token) throws IOException
+ public void updateToken(Token token) throws IOException
{
/* update the token on disk */
SystemTable.openSystemTable(SystemTable.name_).updateToken(token);
@@ -773,12 +785,12 @@
{
if ( keys.length > 0 )
{
- BigInteger token = tokenMetadata_.getToken(StorageService.tcpAddr_);
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- BigInteger[] tokens = tokenToEndPointMap.keySet().toArray( new BigInteger[0] );
+ Token token = tokenMetadata_.getToken(StorageService.tcpAddr_);
+ Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ Token[] tokens = tokenToEndPointMap.keySet().toArray(new Token[tokenToEndPointMap.keySet().size()]);
Arrays.sort(tokens);
int index = Arrays.binarySearch(tokens, token) * (keys.length/tokens.length);
- BigInteger newToken = hash( keys[index] );
+ Token newToken = token( keys[index] );
/* update the token */
updateToken(newToken);
}
@@ -816,7 +828,7 @@
}
String[] allNodes = nodesToLoad.split(":");
EndPoint[] endpoints = new EndPoint[allNodes.length];
- BigInteger[] tokens = new BigInteger[allNodes.length];
+ Token[] tokens = new Token[allNodes.length];
for ( int i = 0; i < allNodes.length; ++i )
{
@@ -852,8 +864,8 @@
switch ( mode )
{
case FULL:
- BigInteger token = tokenMetadata_.getToken(endpoint);
- bootStrapper_.submit( new BootStrapper(new EndPoint[]{endpoint}, new BigInteger[]{token}) );
+ Token token = tokenMetadata_.getToken(endpoint);
+ bootStrapper_.submit(new BootStrapper(new EndPoint[]{endpoint}, token));
break;
case HINT:
@@ -871,26 +883,14 @@
public String getToken(EndPoint ep)
{
EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getStoragePort());
- BigInteger token = tokenMetadata_.getToken(ep2);
- return ( token == null ) ? BigInteger.ZERO.toString() : token.toString();
+ Token token = tokenMetadata_.getToken(ep2);
+ return ( token == null ) ? "" : token.toString();
}
public String getToken()
{
return tokenMetadata_.getToken(StorageService.tcpAddr_).toString();
}
-
- public void updateToken(String token)
- {
- try
- {
- updateToken(new BigInteger(token));
- }
- catch ( IOException ex )
- {
- logger_.debug(LogUtil.throwableToString(ex));
- }
- }
public String getLiveNodes()
{
@@ -973,9 +973,9 @@
*/
EndPoint getPredecessor(EndPoint ep)
{
- BigInteger token = tokenMetadata_.getToken(ep);
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Token token = tokenMetadata_.getToken(ep);
+ Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
int index = Collections.binarySearch(tokens, token);
EndPoint predecessor = (index == 0) ? tokenToEndPointMap.get(tokens
@@ -990,9 +990,9 @@
*/
public EndPoint getSuccessor(EndPoint ep)
{
- BigInteger token = tokenMetadata_.getToken(ep);
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Token token = tokenMetadata_.getToken(ep);
+ Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
Collections.sort(tokens);
int index = Collections.binarySearch(tokens, token);
EndPoint successor = (index == (tokens.size() - 1)) ? tokenToEndPointMap
@@ -1008,9 +1008,9 @@
*/
public Range getPrimaryRangeForEndPoint(EndPoint ep)
{
- BigInteger right = tokenMetadata_.getToken(ep);
+ Token right = tokenMetadata_.getToken(ep);
EndPoint predecessor = getPredecessor(ep);
- BigInteger left = tokenMetadata_.getToken(predecessor);
+ Token left = tokenMetadata_.getToken(predecessor);
return new Range(left, right);
}
@@ -1041,10 +1041,10 @@
* ranges.
* @return ranges in sorted order
*/
- public Range[] getAllRanges(Set<BigInteger> tokens)
+ public Range[] getAllRanges(Set<Token> tokens)
{
List<Range> ranges = new ArrayList<Range>();
- List<BigInteger> allTokens = new ArrayList<BigInteger>(tokens);
+ List<Token> allTokens = new ArrayList<Token>(tokens);
Collections.sort(allTokens);
int size = allTokens.size();
for ( int i = 1; i < size; ++i )
@@ -1067,9 +1067,9 @@
public EndPoint getPrimary(String key)
{
EndPoint endpoint = StorageService.tcpAddr_;
- BigInteger token = hash(key);
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+ Token token = token(key);
+ Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+ List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
if (tokens.size() > 0)
{
Collections.sort(tokens);
@@ -1115,7 +1115,7 @@
*/
public EndPoint[] getNStorageEndPoint(String key)
{
- BigInteger token = hash(key);
+ Token token = token(key);
return nodePicker_.getStorageEndPoints(token);
}
@@ -1150,7 +1150,7 @@
*/
public Map<EndPoint, EndPoint> getNStorageEndPointMap(String key)
{
- BigInteger token = hash(key);
+ Token token = token(key);
return nodePicker_.getHintedStorageEndPoints(token);
}
@@ -1160,7 +1160,7 @@
*
* param @ token - position on the ring
*/
- public EndPoint[] getNStorageEndPoint(BigInteger token)
+ public EndPoint[] getNStorageEndPoint(Token token)
{
return nodePicker_.getStorageEndPoints(token);
}
@@ -1173,7 +1173,7 @@
* param @ token - position on the ring
* param @ tokens - w/o the following tokens in the token list
*/
- protected EndPoint[] getNStorageEndPoint(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+ protected EndPoint[] getNStorageEndPoint(Token token, Map<Token, EndPoint> tokenToEndPointMap)
{
return nodePicker_.getStorageEndPoints(token, tokenToEndPointMap);
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java Fri Mar 27 02:44:44 2009
@@ -44,11 +44,6 @@
public void loadAll(String nodes);
/**
- * This method is used only for debug purpose.
- */
- public void updateToken(String token);
-
- /**
*
*/
public void doGC();
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java Fri Mar 27 02:44:44 2009
@@ -19,12 +19,13 @@
package org.apache.cassandra.service;
import java.io.IOException;
-import java.math.BigInteger;
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -37,7 +38,7 @@
public void doVerb(Message message)
{
byte[] body = (byte[])message.getMessageBody()[0];
- BigInteger token = new BigInteger(body);
+ Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(body);
try
{
logger_.info("Updating the token to [" + token + "]");
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java Fri Mar 27 02:44:44 2009
@@ -18,25 +18,20 @@
package org.apache.cassandra.tools;
-import java.util.*;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.math.BigInteger;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.tools.TokenUpdater.TokenInfoMessage;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-import org.apache.cassandra.io.*;
-import org.apache.cassandra.config.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java Fri Mar 27 02:44:44 2009
@@ -21,20 +21,20 @@
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.math.BigInteger;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.tools.TokenUpdater.TokenInfoMessage;
import org.apache.cassandra.utils.LogUtil;
/**
@@ -54,9 +54,8 @@
DataInputBuffer bufIn = new DataInputBuffer();
bufIn.reset(body, body.length);
/* Deserialize to get the token for this endpoint. */
- TokenUpdater.TokenInfoMessage tiMessage = TokenUpdater.TokenInfoMessage.serializer().deserialize(bufIn);
-
- BigInteger token = tiMessage.getToken();
+ Token token = Token.serializer().deserialize(bufIn);
+
logger_.info("Updating the token to [" + token + "]");
StorageService.instance().updateToken(token);
@@ -66,19 +65,19 @@
logger_.debug("Number of nodes in the header " + headers.size());
Set<String> nodes = headers.keySet();
+ IPartitioner p = StorageService.getPartitioner();
for ( String node : nodes )
{
logger_.debug("Processing node " + node);
byte[] bytes = headers.remove(node);
/* Send a message to this node to update its token to the one retreived. */
EndPoint target = new EndPoint(node, DatabaseDescriptor.getStoragePort());
- token = new BigInteger(bytes);
+ token = p.getTokenFactory().fromByteArray(bytes);
- /* Reset the new TokenInfoMessage */
- tiMessage = new TokenUpdater.TokenInfoMessage(target, token );
+ /* Reset the new Message */
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- TokenInfoMessage.serializer().serialize(tiMessage, dos);
+ Token.serializer().serialize(token, dos);
message.setMessageBody(new Object[]{bos.toByteArray()});
logger_.debug("Sending a token update message to " + target + " to update it to " + token);
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java Fri Mar 27 02:44:44 2009
@@ -20,16 +20,12 @@
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
-import java.io.IOException;
import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -50,16 +46,17 @@
}
String ipPort = args[0];
- String token = args[1];
+ IPartitioner p = StorageService.getPartitioner();
+ Token token = p.getTokenFactory().fromString(args[1]);
String file = args[2];
String[] ipPortPair = ipPort.split(":");
EndPoint target = new EndPoint(ipPortPair[0], Integer.valueOf(ipPortPair[1]));
- TokenInfoMessage tiMessage = new TokenInfoMessage( target, new BigInteger(token) );
-
+
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- TokenInfoMessage.serializer().serialize(tiMessage, dos);
+ Token.serializer().serialize(token, dos);
+
/* Construct the token update message to be sent */
Message tokenUpdateMessage = new Message( new EndPoint(FBUtilities.getHostName(), port_), "", StorageService.tokenVerbHandler_, new Object[]{bos.toByteArray()} );
@@ -70,8 +67,8 @@
{
String[] nodeTokenPair = line.split(" ");
/* Add the node and the token pair into the header of this message. */
- BigInteger nodeToken = new BigInteger(nodeTokenPair[1]);
- tokenUpdateMessage.addHeader(nodeTokenPair[0], nodeToken.toByteArray());
+ Token nodeToken = p.getTokenFactory().fromString(nodeTokenPair[1]);
+ tokenUpdateMessage.addHeader(nodeTokenPair[0], p.getTokenFactory().toByteArray(nodeToken));
}
System.out.println("Sending a token update message to " + target);
@@ -79,64 +76,5 @@
Thread.sleep(TokenUpdater.waitTime_);
System.out.println("Done sending the update message");
}
-
- public static class TokenInfoMessage implements Serializable
- {
- private static ICompactSerializer<TokenInfoMessage> serializer_;
- private static AtomicInteger idGen_ = new AtomicInteger(0);
-
- static
- {
- serializer_ = new TokenInfoMessageSerializer();
- }
-
- static ICompactSerializer<TokenInfoMessage> serializer()
- {
- return serializer_;
- }
- private EndPoint target_;
- private BigInteger token_;
-
- TokenInfoMessage(EndPoint target, BigInteger token)
- {
- target_ = target;
- token_ = token;
- }
-
- EndPoint getTarget()
- {
- return target_;
- }
-
- BigInteger getToken()
- {
- return token_;
- }
- }
-
- public static class TokenInfoMessageSerializer implements ICompactSerializer<TokenInfoMessage>
- {
- public void serialize(TokenInfoMessage tiMessage, DataOutputStream dos) throws IOException
- {
- byte[] node = EndPoint.toBytes( tiMessage.getTarget() );
- dos.writeInt(node.length);
- dos.write(node);
-
- byte[] token = tiMessage.getToken().toByteArray();
- dos.writeInt( token.length );
- dos.write(token);
- }
-
- public TokenInfoMessage deserialize(DataInputStream dis) throws IOException
- {
- byte[] target = new byte[dis.readInt()];
- dis.readFully(target);
-
- byte[] token = new byte[dis.readInt()];
- dis.readFully(token);
-
- return new TokenInfoMessage(EndPoint.fromBytes(target), new BigInteger(token));
- }
- }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java Fri Mar 27 02:44:44 2009
@@ -187,7 +187,7 @@
}
else
{ // already FULL or REMOVED, must probe
- // compute the double hash
+ // compute the double token
final int probe = 1 + (hash % (length - 2));
// if the slot we landed on is FULL (but not removed), probe
Modified: incubator/cassandra/trunk/test/org/apache/cassandra/db/SystemTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/SystemTableTest.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/SystemTableTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/SystemTableTest.java Fri Mar 27 02:44:44 2009
@@ -9,6 +9,6 @@
public class SystemTableTest extends ServerTest {
@Test
public void testMain() throws IOException {
- SystemTable.openSystemTable(SystemTable.cfName_).updateToken( StorageService.hash("503545744:0") );
+ SystemTable.openSystemTable(SystemTable.cfName_).updateToken( StorageService.token("503545744:0") );
}
}