Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/03/27 03:44:29 UTC
svn commit: r758998 [1/2] - in /incubator/cassandra/trunk/src/org/apache/cassandra: db/ dht/ io/ locator/ service/
Author: jbellis
Date: Fri Mar 27 02:44:28 2009
New Revision: 758998
URL: http://svn.apache.org/viewvc?rev=758998&view=rev
Log:
Remove unused code dealing with Ranges and tokens
Removed:
incubator/cassandra/trunk/src/org/apache/cassandra/service/LocationInfoVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenInfoVerbHandler.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java
incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java
incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 02:44:28 2009
@@ -47,7 +47,6 @@
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.io.SequenceFile;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.PartitionerType;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FileUtils;
@@ -81,7 +80,7 @@
private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
/* Flag indicates if a compaction is in progress */
- public AtomicBoolean isCompacting_ = new AtomicBoolean(false);
+ private AtomicBoolean isCompacting_ = new AtomicBoolean(false);
ColumnFamilyStore(String table, String columnFamily) throws IOException
{
@@ -131,7 +130,7 @@
for (File file : files)
{
String filename = file.getName();
- if(((file.length() == 0) || (filename.indexOf("-" + SSTable.temporaryFile_) != -1) ) && (filename.indexOf(columnFamily_) != -1))
+ if(((file.length() == 0) || (filename.contains("-" + SSTable.temporaryFile_)) ) && (filename.contains(columnFamily_)))
{
file.delete();
continue;
@@ -140,7 +139,7 @@
String[] tblCfName = getTableAndColumnFamilyName(filename);
if (tblCfName[0].equals(table_)
&& tblCfName[1].equals(columnFamily_)
- && filename.indexOf("-Data.db") != -1)
+ && filename.contains("-Data.db"))
{
ssTables.add(file.getAbsoluteFile());
}
@@ -178,7 +177,7 @@
* disk and the total space occupied by the data files
* associated with this Column Family.
*/
- public String cfStats(String newLineSeparator, java.text.DecimalFormat df)
+ public String cfStats(String newLineSeparator)
{
StringBuilder sb = new StringBuilder();
/*
@@ -261,7 +260,7 @@
if( ranges != null)
futurePtr = MinorCompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target, fileList);
else
- MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, ranges, skip);
+ MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, skip);
boolean result = true;
try
@@ -334,8 +333,7 @@
{
// Pseudo-increment so that we do not generate consecutive numbers
fileIndexGenerator_.incrementAndGet();
- String name = table_ + "-" + columnFamily_ + "-" + fileIndexGenerator_.incrementAndGet();
- return name;
+ return table_ + "-" + columnFamily_ + "-" + fileIndexGenerator_.incrementAndGet();
}
/*
@@ -345,8 +343,7 @@
{
// Pseudo-increment so that we do not generate consecutive numbers
fileIndexGenerator_.incrementAndGet();
- String name = table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + fileIndexGenerator_.incrementAndGet() ;
- return name;
+ return table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + fileIndexGenerator_.incrementAndGet();
}
/*
@@ -367,9 +364,8 @@
lowestIndex = getIndexFromFileName(files.get(0));
index = lowestIndex + 1 ;
-
- String name = table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + index ;
- return name;
+
+ return table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + index;
}
@@ -387,14 +383,6 @@
}
/*
- * This version is used when we forceflush.
- */
- void switchMemtable() throws IOException
- {
- memtable_.set( new Memtable(table_, columnFamily_) );
- }
-
- /*
* This version is used only on start up when we are recovering from logs.
* In the future we may want to parallelize the log processing for a table
* by having a thread per log file present for recovery. Re-visit at that
@@ -413,7 +401,7 @@
memtable_.get().forceflush(this);
}
- void forceFlushBinary() throws IOException
+ void forceFlushBinary()
{
BinaryMemtableManager.instance().submit(getColumnFamilyName(), binaryMemtable_.get());
//binaryMemtable_.get().flush(true);
@@ -456,7 +444,15 @@
*/
List<ColumnFamily> getColumnFamilies(String key, String columnFamilyColumn, IFilter filter) throws IOException
{
- List<ColumnFamily> columnFamilies = getMemoryColumnFamilies(key, columnFamilyColumn, filter);
+ List<ColumnFamily> columnFamilies1 = new ArrayList<ColumnFamily>();
+ /* Get the ColumnFamily from Memtable */
+ getColumnFamilyFromCurrentMemtable(key, columnFamilyColumn, filter, columnFamilies1);
+ if (columnFamilies1.size() == 0 || !filter.isDone())
+ {
+ /* Check if MemtableManager has any historical information */
+ MemtableManager.instance().getColumnFamily(key, columnFamily_, columnFamilyColumn, filter, columnFamilies1);
+ }
+ List<ColumnFamily> columnFamilies = columnFamilies1;
if (columnFamilies.size() == 0 || !filter.isDone())
{
long start = System.currentTimeMillis();
@@ -466,24 +462,6 @@
return columnFamilies;
}
- private List<ColumnFamily> getMemoryColumnFamilies(String key, String columnFamilyColumn, IFilter filter)
- {
- List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
- /* Get the ColumnFamily from Memtable */
- getColumnFamilyFromCurrentMemtable(key, columnFamilyColumn, filter, columnFamilies);
- if (columnFamilies.size() == 0 || !filter.isDone())
- {
- /* Check if MemtableManager has any historical information */
- MemtableManager.instance().getColumnFamily(key, columnFamily_, columnFamilyColumn, filter, columnFamilies);
- }
- return columnFamilies;
- }
-
- public ColumnFamily getColumnFamilyFromMemory(String key, String columnFamilyColumn, IFilter filter)
- {
- return resolveAndRemoveDeleted(getMemoryColumnFamilies(key, columnFamilyColumn, filter));
- }
-
/**
* Fetch from disk files and go in sorted order to be efficient
* This function exits as soon as the required data is found.
@@ -651,7 +629,7 @@
* param @ filename - filename just flushed to disk
* param @ bf - bloom filter which indicates the keys that are in this file.
*/
- void storeLocation(String filename, BloomFilter bf) throws IOException
+ void storeLocation(String filename, BloomFilter bf)
{
boolean doCompaction = false;
int ssTableSize = 0;
@@ -686,7 +664,7 @@
}
}
- PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize) throws IOException
+ PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize)
{
PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
if (files.size() > 1 || (ranges != null && files.size() > 0))
@@ -721,8 +699,7 @@
{
logger_.warn("Unable to close file :" + file);
}
- continue;
- }
+ }
}
}
return pq;
@@ -774,7 +751,7 @@
/*
* Break the files into buckets and then compact.
*/
- void doCompaction() throws IOException
+ void doCompaction()
{
isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
@@ -816,7 +793,6 @@
{
isCompacting_.set(false);
}
- return;
}
void doMajorCompaction(long skip) throws IOException
@@ -824,18 +800,13 @@
doMajorCompactionInternal( skip );
}
- void doMajorCompaction() throws IOException
- {
- doMajorCompactionInternal( 0 );
- }
-
/*
* Compact all the files irrespective of the size.
* skip : the amount in GB of files to be skipped;
* all files greater than skip GB are skipped for this compaction,
* except if skip is 0, in which case this is ignored and all files are taken.
*/
- void doMajorCompactionInternal(long skip) throws IOException
+ void doMajorCompactionInternal(long skip)
{
isCompacting_.set(true);
List<String> filesInternal = new ArrayList<String>(ssTables_);
@@ -868,7 +839,6 @@
{
isCompacting_.set(false);
}
- return ;
}
/*
@@ -906,41 +876,14 @@
return maxFile;
}
- Range getMaxRange( List<Range> ranges )
- {
- Range maxRange = new Range( BigInteger.ZERO, BigInteger.ZERO );
- for( Range range : ranges)
- {
- if( range.left().compareTo(maxRange.left()) > 0 )
- {
- maxRange = range;
- }
- }
- return maxRange;
- }
-
- boolean isLoopAround ( List<Range> ranges )
- {
- boolean isLoop = false;
- for( Range range : ranges)
- {
- if( range.left().compareTo(range.right()) > 0 )
- {
- isLoop = true;
- break;
- }
- }
- return isLoop;
- }
-
- boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
+ boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList)
{
isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
boolean result = true;
try
{
- result = doFileAntiCompaction(files, ranges, target, bufSize_, fileList, null);
+ result = doFileAntiCompaction(files, ranges, target, fileList, null);
}
catch ( Exception ex)
{
@@ -996,7 +939,7 @@
* and only keeps keys that this node is responsible for.
* @throws IOException
*/
- void doCleanupCompaction() throws IOException
+ void doCleanupCompaction()
{
isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
@@ -1030,7 +973,7 @@
Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
List<BloomFilter> compactedBloomFilters = new ArrayList<BloomFilter>();
- doFileAntiCompaction(files, myRanges, null, bufSize_, newFiles, compactedBloomFilters);
+ doFileAntiCompaction(files, myRanges, null, newFiles, compactedBloomFilters);
logger_.debug("Original file : " + file + " of size " + new File(file).length());
lock_.writeLock().lock();
try
@@ -1061,12 +1004,11 @@
* @param files
* @param ranges
* @param target
- * @param minBufferSize
* @param fileList
* @return
* @throws IOException
*/
- boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, int minBufferSize, List<String> fileList, List<BloomFilter> compactedBloomFilters) throws IOException
+ boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, List<String> fileList, List<BloomFilter> compactedBloomFilters)
{
boolean result = false;
long startTime = System.currentTimeMillis();
@@ -1092,7 +1034,7 @@
+ expectedRangeFileSize + " is greater than the safe limit of the disk space available.");
return result;
}
- PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, minBufferSize);
+ PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.bufSize_);
if (pq.size() > 0)
{
mergedFileName = getTempFileName();
@@ -1149,8 +1091,7 @@
catch ( Exception ex)
{
logger_.warn(LogUtil.throwableToString(ex));
- continue;
- }
+ }
}
// Now after merging all crap append to the sstable
columnFamily = resolveAndRemoveDeleted(columnFamilies);
@@ -1235,8 +1176,7 @@
// and it will be deleted after compaction.
logger_.warn(LogUtil.throwableToString(ex));
filestruct.reader_.close();
- continue;
- }
+ }
}
lfs.clear();
lastkey = null;
@@ -1286,7 +1226,7 @@
* to get the latest data.
*
*/
- void doFileCompaction(List<String> files, int minBufferSize) throws IOException
+ void doFileCompaction(List<String> files, int minBufferSize)
{
String newfile = null;
long startTime = System.currentTimeMillis();
@@ -1361,9 +1301,9 @@
columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
}
catch ( Exception ex)
- {
- continue;
- }
+ {
+ logger_.warn("error in filecompaction", ex);
+ }
}
// Now after merging all crap append to the sstable
columnFamily = resolveAndRemoveDeleted(columnFamilies);
@@ -1418,8 +1358,7 @@
// in any case we have read as far as possible from it
// and it will be deleted after compaction.
filestruct.reader_.close();
- continue;
- }
+ }
}
lfs.clear();
lastkey = null;
@@ -1470,7 +1409,6 @@
logger_.debug("Total bytes Read for compaction ..." + totalBytesRead);
logger_.debug("Total bytes written for compaction ..."
+ totalBytesWritten + " Total keys read ..." + totalkeysRead);
- return;
}
public boolean isSuper()
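For readers skimming the ColumnFamilyStore hunks: the inlining above keeps the read order getColumnFamilies() has always followed, namely current memtable first, then the historical memtables held by MemtableManager, then the SSTables on disk, stopping as soon as the filter is satisfied. Below is a minimal standalone sketch of that tiered-read pattern; the types and names in it are illustrative stand-ins, not the actual Cassandra classes.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch of the tiered read path; Source and the
    // String results are stand-ins for IFilter/ColumnFamily.
    class TieredReadSketch
    {
        interface Source { void read(String key, List<String> out); }

        // Consult each tier in order, stopping once enough data is found.
        static List<String> get(String key, List<Source> tiers, int enough)
        {
            List<String> results = new ArrayList<String>();
            for (Source tier : tiers)
            {
                tier.read(key, results);      // memtable, then historical, then disk
                if (results.size() >= enough) // analogous to filter.isDone()
                    break;
            }
            return results;
        }
    }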
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Mar 27 02:44:28 2009
@@ -79,7 +79,6 @@
private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
/* Lock and Condition for notifying new clients about Memtable switches */
Lock lock_ = new ReentrantLock();
- Condition condition_;
Memtable(String table, String cfName) throws IOException
{
@@ -94,7 +93,6 @@
));
}
- condition_ = lock_.newCondition();
table_ = table;
cfName_ = cfName;
creationTime_ = System.currentTimeMillis();
@@ -195,13 +193,6 @@
currentObjectCount_.addAndGet(newCount - oldCount);
}
- private boolean isLifetimeViolated()
- {
- /* Memtable lifetime in terms of milliseconds */
- long lifetimeInMillis = DatabaseDescriptor.getMemtableLifetime() * 3600 * 1000;
- return ( ( System.currentTimeMillis() - creationTime_ ) >= lifetimeInMillis );
- }
-
boolean isThresholdViolated(String key)
{
boolean bVal = false;//isLifetimeViolated();
@@ -366,26 +357,6 @@
return filter.filter(columnFamilyColumn, columnFamily);
}
- ColumnFamily get(String key, String cfName)
- {
- printExecutorStats();
- Callable<ColumnFamily> call = new Getter(key, cfName);
- ColumnFamily cf = null;
- try
- {
- cf = apartments_.get(cfName_).submit(call).get();
- }
- catch ( ExecutionException ex )
- {
- logger_.debug(LogUtil.throwableToString(ex));
- }
- catch ( InterruptedException ex2 )
- {
- logger_.debug(LogUtil.throwableToString(ex2));
- }
- return cf;
- }
-
ColumnFamily get(String key, String cfName, IFilter filter)
{
printExecutorStats();
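Context for the Memtable hunks: the removed get(key, cfName) variant followed the same access discipline as the surviving one, routing the lookup through the memtable's single-threaded executor (the "apartment") and blocking on the Future so reads serialize with writes. A rough sketch of that pattern, with illustrative names only:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Sketch only: all access funnels through one single-threaded
    // executor per memtable; lookup() stands in for the real read.
    class MemtableAccessSketch
    {
        private final ExecutorService apartment = Executors.newSingleThreadExecutor();

        String get(final String key) throws Exception
        {
            Callable<String> call = new Callable<String>()
            {
                public String call() { return lookup(key); }
            };
            return apartment.submit(call).get();   // block for the result
        }

        private String lookup(String key) { return "value-for-" + key; }
    }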
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java Fri Mar 27 02:44:28 2009
@@ -87,10 +87,6 @@
columnFamilyStore_.doCompaction();
logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
}
- catch (IOException e)
- {
- logger_.debug( LogUtil.throwableToString(e) );
- }
catch (Throwable th)
{
logger_.error( LogUtil.throwableToString(th) );
@@ -122,16 +118,9 @@
public Boolean call()
{
boolean result = true;
- try
- {
- logger_.debug("Started compaction ..."+columnFamilyStore_.columnFamily_);
- result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
- logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
- }
- catch (IOException e)
- {
- logger_.debug( LogUtil.throwableToString(e) );
- }
+ logger_.debug("Started compaction ..."+columnFamilyStore_.columnFamily_);
+ result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
+ logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
return result;
}
}
@@ -180,10 +169,6 @@
columnFamilyStore_.doCleanupCompaction();
logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
}
- catch (IOException e)
- {
- logger_.debug( LogUtil.throwableToString(e) );
- }
catch (Throwable th)
{
logger_.error( LogUtil.throwableToString(th) );
@@ -223,14 +208,9 @@
public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target, List<String> fileList)
{
return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target, fileList) );
- }
-
- public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges)
- {
- return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges) );
}
- public void submitMajor(ColumnFamilyStore columnFamilyStore, List<Range> ranges, long skip)
+ public void submitMajor(ColumnFamilyStore columnFamilyStore, long skip)
{
compactor_.submit( new OnDemandCompactor(columnFamilyStore, skip) );
}
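Context for the MinorCompactionManager hunks: callers hand compaction work to a single executor and, for anti-compaction, block on the returned Future<Boolean> (the futurePtr pattern visible in the ColumnFamilyStore hunk earlier in this diff). A hedged sketch of that submit-and-wait shape, with stand-in names:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Sketch only; executor shutdown and error handling are elided.
    class CompactionSubmitSketch
    {
        private final ExecutorService compactor = Executors.newSingleThreadExecutor();

        Future<Boolean> submit(Callable<Boolean> compaction)
        {
            return compactor.submit(compaction);
        }

        void awaitExample() throws Exception
        {
            Future<Boolean> f = submit(new Callable<Boolean>()
            {
                public Boolean call() { return true; }  // pretend compaction
            });
            boolean ok = f.get();                       // block until done
            System.out.println("compaction finished: " + ok);
        }
    }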
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java Fri Mar 27 02:44:28 2009
@@ -488,7 +488,7 @@
for ( String cfName : cfNames )
{
ColumnFamilyStore cfStore = columnFamilyStores_.get(cfName);
- sb.append(cfStore.cfStats(newLineSeparator, df));
+ sb.append(cfStore.cfStats(newLineSeparator));
}
int newLength = sb.toString().length();
@@ -592,7 +592,7 @@
{
ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
if ( cfStore != null )
- MinorCompactionManager.instance().submitMajor(cfStore, null, 0);
+ MinorCompactionManager.instance().submitMajor(cfStore, 0);
}
}
@@ -684,26 +684,6 @@
dbAnalyticsSource_.updateReadStatistics(timeTaken);
return row;
}
-
- public Row getRowFromMemory(String key)
- {
- Row row = new Row(key);
- Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
- long start = System.currentTimeMillis();
- for ( String columnFamily : columnFamilies )
- {
- ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily);
- if ( cfStore != null )
- {
- ColumnFamily cf = cfStore.getColumnFamilyFromMemory(key, columnFamily, new IdentityFilter());
- if ( cf != null )
- row.addColumnFamily(cf);
- }
- }
- long timeTaken = System.currentTimeMillis() - start;
- dbAnalyticsSource_.updateReadStatistics(timeTaken);
- return row;
- }
/**
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java Fri Mar 27 02:44:28 2009
@@ -130,21 +130,5 @@
logger_.debug( LogUtil.throwableToString(th) );
}
}
-
- private Range getMyOldRange()
- {
- Map<EndPoint, BigInteger> oldEndPointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
- Map<BigInteger, EndPoint> oldTokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- oldEndPointToTokenMap.remove(targets_);
- oldTokenToEndPointMap.remove(tokens_);
-
- BigInteger myToken = oldEndPointToTokenMap.get(StorageService.getLocalStorageEndPoint());
- List<BigInteger> allTokens = new ArrayList<BigInteger>(oldTokenToEndPointMap.keySet());
- Collections.sort(allTokens);
- int index = Collections.binarySearch(allTokens, myToken);
- /* Calculate the lhs for the range */
- BigInteger lhs = (index == 0) ? allTokens.get(allTokens.size() - 1) : allTokens.get( index - 1);
- return new Range( lhs, myToken );
- }
}
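The removed getMyOldRange() above computed the range a node used to own: sort every token on the ring, binary-search for the node's own token, and take the previous token as the range's left edge, wrapping to the largest token when the node holds the smallest. A standalone sketch of that predecessor lookup (illustrative names; like the original, it assumes the token is present in the list):

    import java.math.BigInteger;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class PredecessorSketch
    {
        static BigInteger predecessor(List<BigInteger> tokens, BigInteger myToken)
        {
            List<BigInteger> sorted = new ArrayList<BigInteger>(tokens);
            Collections.sort(sorted);
            int index = Collections.binarySearch(sorted, myToken);
            // index 0 wraps around to the largest token on the ring
            return (index == 0) ? sorted.get(sorted.size() - 1)
                                : sorted.get(index - 1);
        }
    }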
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java Fri Mar 27 02:44:28 2009
@@ -95,45 +95,7 @@
{
return right_;
}
-
- boolean isSplitRequired()
- {
- return ( left_.subtract(right_).signum() >= 0 );
- }
-
- public boolean isSplitBy(BigInteger bi)
- {
- if ( left_.subtract(right_).signum() > 0 )
- {
- /*
- * left is greater than right we are wrapping around.
- * So if the interval is [a,b) where a > b then we have
- * 3 cases one of which holds for any given token k.
- * (1) k > a -- return true
- * (2) k < b -- return true
- * (3) b < k < a -- return false
- */
- if ( bi.subtract(left_).signum() > 0 )
- return true;
- else if (right_.subtract(bi).signum() > 0 )
- return true;
- else
- return false;
- }
- else if ( left_.subtract(right_).signum() < 0 )
- {
- /*
- * This is the range [a, b) where a < b.
- */
- return ( bi.subtract(left_).signum() > 0 && right_.subtract(bi).signum() > 0 );
- }
- else
- {
- // should never be here.
- return true;
- }
- }
-
+
/**
* Helps determine if a given point on the DHT ring is contained
* in the range in question.
@@ -154,10 +116,7 @@
*/
if ( bi.subtract(left_).signum() >= 0 )
return true;
- else if (right_.subtract(bi).signum() > 0 )
- return true;
- else
- return false;
+ else return right_.subtract(bi).signum() > 0;
}
else if ( left_.subtract(right_).signum() < 0 )
{
@@ -171,58 +130,7 @@
return true;
}
}
-
- /**
- * Helps determine if a given range on the DHT ring is contained
- * within the range associated with the <i>this</i> pointer.
- * @param rhs rhs in question
- * @return true if rhs is contained within this range, else false.
- */
- public boolean contains(Range rhs)
- {
- /*
- * If (a, b] and (c, d] are not wrap arounds
- * then return true if a <= c <= d <= b.
- */
- if ( !isWrapAround(this) && !isWrapAround(rhs) )
- {
- if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(rhs.right_).signum() >= 0 )
- return true;
- else
- return false;
- }
-
- /*
- * If lhs is a wrap around and rhs is not then
- * rhs.left >= lhs.left and rhs.right >= lhs.left.
- */
- if ( isWrapAround(this) && !isWrapAround(rhs) )
- {
- if ( rhs.left_.subtract(left_).signum() >= 0 && rhs.right_.subtract(right_).signum() >= 0 )
- return true;
- else
- return false;
- }
-
- /*
- * If lhs is not a wrap around and rhs is a wrap
- * around then we just return false.
- */
- if ( !isWrapAround(this) && isWrapAround(rhs) )
- return false;
-
- if( isWrapAround(this) && isWrapAround(rhs) )
- {
- if ( rhs.left_.subtract(left_).signum() >= 0 && right_.subtract(right_).signum() >= 0 )
- return true;
- else
- return false;
- }
-
- /* should never be here */
- return false;
- }
-
+
/**
* Tells if the given range is a wrap around.
* @param range
@@ -230,8 +138,7 @@
*/
private boolean isWrapAround(Range range)
{
- boolean bVal = ( range.left_.subtract(range.right_).signum() > 0 ) ? true : false;
- return bVal;
+ return range.left_.subtract(range.right_).signum() > 0;
}
public int compareTo(Range rhs)
@@ -254,10 +161,7 @@
if ( !(o instanceof Range) )
return false;
Range rhs = (Range)o;
- if ( left_.equals(rhs.left_) && right_.equals(rhs.right_) )
- return true;
- else
- return false;
+ return left_.equals(rhs.left_) && right_.equals(rhs.right_);
}
public int hashCode()
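The surviving contains(BigInteger) above is the heart of Range: a wrapping range (left > right) covers the two arcs (left, max] and [min, right), while an ordinary range is a single interval. A self-contained sketch of that rule follows; it mirrors the diff but simplifies boundary inclusivity for the non-wrapping case, and is not the actual Cassandra class:

    import java.math.BigInteger;

    class RingRangeSketch
    {
        static boolean contains(BigInteger left, BigInteger right, BigInteger token)
        {
            int cmp = left.compareTo(right);
            if (cmp > 0)   // wrap-around: token >= left OR token < right
                return token.compareTo(left) >= 0 || right.compareTo(token) > 0;
            if (cmp < 0)   // ordinary interval between left and right
                return token.compareTo(left) > 0 && right.compareTo(token) > 0;
            return true;   // left == right covers the whole ring
        }
    }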
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java Fri Mar 27 02:44:28 2009
@@ -154,11 +154,6 @@
position_ = position;
}
- public String key()
- {
- return decoratedKey;
- }
-
public long position()
{
return position_;
@@ -510,11 +505,6 @@
return getFile(dataFile_);
}
- public long lastModified()
- {
- return dataWriter_.lastModified();
- }
-
/*
* Seeks to the specified key on disk.
*/
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java Fri Mar 27 02:44:28 2009
@@ -212,13 +212,11 @@
}
public static class BufferWriter extends Writer
- {
- private int size_;
+ {
BufferWriter(String filename, int size) throws IOException
{
super(filename, size);
- size_ = size;
}
@Override
@@ -270,145 +268,6 @@
}
}
- public static class ConcurrentWriter extends AbstractWriter
- {
- private FileChannel fc_;
-
- public ConcurrentWriter(String filename) throws IOException
- {
- super(filename);
- RandomAccessFile raf = new RandomAccessFile(filename, "rw");
- fc_ = raf.getChannel();
- }
-
- public long getCurrentPosition() throws IOException
- {
- return fc_.position();
- }
-
- public void seek(long position) throws IOException
- {
- fc_.position(position);
- }
-
- public void append(DataOutputBuffer buffer) throws IOException
- {
- int length = buffer.getLength();
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect(length);
- byteBuffer.put(buffer.getData(), 0, length);
- byteBuffer.flip();
- fc_.write(byteBuffer);
- }
-
- public void append(DataOutputBuffer keyBuffer, DataOutputBuffer buffer) throws IOException
- {
- int keyBufLength = keyBuffer.getLength();
- if ( keyBuffer == null || keyBufLength == 0 )
- throw new IllegalArgumentException("Key cannot be NULL or of zero length.");
-
- /* Size allocated "int" for key length + key + "int" for data length + data */
- int length = buffer.getLength();
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect( 4 + keyBufLength + 4 + length );
- byteBuffer.putInt(keyBufLength);
- byteBuffer.put(keyBuffer.getData(), 0, keyBufLength);
- byteBuffer.putInt(length);
- byteBuffer.put(buffer.getData(), 0, length);
- byteBuffer.flip();
- fc_.write(byteBuffer);
- }
-
- public void append(String key, DataOutputBuffer buffer) throws IOException
- {
- if ( key == null )
- throw new IllegalArgumentException("Key cannot be NULL.");
-
- int length = buffer.getLength();
- /* Size allocated : utfPrefix_ + key length + "int" for data size + data */
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect( SequenceFile.utfPrefix_ + key.length() + 4 + length);
- SequenceFile.writeUTF(byteBuffer, key);
- byteBuffer.putInt(length);
- byteBuffer.put(buffer.getData(), 0, length);
- byteBuffer.flip();
- fc_.write(byteBuffer);
- }
-
- public void append(String key, byte[] value) throws IOException
- {
- if ( key == null )
- throw new IllegalArgumentException("Key cannot be NULL.");
-
- /* Size allocated key length + "int" for data size + data */
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect(utfPrefix_ + key.length() + 4 + value.length);
- SequenceFile.writeUTF(byteBuffer, key);
- byteBuffer.putInt(value.length);
- byteBuffer.put(value);
- byteBuffer.flip();
- fc_.write(byteBuffer);
- }
-
- public void append(String key, long value) throws IOException
- {
- if ( key == null )
- throw new IllegalArgumentException("Key cannot be NULL.");
-
- /* Size allocated key length + a long */
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect(SequenceFile.utfPrefix_ + key.length() + 8);
- SequenceFile.writeUTF(byteBuffer, key);
- byteBuffer.putLong(value);
- byteBuffer.flip();
- fc_.write(byteBuffer);
- }
-
- /*
- * Be extremely careful while using this API. It is currently
- * used to write the commit log header in the commit logs.
- * If not used carefully it could completely screw up reads
- * of other key/value pairs that are written.
- */
- public long writeDirect(byte[] bytes) throws IOException
- {
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytes.length);
- byteBuffer.put(bytes);
- byteBuffer.flip();
- fc_.write(byteBuffer);
- return fc_.position();
- }
-
- public void writeLong(long value) throws IOException
- {
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect(8);
- byteBuffer.putLong(value);
- byteBuffer.flip();
- fc_.write(byteBuffer);
- }
-
- public void close() throws IOException
- {
- fc_.close();
- }
-
- public void close(byte[] footer, int size) throws IOException
- {
- /* Size is marker length + "int" for size + footer data */
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect( utfPrefix_ + SequenceFile.marker_.length() + 4 + footer.length);
- SequenceFile.writeUTF(byteBuffer, SequenceFile.marker_);
- byteBuffer.putInt(size);
- byteBuffer.put(footer);
- byteBuffer.flip();
- fc_.write(byteBuffer);
- }
-
- public String getFileName()
- {
- return filename_;
- }
-
- public long getFileSize() throws IOException
- {
- return fc_.size();
- }
- }
-
public static class FastConcurrentWriter extends AbstractWriter
{
private FileChannel fc_;
@@ -832,245 +691,6 @@
*
* @param key key we are interested in.
* @param dos DataOutputStream that needs to be filled.
- * @param cf the IColumn we want to read
- * @param section region of the file that needs to be read
- * @return total number of bytes read/considered
- */
- public long next(String key, DataOutputBuffer bufOut, String cf, Coordinate section) throws IOException
- {
- String[] values = RowMutation.getColumnAndColumnFamily(cf);
- String columnFamilyName = values[0];
- String columnName = (values.length == 1) ? null : values[1];
-
- long bytesRead = -1L;
- if ( isEOF() )
- return bytesRead;
- seekTo(key, section);
- /* note the position where the key starts */
- long startPosition = file_.getFilePointer();
- String keyInDisk = readKeyFromDisk(file_);
- if ( keyInDisk != null )
- {
- /*
- * If key on disk is greater than requested key
- * we can bail out since we exploit the property
- * of the SSTable format.
- */
- if ( keyInDisk.compareTo(key) > 0 )
- return bytesRead;
-
- /*
- * If we found the key then we populate the buffer that
- * is passed in. If not then we skip over this key and
- * position ourselves to read the next one.
- */
- int dataSize = file_.readInt();
- if ( keyInDisk.equals(key) )
- {
- /* write the key into buffer */
- bufOut.writeUTF( keyInDisk );
-
- if(columnName == null)
- {
- int bytesSkipped = IndexHelper.skipBloomFilterAndIndex(file_);
- /*
- * read the correct number of bytes for the column family and
- * write data into buffer. Subtract from dataSize the bloom
- * filter size.
- */
- dataSize -= bytesSkipped;
- /* write the data size */
- bufOut.writeInt(dataSize);
- /* write the data into buffer, except the boolean we have read */
- bufOut.write(file_, dataSize);
- }
- else
- {
- /* Read the bloom filter for the column summarization */
- long preBfPos = file_.getFilePointer();
- BloomFilter bf = defreezeBloomFilter();
- /* column does not exist in this file */
- if ( !bf.isPresent(columnName) )
- return bytesRead;
- long postBfPos = file_.getFilePointer();
- dataSize -= (postBfPos - preBfPos);
-
- List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
- /* Read the name indexes if present */
- int totalBytesRead = handleColumnNameIndexes(columnFamilyName, columnIndexList);
- dataSize -= totalBytesRead;
-
- /* read the column family name */
- String cfName = file_.readUTF();
- dataSize -= (utfPrefix_ + cfName.length());
-
- /* read if this cf is marked for delete */
- boolean markedForDelete = file_.readBoolean();
- dataSize -= 1;
-
- /* read the total number of columns */
- int totalNumCols = file_.readInt();
- dataSize -= 4;
-
- /* get the column range we have to read */
- IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnNameIndexInfo(columnName);
- IndexHelper.ColumnRange columnRange = IndexHelper.getColumnRangeFromNameIndex(cIndexInfo, columnIndexList, dataSize, totalNumCols);
-
- Coordinate coordinate = columnRange.coordinate();
- /* seek to the correct offset to the data, and calculate the data size */
- file_.skipBytes((int)coordinate.start_);
- dataSize = (int)(coordinate.end_ - coordinate.start_);
-
- /*
- * write the number of columns in the column family we are returning:
- * dataSize that we are reading +
- * length of column family name +
- * one boolean for deleted or not +
- * one int for number of columns
- */
- bufOut.writeInt(dataSize + utfPrefix_+cfName.length() + 4 + 1);
- /* write the column family name */
- bufOut.writeUTF(cfName);
- /* write if this cf is marked for delete */
- bufOut.writeBoolean(markedForDelete);
- /* write number of columns */
- bufOut.writeInt(columnRange.count());
- /* now write the columns */
- bufOut.write(file_, dataSize);
- }
- }
- else
- {
- /* skip over data portion */
- file_.seek(dataSize + file_.getFilePointer());
- }
-
- long endPosition = file_.getFilePointer();
- bytesRead = endPosition - startPosition;
- }
-
- return bytesRead;
- }
-
- /**
- * This method dumps the next key/value into the DataOutputStream
- * passed in. Always use this method to query for application
- * specific data as it will have indexes.
-
- * @param key key we are interested in.
- * @param dos DataOutputStream that needs to be filled.
- * @param column name of the column in our format.
- * @param timeRange time range we are interested in.
- * @param section region of the file that needs to be read
- * @throws IOException
- * @return number of bytes that were read.
- */
- public long next(String key, DataOutputBuffer bufOut, String cf, IndexHelper.TimeRange timeRange, Coordinate section) throws IOException
- {
- String[] values = RowMutation.getColumnAndColumnFamily(cf);
- String columnFamilyName = values[0];
- String columnName = (values.length == 1) ? null : values[1];
-
- long bytesRead = -1L;
- if ( isEOF() )
- return bytesRead;
- seekTo(key, section);
- /* note the position where the key starts */
- long startPosition = file_.getFilePointer();
- String keyInDisk = readKeyFromDisk(file_);
- if ( keyInDisk != null )
- {
- /*
- * If key on disk is greater than requested key
- * we can bail out since we exploit the property
- * of the SSTable format.
- */
- if ( keyInDisk.compareTo(key) > 0 )
- return bytesRead;
-
- /*
- * If we found the key then we populate the buffer that
- * is passed in. If not then we skip over this key and
- * position ourselves to read the next one.
- */
- int dataSize = file_.readInt();
- if ( keyInDisk.equals(key) )
- {
- /* write the key into buffer */
- bufOut.writeUTF( keyInDisk );
-
- if(columnName == null)
- {
- int bytesSkipped = IndexHelper.skipBloomFilter(file_);
- /*
- * read the correct number of bytes for the column family and
- * write data into buffer. Subtract from dataSize the bloom
- * filter size.
- */
- dataSize -= bytesSkipped;
- List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
- /* Read the times indexes if present */
- int totalBytesRead = handleColumnTimeIndexes(columnFamilyName, columnIndexList);
- dataSize -= totalBytesRead;
-
- /* read the column family name */
- String cfName = file_.readUTF();
- dataSize -= (utfPrefix_ + cfName.length());
-
- /* read if this cf is marked for delete */
- boolean markedForDelete = file_.readBoolean();
- dataSize -= 1;
-
- /* read the total number of columns */
- int totalNumCols = file_.readInt();
- dataSize -= 4;
-
- /* get the column range we have to read */
- IndexHelper.ColumnRange columnRange = IndexHelper.getColumnRangeFromTimeIndex(timeRange, columnIndexList, dataSize, totalNumCols);
-
- Coordinate coordinate = columnRange.coordinate();
- /* seek to the correct offset to the data, and calculate the data size */
- file_.skipBytes((int)coordinate.start_);
- dataSize = (int)(coordinate.end_ - coordinate.start_);
-
- /*
- * write the number of columns in the column family we are returning:
- * dataSize that we are reading +
- * length of column family name +
- * one boolean for deleted or not +
- * one int for number of columns
- */
- bufOut.writeInt(dataSize + utfPrefix_+cfName.length() + 4 + 1);
- /* write the column family name */
- bufOut.writeUTF(cfName);
- /* write if this cf is marked for delete */
- bufOut.writeBoolean(markedForDelete);
- /* write number of columns */
- bufOut.writeInt(columnRange.count());
- /* now write the columns */
- bufOut.write(file_, dataSize);
- }
- }
- else
- {
- /* skip over data portion */
- file_.seek(dataSize + file_.getFilePointer());
- }
-
- long endPosition = file_.getFilePointer();
- bytesRead = endPosition - startPosition;
- }
-
- return bytesRead;
- }
-
- /**
- * This method dumps the next key/value into the DataOutputStream
- * passed in. Always use this method to query for application
- * specific data as it will have indexes.
- *
- * @param key key we are interested in.
- * @param dos DataOutputStream that needs to be filled.
* @param cfName The name of the column family only without the ":"
* @param columnNames The list of columns in the cfName column family that we want to return
* @param section region of the file that needs to be read
@@ -1358,7 +978,6 @@
}
private static Logger logger_ = Logger.getLogger( SequenceFile.class ) ;
- public static final short utfPrefix_ = 2;
public static final String marker_ = "Bloom-Filter";
public static IFileWriter writer(String filename) throws IOException
@@ -1376,11 +995,6 @@
return new ChecksumWriter(filename, size);
}
- public static IFileWriter concurrentWriter(String filename) throws IOException
- {
- return new ConcurrentWriter(filename);
- }
-
public static IFileWriter fastWriter(String filename, int size) throws IOException
{
return new FastConcurrentWriter(filename, size);
@@ -1401,11 +1015,6 @@
return new ChecksumReader(filename, size);
}
- public static boolean readBoolean(ByteBuffer buffer)
- {
- return ( buffer.get() == 1 ? true : false );
- }
-
/**
* Efficiently writes a UTF8 string to the buffer.
* Assuming all Strings that are passed in have length
@@ -1474,83 +1083,4 @@
buffer.put(bytearr, 0, utflen + 2);
}
- /**
- * Read a UTF8 string from a serialized buffer.
- * @param buffer buffer from which a UTF8 string is read
- * @return a Java String
- */
- protected static String readUTF(ByteBuffer in) throws IOException
- {
- int utflen = in.getShort();
- byte[] bytearr = new byte[utflen];
- char[] chararr = new char[utflen];
-
- int c, char2, char3;
- int count = 0;
- int chararr_count = 0;
-
- in.get(bytearr, 0, utflen);
-
- while (count < utflen)
- {
- c = (int) bytearr[count] & 0xff;
- if (c > 127)
- break;
- count++;
- chararr[chararr_count++] = (char) c;
- }
-
- while (count < utflen)
- {
- c = (int) bytearr[count] & 0xff;
- switch (c >> 4)
- {
- case 0:
- case 1:
- case 2:
- case 3:
- case 4:
- case 5:
- case 6:
- case 7:
- /* 0xxxxxxx */
- count++;
- chararr[chararr_count++] = (char) c;
- break;
- case 12:
- case 13:
- /* 110x xxxx 10xx xxxx */
- count += 2;
- if (count > utflen)
- throw new UTFDataFormatException(
- "malformed input: partial character at end");
- char2 = (int) bytearr[count - 1];
- if ((char2 & 0xC0) != 0x80)
- throw new UTFDataFormatException(
- "malformed input around byte " + count);
- chararr[chararr_count++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
- break;
- case 14:
- /* 1110 xxxx 10xx xxxx 10xx xxxx */
- count += 3;
- if (count > utflen)
- throw new UTFDataFormatException(
- "malformed input: partial character at end");
- char2 = (int) bytearr[count - 2];
- char3 = (int) bytearr[count - 1];
- if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
- throw new UTFDataFormatException(
- "malformed input around byte " + (count - 1));
- chararr[chararr_count++] = (char) (((c & 0x0F) << 12)
- | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0));
- break;
- default:
- /* 10xx xxxx, 1111 xxxx */
- throw new UTFDataFormatException("malformed input around byte "
- + count);
- }
- }
- // The number of chars produced may be less than utflen
- return new String(chararr, 0, chararr_count);
- }
}
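For context on the ConcurrentWriter removal: its append(key, value) variants all built one ByteBuffer holding a length-prefixed key followed by a length-prefixed payload and pushed it through a FileChannel. A minimal sketch of that framing, using a heap buffer and illustrative names:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    // Frame layout: [int keyLen][key bytes][int dataLen][data bytes]
    class FramedAppendSketch
    {
        static void append(FileChannel fc, byte[] key, byte[] data) throws IOException
        {
            if (key == null || key.length == 0)
                throw new IllegalArgumentException("Key cannot be NULL or of zero length.");
            ByteBuffer buf = ByteBuffer.allocate(4 + key.length + 4 + data.length);
            buf.putInt(key.length);
            buf.put(key);
            buf.putInt(data.length);
            buf.put(data);
            buf.flip();                 // switch from filling to draining
            while (buf.hasRemaining())  // FileChannel.write may be partial
                fc.write(buf);
        }
    }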
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java Fri Mar 27 02:44:28 2009
@@ -33,7 +33,6 @@
public interface IReplicaPlacementStrategy
{
public EndPoint[] getStorageEndPoints(BigInteger token);
- public Map<String, EndPoint[]> getStorageEndPoints(String[] keys);
public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap);
public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token);
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java Fri Mar 27 02:44:28 2009
@@ -4,7 +4,6 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -107,97 +106,7 @@
retrofitPorts(list);
return list.toArray(new EndPoint[0]);
}
-
- public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
- {
- Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
- List<EndPoint> list = new ArrayList<EndPoint>();
- int startIndex = 0 ;
- int foundCount = 0;
- boolean bDataCenter = false;
- boolean bOtherRack = false;
-
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- int N = DatabaseDescriptor.getReplicationFactor();
- List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
- Collections.sort(tokens);
-
- for ( String key : keys )
- {
- BigInteger token = StorageService.hash(key);
- int index = Collections.binarySearch(tokens, token);
- if(index < 0)
- {
- index = (index + 1) * (-1);
- if (index >= tokens.size())
- index = 0;
- }
- int totalNodes = tokens.size();
- // Add the node at the index by default
- list.add(tokenToEndPointMap.get(tokens.get(index)));
- foundCount++;
- if( N == 1 )
- {
- results.put( key, list.toArray(new EndPoint[0]) );
- return results;
- }
- startIndex = (index + 1)%totalNodes;
- IEndPointSnitch endPointSnitch = StorageService.instance().getEndPointSnitch();
-
- for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
- {
- try
- {
- // First try to find one in a different data center
- if(!endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
- {
- // If we have already found something in a diff datacenter no need to find another
- if( !bDataCenter )
- {
- list.add(tokenToEndPointMap.get(tokens.get(i)));
- bDataCenter = true;
- foundCount++;
- }
- continue;
- }
- // Now try to find one on a different rack
- if(!endPointSnitch.isOnSameRack(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))) &&
- endPointSnitch.isInSameDataCenter(tokenToEndPointMap.get(tokens.get(index)), tokenToEndPointMap.get(tokens.get(i))))
- {
- // If we have already found something in a diff rack no need to find another
- if( !bOtherRack )
- {
- list.add(tokenToEndPointMap.get(tokens.get(i)));
- bOtherRack = true;
- foundCount++;
- }
- continue;
- }
- }
- catch (UnknownHostException e)
- {
- logger_.debug(LogUtil.throwableToString(e));
- }
- }
- // If we found N nodes we are good. This loop will just exit. Otherwise just
- // loop through the list and add until we have N nodes.
- for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
- {
- if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
- {
- list.add(tokenToEndPointMap.get(tokens.get(i)));
- foundCount++;
- continue;
- }
- }
- retrofitPorts(list);
- results.put(key, list.toArray(new EndPoint[0]));
- }
-
- return results;
- }
-
public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
{
throw new UnsupportedOperationException("This operation is not currently supported");
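The removed batch getStorageEndPoints(String[]) above encoded RackAwareStrategy's placement preference: after the primary replica, try to place one copy in a different data center, then one on a different rack within the same data center, then fill any remaining slots clockwise. A hedged sketch of that preference order (Topology and every name here are stand-ins, and `ring` is assumed to list the other nodes in clockwise order from the primary):

    import java.util.List;

    class RackAwarePickSketch
    {
        interface Topology
        {
            boolean sameDataCenter(int a, int b);
            boolean sameRack(int a, int b);
        }

        static void pick(int primary, int[] ring, int n, Topology t, List<Integer> out)
        {
            out.add(primary);
            boolean haveOtherDc = false, haveOtherRack = false;
            for (int node : ring)     // preference pass
            {
                if (out.size() >= n) break;
                if (!haveOtherDc && !t.sameDataCenter(primary, node))
                {
                    out.add(node); haveOtherDc = true;
                }
                else if (!haveOtherRack && t.sameDataCenter(primary, node)
                         && !t.sameRack(primary, node))
                {
                    out.add(node); haveOtherRack = true;
                }
            }
            for (int node : ring)     // fill any remaining slots clockwise
                if (out.size() < n && !out.contains(node))
                    out.add(node);
        }
    }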
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java Fri Mar 27 02:44:28 2009
@@ -2,17 +2,13 @@
import java.math.BigInteger;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Range;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
/**
@@ -25,8 +21,7 @@
{
/* Use this flag to check if initialization is in order. */
private AtomicBoolean initialized_ = new AtomicBoolean(false);
- private Map<Range, List<EndPoint>> rangeToEndPointMap_;
-
+
public RackUnawareStrategy(TokenMetadata tokenMetadata)
{
super(tokenMetadata);
@@ -71,84 +66,5 @@
retrofitPorts(list);
return list.toArray(new EndPoint[0]);
}
-
- private void doInitialization()
- {
- if ( !initialized_.get() )
- {
- /* construct the mapping from the ranges to the replicas responsible for them */
- rangeToEndPointMap_ = StorageService.instance().getRangeToEndPointMap();
- initialized_.set(true);
- }
- }
-
- /**
- * This method determines which range in the array actually contains
- * the hash of the key
- * @param ranges
- * @param key
- * @return
- */
- private int findRangeIndexForKey(Range[] ranges, String key)
- {
- int index = 0;
- BigInteger hash = StorageService.hash(key);
- for ( int i = 0; i < ranges.length; ++i )
- {
- if ( ranges[i].contains(hash) )
- {
- index = i;
- break;
- }
- }
-
- return index;
- }
-
- public Map<String, EndPoint[]> getStorageEndPoints(String[] keys)
- {
- Arrays.sort(keys);
- Range[] ranges = StorageService.instance().getAllRanges();
-
- Map<String, EndPoint[]> results = new HashMap<String, EndPoint[]>();
- List<EndPoint> list = new ArrayList<EndPoint>();
- int startIndex = 0 ;
- int foundCount = 0;
-
- Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
- int N = DatabaseDescriptor.getReplicationFactor();
- List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
- Collections.sort(tokens);
- for ( String key : keys )
- {
- BigInteger token = StorageService.hash(key);
- int index = Collections.binarySearch(tokens, token);
- if(index < 0)
- {
- index = (index + 1) * (-1);
- if (index >= tokens.size())
- index = 0;
- }
- int totalNodes = tokens.size();
- // Add the node at the index by default
- list.add(tokenToEndPointMap.get(tokens.get(index)));
- foundCount++;
- startIndex = (index + 1)%totalNodes;
- // If we found N number of nodes we are good. This loop will just exit. Otherwise just
- // loop through the list and add until we have N nodes.
- for (int i = startIndex, count = 1; count < totalNodes && foundCount < N; ++count, i = (i+1)%totalNodes)
- {
- if( ! list.contains(tokenToEndPointMap.get(tokens.get(i))))
- {
- list.add(tokenToEndPointMap.get(tokens.get(i)));
- foundCount++;
- continue;
- }
- }
- retrofitPorts(list);
- results.put(key, list.toArray(new EndPoint[0]));
- }
-
- return results;
- }
+
}
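Both strategies' removed batch methods shared the same ring walk: hash the key, binary-search the sorted token list for the first token at or after the hash (wrapping past the end), then collect the next N distinct nodes clockwise. A standalone sketch, with illustrative names:

    import java.math.BigInteger;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class RingWalkSketch
    {
        static List<Integer> replicaIndexes(List<BigInteger> sortedTokens,
                                            BigInteger keyHash, int n)
        {
            int index = Collections.binarySearch(sortedTokens, keyHash);
            if (index < 0)
            {
                index = (index + 1) * -1;        // recover the insertion point
                if (index >= sortedTokens.size())
                    index = 0;                   // wrap to the start of the ring
            }
            List<Integer> picked = new ArrayList<Integer>();
            for (int i = 0; picked.size() < n && i < sortedTokens.size(); i++)
                picked.add((index + i) % sortedTokens.size());
            return picked;
        }
    }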
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java?rev=758998&r1=758997&r2=758998&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java Fri Mar 27 02:44:28 2009
@@ -39,29 +39,19 @@
public class TokenMetadata
{
- private static ICompactSerializer<TokenMetadata> serializer_ = new TokenMetadataSerializer();
-
- public static ICompactSerializer<TokenMetadata> serializer()
- {
- return serializer_;
- }
-
- /* Maintains token to endpoint map of every node in the cluster. */
+ /* Maintains token to endpoint map of every node in the cluster. */
private Map<BigInteger, EndPoint> tokenToEndPointMap_ = new HashMap<BigInteger, EndPoint>();
/* Maintains a reverse index of endpoint to token in the cluster. */
private Map<EndPoint, BigInteger> endPointToTokenMap_ = new HashMap<EndPoint, BigInteger>();
/* Use this lock for manipulating the token map */
- private ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
-
- /*
- * For JAXB purposes.
- */
+ private final ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
+
public TokenMetadata()
{
}
-
- protected TokenMetadata(Map<BigInteger, EndPoint> tokenToEndPointMap, Map<EndPoint, BigInteger> endPointToTokenMap)
+
+ private TokenMetadata(Map<BigInteger, EndPoint> tokenToEndPointMap, Map<EndPoint, BigInteger> endPointToTokenMap)
{
tokenToEndPointMap_ = tokenToEndPointMap;
endPointToTokenMap_ = endPointToTokenMap;
@@ -188,51 +178,3 @@
return sb.toString();
}
}
-
-class TokenMetadataSerializer implements ICompactSerializer<TokenMetadata>
-{
- public void serialize(TokenMetadata tkMetadata, DataOutputStream dos) throws IOException
- {
- Map<BigInteger, EndPoint> tokenToEndPointMap = tkMetadata.cloneTokenEndPointMap();
- Set<BigInteger> tokens = tokenToEndPointMap.keySet();
- /* write the size */
- dos.writeInt(tokens.size());
- for ( BigInteger token : tokens )
- {
- byte[] bytes = token.toByteArray();
- /* Convert the BigInteger to byte[] and persist */
- dos.writeInt(bytes.length);
- dos.write(bytes);
- /* Write the endpoint out */
- CompactEndPointSerializationHelper.serialize(tokenToEndPointMap.get(token), dos);
- }
- }
-
- public TokenMetadata deserialize(DataInputStream dis) throws IOException
- {
- TokenMetadata tkMetadata = null;
- int size = dis.readInt();
-
- if ( size > 0 )
- {
- Map<BigInteger, EndPoint> tokenToEndPointMap = new HashMap<BigInteger, EndPoint>();
- Map<EndPoint, BigInteger> endPointToTokenMap = new HashMap<EndPoint, BigInteger>();
-
- for ( int i = 0; i < size; ++i )
- {
- /* Read the byte[] and convert to BigInteger */
- byte[] bytes = new byte[dis.readInt()];
- dis.readFully(bytes);
- BigInteger token = new BigInteger(bytes);
- /* Read the endpoint out */
- EndPoint endpoint = CompactEndPointSerializationHelper.deserialize(dis);
- tokenToEndPointMap.put(token, endpoint);
- endPointToTokenMap.put(endpoint, token);
- }
-
- tkMetadata = new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
- }
-
- return tkMetadata;
- }
-}
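For context on the removed TokenMetadataSerializer: it wrote an int entry count, then for each entry an int-length-prefixed BigInteger token followed by the endpoint (via CompactEndPointSerializationHelper, elided here). A hedged sketch of that layout, using writeUTF as a stand-in for the endpoint helper:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.math.BigInteger;
    import java.util.Map;

    class TokenMapSerializeSketch
    {
        static void serialize(Map<BigInteger, String> tokenToEndPoint,
                              DataOutputStream dos) throws IOException
        {
            dos.writeInt(tokenToEndPoint.size());
            for (Map.Entry<BigInteger, String> e : tokenToEndPoint.entrySet())
            {
                byte[] bytes = e.getKey().toByteArray();  // token as raw bytes
                dos.writeInt(bytes.length);
                dos.write(bytes);
                dos.writeUTF(e.getValue());  // stand-in for the endpoint helper
            }
        }
    }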