Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/12 01:52:11 UTC
svn commit: r773728 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
db/ColumnFamilyStore.java io/SSTable.java
Author: jbellis
Date: Mon May 11 23:52:10 2009
New Revision: 773728
URL: http://svn.apache.org/viewvc?rev=773728&view=rev
Log:
reformat. These are whitespace-only changes, automated by the IDE. Patch by jbellis
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=773728&r1=773727&r2=773728&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon May 11 23:52:10 2009
@@ -64,14 +64,14 @@
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
private static int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
- private static final int BUFSIZE = 128*1024*1024;
+ private static final int BUFSIZE = 128 * 1024 * 1024;
private static final int COMPACTION_MEMORY_THRESHOLD = 1 << 30;
private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
private final String table_;
public final String columnFamily_;
private final boolean isSuper_;
-
+
private volatile Integer memtableSwitchCount = 0;
/* This is used to generate the next index for a SSTable */
@@ -113,7 +113,7 @@
*/
List<Integer> indices = new ArrayList<Integer>();
String[] dataFileDirectories = DatabaseDescriptor.getAllDataFileLocations();
- for ( String directory : dataFileDirectories )
+ for (String directory : dataFileDirectories)
{
File fileDir = new File(directory);
File[] files = fileDir.listFiles();
@@ -123,7 +123,7 @@
String[] tblCfName = getTableAndColumnFamilyName(filename);
if (tblCfName[0].equals(table)
- && tblCfName[1].equals(columnFamily))
+ && tblCfName[1].equals(columnFamily))
{
int index = getIndexFromFileName(filename);
indices.add(index);
@@ -154,23 +154,23 @@
/* Do major compaction */
List<File> ssTables = new ArrayList<File>();
String[] dataFileDirectories = DatabaseDescriptor.getAllDataFileLocations();
- for ( String directory : dataFileDirectories )
+ for (String directory : dataFileDirectories)
{
File fileDir = new File(directory);
File[] files = fileDir.listFiles();
for (File file : files)
{
String filename = file.getName();
- if(((file.length() == 0) || (filename.contains("-" + SSTable.temporaryFile_)) ) && (filename.contains(columnFamily_)))
+ if (((file.length() == 0) || (filename.contains("-" + SSTable.temporaryFile_))) && (filename.contains(columnFamily_)))
{
- file.delete();
- continue;
+ file.delete();
+ continue;
}
-
+
String[] tblCfName = getTableAndColumnFamilyName(filename);
if (tblCfName[0].equals(table_)
- && tblCfName[1].equals(columnFamily_)
- && filename.contains("-Data.db"))
+ && tblCfName[1].equals(columnFamily_)
+ && filename.contains("-Data.db"))
{
ssTables.add(file.getAbsoluteFile());
}
@@ -189,17 +189,19 @@
SSTable.onStart(filenames);
logger_.debug("Submitting a major compaction task ...");
MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
- if(columnFamily_.equals(Table.hints_))
+ if (columnFamily_.equals(Table.hints_))
{
- HintedHandOffManager.instance().submit(this);
+ HintedHandOffManager.instance().submit(this);
}
// TODO this seems unnecessary -- each memtable flush checks to see if it needs to compact, too
MinorCompactionManager.instance().submitPeriodicCompaction(this);
-
+
/* submit periodic flusher if required */
int flushPeriod = DatabaseDescriptor.getFlushPeriod(table_, columnFamily_);
if (flushPeriod > 0)
+ {
PeriodicFlushManager.instance().submitPeriodicFlusher(this, flushPeriod);
+ }
}
List<String> getAllSSTablesOnDisk()
@@ -222,7 +224,7 @@
* no files on disk we do not want to display
* something ugly on the admin page.
*/
- if ( ssTables_.size() == 0 )
+ if (ssTables_.size() == 0)
{
return sb.toString();
}
@@ -231,7 +233,7 @@
sb.append("Number of files on disk : " + ssTables_.size());
sb.append(newLineSeparator);
double totalSpace = 0d;
- for ( String file : ssTables_ )
+ for (String file : ssTables_)
{
File f = new File(file);
totalSpace += f.length();
@@ -250,14 +252,14 @@
*/
void addToList(String file)
{
- lock_.writeLock().lock();
+ lock_.writeLock().lock();
try
{
ssTables_.add(file);
}
finally
{
- lock_.writeLock().unlock();
+ lock_.writeLock().unlock();
}
}
@@ -275,8 +277,10 @@
* is present in the BloomFilter. If not continue to the next file.
*/
boolean bVal = SSTable.isKeyInFile(key, file);
- if ( !bVal )
+ if (!bVal)
+ {
continue;
+ }
SSTable ssTable = new SSTable(file, StorageService.getPartitioner());
ssTable.touch(key, fData);
}
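The touch path above shows the read-side pattern this file uses throughout: consult a per-sstable bloom filter first, and only open the file when the filter says the key may be present. A minimal standalone sketch of that gating, using Guava's BloomFilter in place of Cassandra's own (the file names and the filters map are illustrative, not this class's API):

    import com.google.common.hash.BloomFilter;
    import com.google.common.hash.Funnels;

    import java.nio.charset.StandardCharsets;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class BloomGateSketch
    {
        public static void main(String[] args)
        {
            // one filter per hypothetical data file, as SSTable keeps in bfs_
            Map<String, BloomFilter<CharSequence>> filters = new LinkedHashMap<String, BloomFilter<CharSequence>>();
            for (String file : new String[] { "Table-Cf-1-Data.db", "Table-Cf-3-Data.db" })
            {
                filters.put(file, BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1000, 0.01));
            }
            filters.get("Table-Cf-3-Data.db").put("key42");

            String key = "key42";
            for (Map.Entry<String, BloomFilter<CharSequence>> e : filters.entrySet())
            {
                // a negative answer is definitive, so the file can be skipped
                // without any disk I/O; a positive answer may be a false positive
                if (!e.getValue().mightContain(key))
                {
                    continue;
                }
                System.out.println("would open " + e.getKey() + " for " + key);
            }
        }
    }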
@@ -292,26 +296,32 @@
* for the process to complete by waiting on a future pointer.
*/
boolean forceCompaction(List<Range> ranges, EndPoint target, long skip, List<String> fileList)
- {
- Future<Boolean> futurePtr = null;
- if( ranges != null)
- futurePtr = MinorCompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target, fileList);
- else
- MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, skip);
-
+ {
+ Future<Boolean> futurePtr = null;
+ if (ranges != null)
+ {
+ futurePtr = MinorCompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target, fileList);
+ }
+ else
+ {
+ MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, skip);
+ }
+
boolean result = true;
try
{
/* Waiting for the compaction to complete. */
- if(futurePtr != null)
- result = futurePtr.get();
+ if (futurePtr != null)
+ {
+ result = futurePtr.get();
+ }
logger_.debug("Done forcing compaction ...");
}
catch (ExecutionException ex)
{
logger_.debug(LogUtil.throwableToString(ex));
}
- catch ( InterruptedException ex2 )
+ catch (InterruptedException ex2)
{
logger_.debug(LogUtil.throwableToString(ex2));
}
@@ -331,7 +341,9 @@
while (st.hasMoreElements())
{
if (i == 0)
+ {
values[i] = (String) st.nextElement();
+ }
else if (i == 1)
{
values[i] = (String) st.nextElement();
@@ -360,7 +372,9 @@
{
index = (String) st.nextElement();
if (i == (count - 2))
+ {
break;
+ }
++i;
}
return Integer.parseInt(index);
@@ -368,8 +382,8 @@
String getNextFileName()
{
-    // Pseudo-increment so that we do not generate consecutive numbers
-    fileIndexGenerator_.incrementAndGet();
+        // Pseudo-increment so that we do not generate consecutive numbers
+        fileIndexGenerator_.incrementAndGet();
return table_ + "-" + columnFamily_ + "-" + fileIndexGenerator_.incrementAndGet();
}
@@ -378,8 +392,8 @@
*/
String getTempFileName()
{
-    // Pseudo-increment so that we do not generate consecutive numbers
-    fileIndexGenerator_.incrementAndGet();
+        // Pseudo-increment so that we do not generate consecutive numbers
+        fileIndexGenerator_.incrementAndGet();
return table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + fileIndexGenerator_.incrementAndGet();
}
@@ -390,32 +404,34 @@
* Since we do not generate consecutive numbers, the lowest file number
* can just be incremented to generate the next file.
*/
- String getTempFileName( List<String> files)
+ String getTempFileName(List<String> files)
{
- int lowestIndex;
- int index;
- Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
-
- if( files.size() <= 1)
- return null;
- lowestIndex = getIndexFromFileName(files.get(0));
-
- index = lowestIndex + 1 ;
+ int lowestIndex;
+ int index;
+ Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
+
+ if (files.size() <= 1)
+ {
+ return null;
+ }
+ lowestIndex = getIndexFromFileName(files.get(0));
+
+ index = lowestIndex + 1;
return table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + index;
}
-
+
/*
- * This version is used only on start up when we are recovering from logs.
-     * In the future we may want to parallelize the log processing for a table
- * by having a thread per log file present for recovery. Re-visit at that
- * time.
- */
+ * This version is used only on start up when we are recovering from logs.
+     * In the future we may want to parallelize the log processing for a table
+ * by having a thread per log file present for recovery. Re-visit at that
+ * time.
+ */
void switchMemtable()
{
memtable_.set(new Memtable(table_, columnFamily_));
-
+
if (memtableSwitchCount == Integer.MAX_VALUE)
{
memtableSwitchCount = 0;
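The getNextFileName/getTempFileName pair above relies on a small trick: every generated name burns two increments of fileIndexGenerator_, so the indices of live data files are never consecutive, and getTempFileName(files) can hand out lowestIndex + 1 as a guaranteed-unused temporary name. A toy sketch of the idea (class and method names are made up for illustration):

    import java.util.concurrent.atomic.AtomicInteger;

    public class FileNameSketch
    {
        private static final AtomicInteger gen = new AtomicInteger();

        // burn one value, then use the next, so live file indices are never
        // consecutive and index + 1 is always free for a temporary file
        static int nextDataFileIndex()
        {
            gen.incrementAndGet();
            return gen.incrementAndGet();
        }

        public static void main(String[] args)
        {
            System.out.println(nextDataFileIndex()); // 2
            System.out.println(nextDataFileIndex()); // 4
            // a compaction of files 2 and 4 can safely name its temp output with index 3
            System.out.println("temp index between them: " + (2 + 1));
        }
    }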
@@ -431,7 +447,7 @@
*/
void switchBinaryMemtable(String key, byte[] buffer) throws IOException
{
- binaryMemtable_.set( new BinaryMemtable(table_, columnFamily_) );
+ binaryMemtable_.set(new BinaryMemtable(table_, columnFamily_));
binaryMemtable_.get().put(key, buffer);
}
@@ -463,11 +479,11 @@
}
/**
- * Insert/Update the column family for this key.
- * param @ lock - lock that needs to be used.
- * param @ key - key for update/insert
+ * Insert/Update the column family for this key.
+ * param @ lock - lock that needs to be used.
+ * param @ key - key for update/insert
* param @ columnFamily - columnFamily changes
- */
+ */
void apply(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx)
throws IOException
{
@@ -504,7 +520,6 @@
}
/**
- *
* Get the column family in the most efficient order.
* 1. Memtable
* 2. Sorted list of files
@@ -531,6 +546,7 @@
/**
* Fetch from disk files and go in sorted order to be efficient
* This fn exits as soon as the required data is found.
+ *
* @param key
* @param cf
* @param columnFamilies
@@ -540,8 +556,8 @@
private void getColumnFamilyFromDisk(String key, String cf, List<ColumnFamily> columnFamilies, IFilter filter) throws IOException
{
/* Scan the SSTables on disk first */
- List<String> files = new ArrayList<String>();
- lock_.readLock().lock();
+ List<String> files = new ArrayList<String>();
+ lock_.readLock().lock();
try
{
files.addAll(ssTables_);
@@ -551,7 +567,7 @@
{
lock_.readLock().unlock();
}
-
+
for (String file : files)
{
/*
@@ -559,15 +575,17 @@
* is present in the BloomFilter. If not continue to the next file.
*/
boolean bVal = SSTable.isKeyInFile(key, file);
- if ( !bVal )
+ if (!bVal)
+ {
continue;
+ }
ColumnFamily columnFamily = fetchColumnFamily(key, cf, filter, file);
if (columnFamily != null)
{
columnFamilies.add(columnFamily);
- if(filter.isDone())
+ if (filter.isDone())
{
- break;
+ break;
}
}
}
@@ -575,17 +593,21 @@
private ColumnFamily fetchColumnFamily(String key, String cf, IFilter filter, String ssTableFile) throws IOException
- {
- SSTable ssTable = new SSTable(ssTableFile, StorageService.getPartitioner());
- DataInputBuffer bufIn;
- bufIn = filter.next(key, cf, ssTable);
- if (bufIn.getLength() == 0)
- return null;
+ {
+ SSTable ssTable = new SSTable(ssTableFile, StorageService.getPartitioner());
+ DataInputBuffer bufIn;
+ bufIn = filter.next(key, cf, ssTable);
+ if (bufIn.getLength() == 0)
+ {
+ return null;
+ }
ColumnFamily columnFamily = ColumnFamily.serializer().deserialize(bufIn, cf, filter);
- if (columnFamily == null)
- return null;
- return columnFamily;
- }
+ if (columnFamily == null)
+ {
+ return null;
+ }
+ return columnFamily;
+ }
private void getColumnFamilyFromCurrentMemtable(String key, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
{
@@ -597,7 +619,9 @@
}
}
- /** like resolve, but leaves the resolved CF as the only item in the list */
+ /**
+ * like resolve, but leaves the resolved CF as the only item in the list
+ */
private static void merge(List<ColumnFamily> columnFamilies)
{
ColumnFamily cf = ColumnFamily.resolve(columnFamilies);
@@ -605,7 +629,8 @@
columnFamilies.add(cf);
}
- private static ColumnFamily resolveAndRemoveDeleted(List<ColumnFamily> columnFamilies) {
+ private static ColumnFamily resolveAndRemoveDeleted(List<ColumnFamily> columnFamilies)
+ {
ColumnFamily cf = ColumnFamily.resolve(columnFamilies);
return removeDeleted(cf);
}
@@ -618,13 +643,15 @@
*/
static ColumnFamily removeDeleted(ColumnFamily cf)
{
- return removeDeleted(cf, (int)(System.currentTimeMillis() / 1000) - DatabaseDescriptor.getGcGraceInSeconds());
+ return removeDeleted(cf, (int) (System.currentTimeMillis() / 1000) - DatabaseDescriptor.getGcGraceInSeconds());
}
static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
{
if (cf == null)
+ {
return null;
+ }
// in case of a timestamp tie, tombstones get priority over non-tombstones.
// we want this to be deterministic in general to avoid confusion;
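removeDeleted above derives its cutoff from the GC grace period: only tombstones older than now minus the grace window are safe to purge, because newer ones may still need to reach replicas that missed the delete. A sketch of the cutoff arithmetic (the ten-day grace value is an assumed example, not a value quoted from this patch):

    public class GcGraceSketch
    {
        public static void main(String[] args)
        {
            int gcGraceInSeconds = 10 * 24 * 3600;   // assumed example; configurable in Cassandra
            int now = (int) (System.currentTimeMillis() / 1000);
            int gcBefore = now - gcGraceInSeconds;

            // a tombstone created before gcBefore has been around "long enough"
            // and may be dropped during compaction; newer tombstones must be
            // kept so lagging replicas still learn about the delete
            int tombstoneCreatedAt = now - 11 * 24 * 3600;
            System.out.println("purgeable: " + (tombstoneCreatedAt < gcBefore));
        }
    }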
@@ -679,7 +706,7 @@
*/
void applyNow(String key, ColumnFamily columnFamily) throws IOException
{
- memtable_.get().putOnRecovery(key, columnFamily);
+ memtable_.get().putOnRecovery(key, columnFamily);
}
/*
@@ -689,8 +716,10 @@
*/
void onMemtableFlush(CommitLog.CommitLogContext cLogCtx) throws IOException
{
- if ( cLogCtx.isValidContext() )
+ if (cLogCtx.isValidContext())
+ {
CommitLog.open(table_).onMemtableFlush(columnFamily_, cLogCtx);
+ }
}
/*
@@ -706,7 +735,7 @@
void storeLocation(String filename, BloomFilter bf)
{
int ssTableSize = 0;
- lock_.writeLock().lock();
+ lock_.writeLock().lock();
try
{
ssTables_.add(filename);
@@ -715,7 +744,7 @@
}
finally
{
- lock_.writeLock().unlock();
+ lock_.writeLock().unlock();
}
if ((ssTableSize >= COMPACTION_THRESHOLD && !isCompacting_.get())
@@ -730,34 +759,36 @@
PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize)
{
PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
- if (files.size() > 1 || (ranges != null && files.size() > 0))
+ if (files.size() > 1 || (ranges != null && files.size() > 0))
{
- int bufferSize = Math.min( (ColumnFamilyStore.COMPACTION_MEMORY_THRESHOLD / files.size()), minBufferSize ) ;
+ int bufferSize = Math.min((ColumnFamilyStore.COMPACTION_MEMORY_THRESHOLD / files.size()), minBufferSize);
FileStruct fs = null;
for (String file : files)
{
- try
- {
- fs = new FileStruct(SequenceFile.bufferedReader(file, bufferSize), StorageService.getPartitioner());
- fs.advance();
- if(fs.isExhausted())
- continue;
- pq.add(fs);
- }
- catch ( Exception ex)
- {
+ try
+ {
+ fs = new FileStruct(SequenceFile.bufferedReader(file, bufferSize), StorageService.getPartitioner());
+ fs.advance();
+ if (fs.isExhausted())
+ {
+ continue;
+ }
+ pq.add(fs);
+ }
+ catch (Exception ex)
+ {
logger_.warn("corrupt file? or are you just blowing away data files manually out from under me?", ex);
- try
- {
- if (fs != null)
- {
- fs.close();
- }
- }
- catch(Exception e)
- {
- logger_.warn("Unable to close file :" + file);
- }
+ try
+ {
+ if (fs != null)
+ {
+ fs.close();
+ }
+ }
+ catch (Exception e)
+ {
+ logger_.warn("Unable to close file :" + file);
+ }
}
}
}
@@ -769,39 +800,39 @@
*/
static Set<List<String>> getCompactionBuckets(List<String> files, long min)
{
- Map<List<String>, Long> buckets = new ConcurrentHashMap<List<String>, Long>();
- for(String fname : files)
- {
- File f = new File(fname);
- long size = f.length();
+ Map<List<String>, Long> buckets = new ConcurrentHashMap<List<String>, Long>();
+ for (String fname : files)
+ {
+ File f = new File(fname);
+ long size = f.length();
- boolean bFound = false;
+ boolean bFound = false;
// look for a bucket containing similar-sized files:
// group in the same bucket if it's w/in 50% of the average for this bucket,
// or this file and the bucket are all considered "small" (less than `min`)
for (List<String> bucket : buckets.keySet())
- {
+ {
long averageSize = buckets.get(bucket);
- if ((size > averageSize/2 && size < 3*averageSize/2)
- || ( size < min && averageSize < min))
- {
+ if ((size > averageSize / 2 && size < 3 * averageSize / 2)
+ || (size < min && averageSize < min))
+ {
// remove and re-add because adding changes the hash
buckets.remove(bucket);
- averageSize = (averageSize + size) / 2 ;
+ averageSize = (averageSize + size) / 2;
bucket.add(fname);
buckets.put(bucket, averageSize);
- bFound = true;
- break;
- }
- }
+ bFound = true;
+ break;
+ }
+ }
// no similar bucket found; put it in a new one
- if(!bFound)
- {
+ if (!bFound)
+ {
ArrayList<String> bucket = new ArrayList<String>();
bucket.add(fname);
buckets.put(bucket, size);
- }
- }
+ }
+ }
return buckets.keySet();
}
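getCompactionBuckets above is the heart of compaction selection: each file joins an existing bucket when its size is within 50% of that bucket's running average (or when both file and bucket are below the small-file floor), otherwise it starts a new bucket. A self-contained sketch of the same rule over plain sizes (names are illustrative; note the bucket list must be removed from the map before mutation, since adding to it changes its hash):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class BucketSketch
    {
        static Set<List<Long>> buckets(long[] sizes, long min)
        {
            Map<List<Long>, Long> buckets = new HashMap<List<Long>, Long>();
            for (long size : sizes)
            {
                boolean found = false;
                // iterate over a snapshot so we may remove/re-put inside the loop
                for (List<Long> bucket : new ArrayList<List<Long>>(buckets.keySet()))
                {
                    long avg = buckets.get(bucket);
                    if ((size > avg / 2 && size < 3 * avg / 2) || (size < min && avg < min))
                    {
                        buckets.remove(bucket); // remove first: mutating the list changes its hash
                        bucket.add(size);
                        buckets.put(bucket, (avg + size) / 2);
                        found = true;
                        break;
                    }
                }
                if (!found)
                {
                    List<Long> bucket = new ArrayList<Long>();
                    bucket.add(size);
                    buckets.put(bucket, size);
                }
            }
            return buckets.keySet();
        }

        public static void main(String[] args)
        {
            // similar sizes end up in the same bucket: [100, 110], [3000, 3100], [5, 7]
            System.out.println(buckets(new long[] { 100, 110, 3000, 3100, 5, 7 }, 50));
        }
    }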
@@ -854,7 +885,7 @@
void doMajorCompaction(long skip)
{
- doMajorCompactionInternal( skip );
+ doMajorCompactionInternal(skip);
}
/*
@@ -870,23 +901,23 @@
List<String> files;
try
{
- if( skip > 0L )
- {
- files = new ArrayList<String>();
- for ( String file : filesInternal )
- {
- File f = new File(file);
- if( f.length() < skip*1024L*1024L*1024L )
- {
- files.add(file);
- }
- }
- }
- else
- {
- files = filesInternal;
- }
- doFileCompaction(files, BUFSIZE);
+ if (skip > 0L)
+ {
+ files = new ArrayList<String>();
+ for (String file : filesInternal)
+ {
+ File f = new File(file);
+ if (f.length() < skip * 1024L * 1024L * 1024L)
+ {
+ files.add(file);
+ }
+ }
+ }
+ else
+ {
+ files = filesInternal;
+ }
+ doFileCompaction(files, BUFSIZE);
}
catch (IOException ex)
{
@@ -894,7 +925,7 @@
}
finally
{
- isCompacting_.set(false);
+ isCompacting_.set(false);
}
}
@@ -904,33 +935,33 @@
*/
long getExpectedCompactedFileSize(List<String> files)
{
- long expectedFileSize = 0;
- for(String file : files)
- {
- File f = new File(file);
- long size = f.length();
- expectedFileSize = expectedFileSize + size;
- }
- return expectedFileSize;
+ long expectedFileSize = 0;
+ for (String file : files)
+ {
+ File f = new File(file);
+ long size = f.length();
+ expectedFileSize = expectedFileSize + size;
+ }
+ return expectedFileSize;
}
/*
* Find the maximum size file in the list .
*/
- String getMaxSizeFile( List<String> files )
+ String getMaxSizeFile(List<String> files)
{
- long maxSize = 0L;
- String maxFile = null;
- for ( String file : files )
- {
- File f = new File(file);
- if(f.length() > maxSize )
- {
- maxSize = f.length();
- maxFile = file;
- }
- }
- return maxFile;
+ long maxSize = 0L;
+ String maxFile = null;
+ for (String file : files)
+ {
+ File f = new File(file);
+ if (f.length() > maxSize)
+ {
+ maxSize = f.length();
+ maxFile = file;
+ }
+ }
+ return maxFile;
}
boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList)
@@ -940,11 +971,11 @@
boolean result = true;
try
{
- result = doFileAntiCompaction(files, ranges, target, fileList, null);
+ result = doFileAntiCompaction(files, ranges, target, fileList, null);
}
finally
{
- isCompacting_.set(false);
+ isCompacting_.set(false);
}
return result;
@@ -952,12 +983,13 @@
void forceCleanup()
{
- MinorCompactionManager.instance().submitCleanup(ColumnFamilyStore.this);
+ MinorCompactionManager.instance().submitCleanup(ColumnFamilyStore.this);
}
-
+
/**
- * This function goes over each file and removes the keys that the node is not responsible for
+ * This function goes over each file and removes the keys that the node is not responsible for
* and only keeps keys that this node is responsible for.
+ *
* @throws IOException
*/
void doCleanupCompaction()
@@ -966,33 +998,37 @@
List<String> files = new ArrayList<String>(ssTables_);
try
{
- for(String file: files)
+ for (String file : files)
{
doCleanup(file);
}
}
finally
{
- isCompacting_.set(false);
+ isCompacting_.set(false);
}
}
+
/**
* cleans up one particular file by removing keys that this node is not responsible for.
+ *
* @param file
* @throws IOException
*/
/* TODO: Take care of the comments later. */
void doCleanup(String file)
{
- if(file == null )
- return;
+ if (file == null)
+ {
+ return;
+ }
List<Range> myRanges;
- List<String> files = new ArrayList<String>();
- files.add(file);
- List<String> newFiles = new ArrayList<String>();
- Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
- myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
- List<BloomFilter> compactedBloomFilters = new ArrayList<BloomFilter>();
+ List<String> files = new ArrayList<String>();
+ files.add(file);
+ List<String> newFiles = new ArrayList<String>();
+ Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
+ myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
+ List<BloomFilter> compactedBloomFilters = new ArrayList<BloomFilter>();
doFileAntiCompaction(files, myRanges, null, newFiles, compactedBloomFilters);
logger_.debug("Original file : " + file + " of size " + new File(file).length());
lock_.writeLock().lock();
@@ -1001,9 +1037,9 @@
ssTables_.remove(file);
SSTable.removeAssociatedBloomFilter(file);
for (String newfile : newFiles)
- {
+ {
logger_.debug("New file : " + newfile + " of size " + new File(newfile).length());
- if ( newfile != null )
+ if (newfile != null)
{
ssTables_.add(newfile);
logger_.debug("Inserting bloom filter for file " + newfile);
@@ -1017,10 +1053,11 @@
lock_.writeLock().unlock();
}
}
-
+
/**
* This function is used to do the anti-compaction process; it spits out the file which has keys that belong to a given range.
* If the target is not specified it spits out the file as a compacted file with the unnecessary ranges wiped out.
+ *
* @param files
* @param ranges
* @param target
@@ -1030,7 +1067,7 @@
*/
boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, List<String> fileList, List<BloomFilter> compactedBloomFilters)
{
- boolean result = false;
+ boolean result = false;
long startTime = System.currentTimeMillis();
long totalBytesRead = 0;
long totalBytesWritten = 0;
@@ -1041,185 +1078,191 @@
IPartitioner p = StorageService.getPartitioner();
try
{
- // Calculate the expected compacted filesize
- long expectedRangeFileSize = getExpectedCompactedFileSize(files);
-            /* in the worst case a node will be giving out half of its data so we take a chance */
- expectedRangeFileSize = expectedRangeFileSize / 2;
- rangeFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedRangeFileSize);
- // If the compaction file path is null that means we have no space left for this compaction.
- if( rangeFileLocation == null )
- {
- logger_.warn("Total bytes to be written for range compaction ..."
- + expectedRangeFileSize + " is greater than the safe limit of the disk space available.");
- return result;
- }
- PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.BUFSIZE);
- if (pq.size() > 0)
- {
- mergedFileName = getTempFileName();
- SSTable ssTableRange = null ;
- String lastkey = null;
- List<FileStruct> lfs = new ArrayList<FileStruct>();
- DataOutputBuffer bufOut = new DataOutputBuffer();
- int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
- expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
- logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
- /* Create the bloom filter for the compacted file. */
- BloomFilter compactedRangeBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
- List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
-
- while (pq.size() > 0 || lfs.size() > 0)
- {
- FileStruct fs = null;
- if (pq.size() > 0)
- {
- fs = pq.poll();
- }
- if (fs != null
- && (lastkey == null || lastkey.equals(fs.getKey())))
- {
- // The keys are the same so we need to add this to the
- // ldfs list
- lastkey = fs.getKey();
- lfs.add(fs);
- }
- else
- {
- Collections.sort(lfs, new FileStructComparator());
- ColumnFamily columnFamily;
- bufOut.reset();
- if(lfs.size() > 1)
- {
- for (FileStruct filestruct : lfs)
- {
- try
- {
- /* read the length although we don't need it */
- filestruct.getBufIn().readInt();
- // Skip the Index
+ // Calculate the expected compacted filesize
+ long expectedRangeFileSize = getExpectedCompactedFileSize(files);
+            /* in the worst case a node will be giving out half of its data so we take a chance */
+ expectedRangeFileSize = expectedRangeFileSize / 2;
+ rangeFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedRangeFileSize);
+ // If the compaction file path is null that means we have no space left for this compaction.
+ if (rangeFileLocation == null)
+ {
+ logger_.warn("Total bytes to be written for range compaction ..."
+ + expectedRangeFileSize + " is greater than the safe limit of the disk space available.");
+ return result;
+ }
+ PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.BUFSIZE);
+ if (pq.size() > 0)
+ {
+ mergedFileName = getTempFileName();
+ SSTable ssTableRange = null;
+ String lastkey = null;
+ List<FileStruct> lfs = new ArrayList<FileStruct>();
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
+ expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+ logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+ /* Create the bloom filter for the compacted file. */
+ BloomFilter compactedRangeBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+
+ while (pq.size() > 0 || lfs.size() > 0)
+ {
+ FileStruct fs = null;
+ if (pq.size() > 0)
+ {
+ fs = pq.poll();
+ }
+ if (fs != null
+ && (lastkey == null || lastkey.equals(fs.getKey())))
+ {
+ // The keys are the same so we need to add this to the
+ // ldfs list
+ lastkey = fs.getKey();
+ lfs.add(fs);
+ }
+ else
+ {
+ Collections.sort(lfs, new FileStructComparator());
+ ColumnFamily columnFamily;
+ bufOut.reset();
+ if (lfs.size() > 1)
+ {
+ for (FileStruct filestruct : lfs)
+ {
+ try
+ {
+ /* read the length although we don't need it */
+ filestruct.getBufIn().readInt();
+ // Skip the Index
IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
- // We want to add only 2 and resolve them right there in order to save on memory footprint
- if(columnFamilies.size() > 1)
- {
- // Now merge the 2 column families
+ // We want to add only 2 and resolve them right there in order to save on memory footprint
+ if (columnFamilies.size() > 1)
+ {
+ // Now merge the 2 column families
merge(columnFamilies);
- }
- // deserialize into column families
- columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
- }
- catch ( Exception ex)
- {
+ }
+ // deserialize into column families
+ columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
+ }
+ catch (Exception ex)
+ {
logger_.warn(LogUtil.throwableToString(ex));
}
- }
- // Now after merging all crap append to the sstable
- columnFamily = resolveAndRemoveDeleted(columnFamilies);
- columnFamilies.clear();
- if( columnFamily != null )
- {
- /* serialize the cf with column indexes */
- ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
- }
- }
- else
- {
- FileStruct filestruct = lfs.get(0);
- try
- {
- /* read the length although we don't need it */
- int size = filestruct.getBufIn().readInt();
- bufOut.write(filestruct.getBufIn(), size);
- }
- catch ( Exception ex)
- {
- logger_.warn(LogUtil.throwableToString(ex));
- filestruct.close();
- continue;
- }
- }
+ }
+ // Now after merging all crap append to the sstable
+ columnFamily = resolveAndRemoveDeleted(columnFamilies);
+ columnFamilies.clear();
+ if (columnFamily != null)
+ {
+ /* serialize the cf with column indexes */
+ ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
+ }
+ }
+ else
+ {
+ FileStruct filestruct = lfs.get(0);
+ try
+ {
+ /* read the length although we don't need it */
+ int size = filestruct.getBufIn().readInt();
+ bufOut.write(filestruct.getBufIn(), size);
+ }
+ catch (Exception ex)
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ filestruct.close();
+ continue;
+ }
+ }
if (Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(lastkey), ranges))
- {
- if(ssTableRange == null )
- {
- if( target != null )
- rangeFileLocation = rangeFileLocation + System.getProperty("file.separator") + "bootstrap";
- FileUtils.createDirectory(rangeFileLocation);
- ssTableRange = new SSTable(rangeFileLocation, mergedFileName, StorageService.getPartitioner());
- }
- try
- {
- ssTableRange.append(lastkey, bufOut);
- compactedRangeBloomFilter.add(lastkey);
- }
- catch(Exception ex)
- {
- logger_.warn( LogUtil.throwableToString(ex) );
- }
- }
- totalkeysWritten++;
- for (FileStruct filestruct : lfs)
- {
- try
- {
+ {
+ if (ssTableRange == null)
+ {
+ if (target != null)
+ {
+ rangeFileLocation = rangeFileLocation + System.getProperty("file.separator") + "bootstrap";
+ }
+ FileUtils.createDirectory(rangeFileLocation);
+ ssTableRange = new SSTable(rangeFileLocation, mergedFileName, StorageService.getPartitioner());
+ }
+ try
+ {
+ ssTableRange.append(lastkey, bufOut);
+ compactedRangeBloomFilter.add(lastkey);
+ }
+ catch (Exception ex)
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+ }
+ totalkeysWritten++;
+ for (FileStruct filestruct : lfs)
+ {
+ try
+ {
filestruct.advance();
- if (filestruct.isExhausted())
- {
- continue;
- }
- /* keep on looping until we find a key in the range */
+ if (filestruct.isExhausted())
+ {
+ continue;
+ }
+ /* keep on looping until we find a key in the range */
while (!Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(filestruct.getKey()), ranges))
- {
+ {
filestruct.advance();
if (filestruct.isExhausted())
- {
- break;
- }
- }
- if (!filestruct.isExhausted())
- {
- pq.add(filestruct);
- }
- totalkeysRead++;
- }
- catch ( Exception ex )
- {
- // Ignore the exception as it might be a corrupted file
- // in any case we have read as far as possible from it
- // and it will be deleted after compaction.
+ {
+ break;
+ }
+ }
+ if (!filestruct.isExhausted())
+ {
+ pq.add(filestruct);
+ }
+ totalkeysRead++;
+ }
+ catch (Exception ex)
+ {
+ // Ignore the exception as it might be a corrupted file
+ // in any case we have read as far as possible from it
+ // and it will be deleted after compaction.
logger_.warn(LogUtil.throwableToString(ex));
- filestruct.close();
+ filestruct.close();
}
- }
- lfs.clear();
- lastkey = null;
- if (fs != null)
- {
- // Add back the fs since we processed the rest of
- // filestructs
- pq.add(fs);
- }
- }
- }
+ }
+ lfs.clear();
+ lastkey = null;
+ if (fs != null)
+ {
+ // Add back the fs since we processed the rest of
+ // filestructs
+ pq.add(fs);
+ }
+ }
+ }
- if( ssTableRange != null )
- {
+ if (ssTableRange != null)
+ {
ssTableRange.closeRename(compactedRangeBloomFilter);
if (fileList != null)
+ {
fileList.add(ssTableRange.getDataFileLocation());
+ }
if (compactedBloomFilters != null)
- compactedBloomFilters.add(compactedRangeBloomFilter);
- }
- }
+ {
+ compactedBloomFilters.add(compactedRangeBloomFilter);
+ }
+ }
+ }
}
- catch ( Exception ex)
+ catch (Exception ex)
{
- logger_.error( LogUtil.throwableToString(ex) );
+ logger_.error(LogUtil.throwableToString(ex));
}
logger_.debug("Total time taken for range split ..."
- + (System.currentTimeMillis() - startTime));
+ + (System.currentTimeMillis() - startTime));
logger_.debug("Total bytes Read for range split ..." + totalBytesRead);
logger_.debug("Total bytes written for range split ..."
- + totalBytesWritten + " Total keys read ..." + totalkeysRead);
+ + totalBytesWritten + " Total keys read ..." + totalkeysRead);
return result;
}
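doFileAntiCompaction above writes out only the keys whose token falls inside the requested ranges, which is how a bootstrap target receives just its slice of the data. A simplified sketch of the range test over long tokens (Cassandra's Range also handles ring wrap-around, which is omitted here):

    import java.util.ArrayList;
    import java.util.List;

    public class AntiCompactionSketch
    {
        // a token range (left, right], simplified to plain longs
        record Range(long left, long right)
        {
            boolean contains(long token)
            {
                return token > left && token <= right;
            }
        }

        static boolean inRanges(long token, List<Range> ranges)
        {
            for (Range r : ranges)
            {
                if (r.contains(token)) return true;
            }
            return false;
        }

        public static void main(String[] args)
        {
            List<Range> wanted = List.of(new Range(0, 100), new Range(500, 600));
            List<Long> kept = new ArrayList<Long>();
            for (long token : new long[] { 42, 250, 550, 900 })
            {
                // anti-compaction appends only keys in the requested ranges
                // to the output (bootstrap) file; the rest are dropped
                if (inRanges(token, wanted)) kept.add(token);
            }
            System.out.println(kept); // [42, 550]
        }
    }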
@@ -1227,28 +1270,28 @@
{
bf.add(StorageService.getPartitioner().undecorateKey(decoratedKey));
}
-
+
/*
- * This function does the actual compaction for files.
-     * It maintains a priority queue with the first key from each file
-     * and then removes the top of the queue and adds it to the SStable and
-     * repeats this process while reading the next key from each file until it is
-     * done with all the files. The SStable to which the keys are written
- * represents the new compacted file. Before writing if there are keys
- * that occur in multiple files and are the same then a resolution is done
- * to get the latest data.
- *
- */
- private int doFileCompaction(List<String> files, int minBufferSize) throws IOException
+ * This function does the actual compaction for files.
+     * It maintains a priority queue with the first key from each file
+     * and then removes the top of the queue and adds it to the SStable and
+     * repeats this process while reading the next key from each file until it is
+     * done with all the files. The SStable to which the keys are written
+ * represents the new compacted file. Before writing if there are keys
+ * that occur in multiple files and are the same then a resolution is done
+ * to get the latest data.
+ *
+ */
+ private int doFileCompaction(List<String> files, int minBufferSize) throws IOException
{
String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(getExpectedCompactedFileSize(files));
// If the compaction file path is null that means we have no space left for this compaction.
// try again w/o the largest one.
- if( compactionFileLocation == null )
+ if (compactionFileLocation == null)
{
- String maxFile = getMaxSizeFile( files );
- files.remove( maxFile );
- return doFileCompaction(files , minBufferSize);
+ String maxFile = getMaxSizeFile(files);
+ files.remove(maxFile);
+ return doFileCompaction(files, minBufferSize);
}
String newfile = null;
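doFileCompaction, as the comment above it says, is a k-way merge: a priority queue holds the current key of every input file, the smallest key is popped, equal keys from different files are resolved together, and each source is re-armed with its next key. A compact sketch of that loop over in-memory sorted lists (illustrative names; the real code resolves equal keys into a single column family before appending):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class MergeSketch
    {
        // one entry per input file: the current key plus the rest of the stream
        static class Head implements Comparable<Head>
        {
            String key;
            Iterator<String> rest;

            Head(Iterator<String> rest)
            {
                this.rest = rest;
                this.key = rest.next();
            }

            public int compareTo(Head other)
            {
                return key.compareTo(other.key);
            }
        }

        public static void main(String[] args)
        {
            List<List<String>> files = List.of(
                    List.of("a", "c", "e"),
                    List.of("b", "c", "f"));

            PriorityQueue<Head> pq = new PriorityQueue<Head>();
            for (List<String> f : files)
            {
                if (!f.isEmpty()) pq.add(new Head(f.iterator()));
            }

            List<String> merged = new ArrayList<String>();
            while (!pq.isEmpty())
            {
                Head h = pq.poll();
                // equal keys from different files would be resolved (merged)
                // here before being appended to the compacted output
                if (merged.isEmpty() || !merged.get(merged.size() - 1).equals(h.key))
                {
                    merged.add(h.key);
                }
                if (h.rest.hasNext())
                {
                    h.key = h.rest.next();
                    pq.add(h);
                }
            }
            System.out.println(merged); // [a, b, c, e, f]
        }
    }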
@@ -1261,7 +1304,7 @@
if (pq.size() > 0)
{
- String mergedFileName = getTempFileName( files );
+ String mergedFileName = getTempFileName(files);
SSTable ssTable = null;
String lastkey = null;
List<FileStruct> lfs = new ArrayList<FileStruct>();
@@ -1281,7 +1324,7 @@
fs = pq.poll();
}
if (fs != null
- && (lastkey == null || lastkey.equals(fs.getKey())))
+ && (lastkey == null || lastkey.equals(fs.getKey())))
{
// The keys are the same so we need to add this to the
// ldfs list
@@ -1293,7 +1336,7 @@
Collections.sort(lfs, new FileStructComparator());
ColumnFamily columnFamily;
bufOut.reset();
- if(lfs.size() > 1)
+ if (lfs.size() > 1)
{
for (FileStruct filestruct : lfs)
{
@@ -1304,14 +1347,14 @@
// Skip the Index
IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
// We want to add only 2 and resolve them right there in order to save on memory footprint
- if(columnFamilies.size() > 1)
+ if (columnFamilies.size() > 1)
{
merge(columnFamilies);
}
// deserialize into column families
columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
}
- catch ( Exception ex)
+ catch (Exception ex)
{
logger_.warn("error in filecompaction", ex);
}
@@ -1319,7 +1362,7 @@
// Now after merging all crap append to the sstable
columnFamily = resolveAndRemoveDeleted(columnFamilies);
columnFamilies.clear();
- if( columnFamily != null )
+ if (columnFamily != null)
{
/* serialize the cf with column indexes */
ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
@@ -1334,7 +1377,7 @@
int size = filestruct.getBufIn().readInt();
bufOut.write(filestruct.getBufIn(), size);
}
- catch ( Exception ex)
+ catch (Exception ex)
{
logger_.error("empty sstable file " + filestruct.getFileName(), ex);
filestruct.close();
@@ -1342,7 +1385,7 @@
}
}
- if ( ssTable == null )
+ if (ssTable == null)
{
ssTable = new SSTable(compactionFileLocation, mergedFileName, StorageService.getPartitioner());
}
@@ -1363,7 +1406,7 @@
pq.add(filestruct);
totalkeysRead++;
}
- catch ( Throwable ex )
+ catch (Throwable ex)
{
// Ignore the exception as it might be a corrupted file
// in any case we have read as far as possible from it
@@ -1380,7 +1423,7 @@
}
}
}
- if ( ssTable != null )
+ if (ssTable != null)
{
ssTable.closeRename(compactedBloomFilter);
newfile = ssTable.getDataFileLocation();
@@ -1393,7 +1436,7 @@
ssTables_.remove(file);
SSTable.removeAssociatedBloomFilter(file);
}
- if ( newfile != null )
+ if (newfile != null)
{
logger_.debug("Inserting bloom filter for file " + newfile);
SSTable.storeBloomFilter(newfile, compactedBloomFilter);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=773728&r1=773727&r2=773728&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Mon May 11 23:52:10 2009
@@ -48,34 +48,34 @@
* data on disk in sorted fashion. However the sorting is up to
* the application. This class expects keys to be handed to it
* in sorted order. SSTable is broken up into blocks where each
- * block contains 128 keys. At the end of the file the block
+ * block contains 128 keys. At the end of the file the block
* index is written which contains the offsets to the keys in the
- * block. SSTable also maintains an index file to which every 128th
- * key is written with a pointer to the block index which is the block
- * that actually contains the key. This index file is then read and
+ * block. SSTable also maintains an index file to which every 128th
+ * key is written with a pointer to the block index which is the block
+ * that actually contains the key. This index file is then read and
* maintained in memory. SSTable is append only and immutable. SSTable
* on disk looks as follows:
- *
- * -------------------------
- * |------------------------|<-------|
- * | | | BLOCK-INDEX PTR
- * | | |
- * |------------------------|--------
- * |------------------------|<-------|
- * | | |
- * | | | BLOCK-INDEX PTR
- * | | |
- * |------------------------|---------
- * |------------------------|<--------|
- * | | |
- * | | |
- * | | | BLOCK-INDEX PTR
- * | | |
- * |------------------------| |
- * |------------------------|----------
- * |------------------------|-----------------> BLOOM-FILTER
+ * <p/>
+ * -------------------------
+ * |------------------------|<-------|
+ * | | | BLOCK-INDEX PTR
+ * | | |
+ * |------------------------|--------
+ * |------------------------|<-------|
+ * | | |
+ * | | | BLOCK-INDEX PTR
+ * | | |
+ * |------------------------|---------
+ * |------------------------|<--------|
+ * | | |
+ * | | |
+ * | | | BLOCK-INDEX PTR
+ * | | |
+ * |------------------------| |
+ * |------------------------|----------
+ * |------------------------|-----------------> BLOOM-FILTER
* version-info <--|----------|-------------|-------> relative offset to last block index.
- *
+ * <p/>
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
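The layout sketched in this header comment ends with a fixed trailer, written by close() further down in this patch: an 8-byte version field, then an 8-byte relative offset back to the first block index, then an 8-byte relative offset back to the serialized bloom filter, each relative to the position where the pointer itself sits. A sketch of reading that trailer with a plain RandomAccessFile (it assumes big-endian longs, which DataOutput uses; the byte order of BasicUtilities.longToByteArray is not shown in this diff):

    import java.io.IOException;
    import java.io.RandomAccessFile;

    public class TrailerSketch
    {
        public static void main(String[] args) throws IOException
        {
            try (RandomAccessFile raf = new RandomAccessFile(args[0], "r"))
            {
                long size = raf.length();
                raf.seek(size - 8);                      // last long: bloom filter pointer
                long bloomFilterRelativePosition = raf.readLong();
                raf.seek(size - 16);                     // next-to-last: first block index pointer
                long firstBlockIndexRelativePosition = raf.readLong();
                raf.seek(size - 24);                     // then the version field
                long version = raf.readLong();
                // each pointer is subtracted from its own file position,
                // mirroring the getCurrentPosition() arithmetic in close()
                System.out.println("version=" + version
                        + " blockIndexAt=" + (size - 16 - firstBlockIndexRelativePosition)
                        + " bloomFilterAt=" + (size - 8 - bloomFilterRelativePosition));
            }
        }
    }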
@@ -105,22 +105,22 @@
/**
* This class holds the position of a key in a block
- * and the size of the data associated with this key.
- */
+ * and the size of the data associated with this key.
+ */
protected static class BlockMetadata
{
protected static final BlockMetadata NULL = new BlockMetadata(-1L, -1L);
-
+
long position_;
long size_;
-
+
BlockMetadata(long position, long size)
{
position_ = position;
size_ = size;
}
}
-
+
/*
* This abstraction provides LRU semantics for the keys that are
* "touched". Currently it holds the offset of the key in a data
@@ -130,24 +130,24 @@
private static class TouchedKeyCache extends LinkedHashMap<String, Long>
{
private final int capacity_;
-
+
TouchedKeyCache(int capacity)
{
super(capacity + 1, 1.1f, true);
capacity_ = capacity;
}
-
+
protected boolean removeEldestEntry(Map.Entry<String, Long> entry)
{
- return ( size() > capacity_ );
+ return (size() > capacity_);
}
}
-
+
/**
* This is a simple container for the index Key and its corresponding position
* in the data file. Binary search is performed on a list of these objects
* to lookup keys within the SSTable data file.
- */
+ */
public static class KeyPositionInfo implements Comparable<KeyPositionInfo>
{
private final String decoratedKey;
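TouchedKeyCache above gets LRU behavior almost for free from the JDK: an access-ordered LinkedHashMap moves every read entry to the tail, and overriding removeEldestEntry evicts from the head once the map outgrows its capacity. A small usage sketch of the same construction:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruSketch
    {
        public static void main(String[] args)
        {
            final int capacity = 2;
            // access-ordered LinkedHashMap plus removeEldestEntry gives the
            // same LRU semantics as TouchedKeyCache above
            Map<String, Long> cache = new LinkedHashMap<String, Long>(capacity + 1, 1.1f, true)
            {
                protected boolean removeEldestEntry(Map.Entry<String, Long> eldest)
                {
                    return size() > capacity;
                }
            };
            cache.put("k1", 10L);
            cache.put("k2", 20L);
            cache.get("k1");        // touch k1 so k2 becomes the eldest
            cache.put("k3", 30L);   // evicts k2
            System.out.println(cache.keySet()); // [k1, k3]
        }
    }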
@@ -183,30 +183,30 @@
public String toString()
{
- return decoratedKey + ":" + position_;
+ return decoratedKey + ":" + position_;
}
}
-
+
public static int indexInterval()
{
- return indexInterval_;
+ return indexInterval_;
}
-
+
/*
* Maintains a list of KeyPositionInfo objects per SSTable file loaded.
* We do this so that we don't read the index file into memory multiple
* times.
*/
static IndexMap indexMetadataMap_ = new IndexMap();
-
- /**
+
+ /**
* This method deletes both the specified data file
* and the associated index file
*
* @param dataFile - data file associated with the SSTable
- */
+ */
public static void delete(String dataFile)
- {
+ {
/* remove the cached index table from memory */
indexMetadataMap_.remove(dataFile);
/* Delete the checksum file associated with this data file */
@@ -214,11 +214,11 @@
{
ChecksumManager.onFileDelete(dataFile);
}
- catch ( IOException ex )
+ catch (IOException ex)
{
- logger_.info( LogUtil.throwableToString(ex) );
+ logger_.info(LogUtil.throwableToString(ex));
}
-
+
File file = new File(dataFile);
assert file.exists();
/* delete the data file */
@@ -228,39 +228,39 @@
}
}
- public static int getApproximateKeyCount( List<String> dataFiles)
+ public static int getApproximateKeyCount(List<String> dataFiles)
{
- int count = 0 ;
+ int count = 0;
- for(String dataFile : dataFiles )
- {
- List<KeyPositionInfo> index = indexMetadataMap_.get(dataFile);
- if (index != null )
- {
- int indexKeyCount = index.size();
- count = count + (indexKeyCount+1) * indexInterval_ ;
- logger_.debug("index size for bloom filter calc for file : " + dataFile + " : " + count);
- }
- }
+ for (String dataFile : dataFiles)
+ {
+ List<KeyPositionInfo> index = indexMetadataMap_.get(dataFile);
+ if (index != null)
+ {
+ int indexKeyCount = index.size();
+ count = count + (indexKeyCount + 1) * indexInterval_;
+ logger_.debug("index size for bloom filter calc for file : " + dataFile + " : " + count);
+ }
+ }
- return count;
+ return count;
}
/**
* Get all indexed keys in the SSTable.
- */
+ */
public static List<String> getIndexedKeys()
{
Set<String> indexFiles = indexMetadataMap_.keySet();
List<KeyPositionInfo> keyPositionInfos = new ArrayList<KeyPositionInfo>();
- for ( String indexFile : indexFiles )
+ for (String indexFile : indexFiles)
{
- keyPositionInfos.addAll( indexMetadataMap_.get(indexFile) );
+ keyPositionInfos.addAll(indexMetadataMap_.get(indexFile));
}
List<String> indexedKeys = new ArrayList<String>();
- for ( KeyPositionInfo keyPositionInfo : keyPositionInfos )
+ for (KeyPositionInfo keyPositionInfo : keyPositionInfos)
{
indexedKeys.add(keyPositionInfo.decoratedKey);
}
@@ -268,7 +268,7 @@
Collections.sort(indexedKeys);
return indexedKeys;
}
-
+
/*
* Initialize the index files and also cache the Bloom Filters
* associated with these files. Also caches the file handles
@@ -276,14 +276,14 @@
*/
public static void onStart(List<String> filenames) throws IOException
{
- for ( String filename : filenames )
+ for (String filename : filenames)
{
SSTable ssTable = null;
try
{
ssTable = new SSTable(filename, StorageService.getPartitioner());
}
- catch ( IOException ex )
+ catch (IOException ex)
{
logger_.info("Deleting corrupted file " + filename);
FileUtils.delete(filename);
@@ -291,7 +291,7 @@
}
finally
{
- if ( ssTable != null )
+ if (ssTable != null)
{
ssTable.close();
}
@@ -323,7 +323,7 @@
{
boolean bVal = false;
BloomFilter bf = bfs_.get(filename);
- if ( bf != null )
+ if (bf != null)
{
bVal = bf.isPresent(clientKey);
}
@@ -332,20 +332,20 @@
String dataFile_;
private IFileWriter dataWriter_;
- private String lastWrittenKey_;
- private long firstBlockPosition_ = 0L;
+ private String lastWrittenKey_;
+ private long firstBlockPosition_ = 0L;
private int indexKeysWritten_ = 0;
/* Holds the keys and their respective positions of the current block index */
- private SortedMap<String, BlockMetadata> blockIndex_;
+ private SortedMap<String, BlockMetadata> blockIndex_;
/* Holds all the block indices for this SSTable */
private List<SortedMap<String, BlockMetadata>> blockIndexes_;
private IPartitioner partitioner_;
-
+
/**
* This ctor basically gets passed in the full path name
* of the data file associated with this SSTable. Use this
* ctor to read the data in this file.
- */
+ */
public SSTable(String dataFileName, IPartitioner partitioner) throws IOException
{
dataFile_ = dataFileName;
@@ -356,19 +356,19 @@
/**
* This ctor is used for writing data into the SSTable. Use this
* version for non DB writes to the SSTable.
- */
+ */
public SSTable(String directory, String filename, IPartitioner partitioner) throws IOException
{
dataFile_ = directory + System.getProperty("file.separator") + filename + "-Data.db";
partitioner_ = partitioner;
blockIndex_ = new TreeMap<String, BlockMetadata>(partitioner_.getReverseDecoratedKeyComparator());
blockIndexes_ = new ArrayList<SortedMap<String, BlockMetadata>>();
- dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4*1024*1024);
+ dataWriter_ = SequenceFile.bufferedWriter(dataFile_, 4 * 1024 * 1024);
SSTable.positionAfterFirstBlockIndex_ = dataWriter_.getCurrentPosition();
- }
+ }
private void loadBloomFilter(IFileReader indexReader, long size) throws IOException
- {
+ {
/* read the position of the bloom filter */
indexReader.seek(size - 8);
byte[] bytes = new byte[8];
@@ -380,11 +380,11 @@
DataOutputBuffer bufOut = new DataOutputBuffer();
DataInputBuffer bufIn = new DataInputBuffer();
/* read the bloom filter from disk */
- indexReader.next(bufOut);
+ indexReader.next(bufOut);
bufOut.close();
bufIn.reset(bufOut.getData(), bufOut.getLength());
String clientKey = bufIn.readUTF();
- if ( clientKey.equals(SequenceFile.marker_) )
+ if (clientKey.equals(SequenceFile.marker_))
{
/*
* We are now reading the serialized Bloom Filter. We read
@@ -394,20 +394,22 @@
* need not read the rest of the file.
*/
bufIn.readInt();
- if ( bfs_.get(dataFile_) == null )
+ if (bfs_.get(dataFile_) == null)
+ {
bfs_.put(dataFile_, BloomFilter.serializer().deserialize(bufIn));
+ }
}
}
-
+
private void loadIndexFile() throws IOException
- {
+ {
IFileReader indexReader = null;
/* Read all block indexes to maintain an index in memory */
try
- {
- indexReader = SequenceFile.bufferedReader(dataFile_, 4*1024*1024);
+ {
+ indexReader = SequenceFile.bufferedReader(dataFile_, 4 * 1024 * 1024);
long size = indexReader.getEOF();
-
+
/* load the bloom filter into memory */
loadBloomFilter(indexReader, size);
/* read the position of the last block index */
@@ -417,40 +419,40 @@
/* the beginning of the first block index */
long currentPosition = indexReader.getCurrentPosition();
indexReader.readDirect(bytes);
- long firstBlockIndexPosition = BasicUtilities.byteArrayToLong(bytes);
+ long firstBlockIndexPosition = BasicUtilities.byteArrayToLong(bytes);
List<KeyPositionInfo> keyPositionInfos = new ArrayList<KeyPositionInfo>();
indexMetadataMap_.put(dataFile_, keyPositionInfos);
DataOutputBuffer bufOut = new DataOutputBuffer();
- DataInputBuffer bufIn = new DataInputBuffer();
-
+ DataInputBuffer bufIn = new DataInputBuffer();
+
long nextPosition = currentPosition - firstBlockIndexPosition;
indexReader.seek(nextPosition);
/* read the block indexes from the end of the file till we hit the first one. */
- while ( nextPosition > 0 )
+ while (nextPosition > 0)
{
bufOut.reset();
/* position @ the current block index being processed */
currentPosition = indexReader.getCurrentPosition();
long bytesRead = indexReader.next(bufOut);
- if ( bytesRead != -1 )
+ if (bytesRead != -1)
{
bufIn.reset(bufOut.getData(), bufOut.getLength());
/* read the block key. */
String blockIndexKey = bufIn.readUTF();
- if ( !blockIndexKey.equals(SSTable.blockIndexKey_) )
+ if (!blockIndexKey.equals(SSTable.blockIndexKey_))
{
- logger_.debug(" Done reading the block indexes, Index has been created");
- break;
+ logger_.debug(" Done reading the block indexes, Index has been created");
+ break;
}
/* read the size of the block index */
- bufIn.readInt();
+ bufIn.readInt();
/* Number of keys in the block. */
int keys = bufIn.readInt();
String largestKeyInBlock;
- for ( int i = 0; i < keys; ++i )
+ for (int i = 0; i < keys; ++i)
{
String keyInBlock = bufIn.readUTF();
- if ( i == 0 )
+ if (i == 0)
{
largestKeyInBlock = keyInBlock;
/* relative offset in the block for the key*/
@@ -458,7 +460,7 @@
/* size of data associated with the key */
bufIn.readLong();
/* load the actual position of the block index into the index map */
- keyPositionInfos.add( new KeyPositionInfo(largestKeyInBlock, partitioner_, currentPosition) );
+ keyPositionInfos.add(new KeyPositionInfo(largestKeyInBlock, partitioner_, currentPosition));
}
else
{
@@ -476,30 +478,30 @@
bufIn.close();
bufOut.close();
Collections.sort(keyPositionInfos);
- }
+ }
finally
{
- if ( indexReader != null )
+ if (indexReader != null)
{
indexReader.close();
}
- }
+ }
}
private void init() throws IOException
- {
+ {
/*
* this is to prevent multiple threads from
* loading the same index files multiple times
* into memory.
*/
- synchronized( indexLoadLock_ )
+ synchronized (indexLoadLock_)
{
- if ( indexMetadataMap_.get(dataFile_) == null )
+ if (indexMetadataMap_.get(dataFile_) == null)
{
long start = System.currentTimeMillis();
loadIndexFile();
- logger_.debug("INDEX LOAD TIME: " + (System.currentTimeMillis() - start) + " ms.");
+ logger_.debug("INDEX LOAD TIME: " + (System.currentTimeMillis() - start) + " ms.");
}
}
}
@@ -507,8 +509,10 @@
private String getFile(String name) throws IOException
{
File file = new File(name);
- if ( file.exists() )
+ if (file.exists())
+ {
return file.getAbsolutePath();
+ }
throw new IOException("File " + name + " was not found on disk.");
}
@@ -523,26 +527,28 @@
public void touch(final String clientKey, boolean fData) throws IOException
{
if (touchCache_.containsKey(dataFile_ + ":" + clientKey))
+ {
return;
-
- IFileReader dataReader = SequenceFile.reader(dataFile_);
+ }
+
+ IFileReader dataReader = SequenceFile.reader(dataFile_);
try
{
- /* Morph the key */
+ /* Morph the key */
String decoratedKey = partitioner_.decorateKey(clientKey);
Coordinate fileCoordinate = getCoordinates(decoratedKey, dataReader, partitioner_);
/* Get offset of key from block Index */
dataReader.seek(fileCoordinate.end_);
BlockMetadata blockMetadata = dataReader.getBlockMetadata(decoratedKey);
- if ( blockMetadata.position_ != -1L )
+ if (blockMetadata.position_ != -1L)
{
touchCache_.put(dataFile_ + ":" + clientKey, blockMetadata.position_);
- }
-
- if ( fData )
+ }
+
+ if (fData)
{
/* Read the data associated with this key and pull it into the Buffer Cache */
- if ( blockMetadata.position_ != -1L )
+ if (blockMetadata.position_ != -1L)
{
dataReader.seek(blockMetadata.position_);
DataOutputBuffer bufOut = new DataOutputBuffer();
@@ -554,17 +560,21 @@
}
finally
{
- if ( dataReader != null )
+ if (dataReader != null)
+ {
dataReader.close();
+ }
}
}
private long beforeAppend(String decoratedKey) throws IOException
{
- if (decoratedKey == null )
+ if (decoratedKey == null)
+ {
throw new IOException("Keys must not be null.");
+ }
Comparator<String> c = partitioner_.getDecoratedKeyComparator();
- if ( lastWrittenKey_ != null && c.compare(lastWrittenKey_, decoratedKey) > 0 )
+ if (lastWrittenKey_ != null && c.compare(lastWrittenKey_, decoratedKey) > 0)
{
logger_.info("Last written key : " + lastWrittenKey_);
logger_.info("Current key : " + decoratedKey);
@@ -579,45 +589,48 @@
++indexKeysWritten_;
lastWrittenKey_ = decoratedKey;
blockIndex_.put(decoratedKey, new BlockMetadata(position, size));
- if ( indexKeysWritten_ == indexInterval_ )
+ if (indexKeysWritten_ == indexInterval_)
{
- blockIndexes_.add(blockIndex_);
- blockIndex_ = new TreeMap<String, BlockMetadata>(partitioner_.getReverseDecoratedKeyComparator());
+ blockIndexes_.add(blockIndex_);
+ blockIndex_ = new TreeMap<String, BlockMetadata>(partitioner_.getReverseDecoratedKeyComparator());
indexKeysWritten_ = 0;
- }
+ }
}
/**
* Dumps all the block indices for this SSTable
* at the end of the file.
+ *
* @throws IOException
*/
private void dumpBlockIndexes() throws IOException
{
firstBlockPosition_ = dataWriter_.getCurrentPosition();
- for( SortedMap<String, BlockMetadata> block : blockIndexes_ )
- {
- dumpBlockIndex( block );
- }
- }
-
- private void dumpBlockIndex( SortedMap<String, BlockMetadata> blockIndex) throws IOException
+ for (SortedMap<String, BlockMetadata> block : blockIndexes_)
+ {
+ dumpBlockIndex(block);
+ }
+ }
+
+ private void dumpBlockIndex(SortedMap<String, BlockMetadata> blockIndex) throws IOException
{
/* Block Index is empty so bail. */
- if ( blockIndex.size() == 0 )
+ if (blockIndex.size() == 0)
+ {
return;
-
+ }
+
DataOutputBuffer bufOut = new DataOutputBuffer();
/*
* Record the position where we start writing the block index. This will be
* used as the position of the lastWrittenKey in the block in the index file
*/
long position = dataWriter_.getCurrentPosition();
- Set<String> keys = blockIndex.keySet();
+ Set<String> keys = blockIndex.keySet();
/* Number of keys in this block */
bufOut.writeInt(keys.size());
- for ( String decoratedKey : keys )
- {
+ for (String decoratedKey : keys)
+ {
bufOut.writeUTF(decoratedKey);
BlockMetadata blockMetadata = blockIndex.get(decoratedKey);
/* position of the key as a relative offset */
@@ -628,14 +641,14 @@
dataWriter_.append(SSTable.blockIndexKey_, bufOut);
/* Load this index into the in memory index map */
List<KeyPositionInfo> keyPositionInfos = SSTable.indexMetadataMap_.get(dataFile_);
- if ( keyPositionInfos == null )
+ if (keyPositionInfos == null)
{
- keyPositionInfos = new ArrayList<KeyPositionInfo>();
- SSTable.indexMetadataMap_.put(dataFile_, keyPositionInfos);
+ keyPositionInfos = new ArrayList<KeyPositionInfo>();
+ SSTable.indexMetadataMap_.put(dataFile_, keyPositionInfos);
}
-
+
keyPositionInfos.add(new KeyPositionInfo(blockIndex.firstKey(), partitioner_, position));
- blockIndex.clear();
+ blockIndex.clear();
}
public void append(String decoratedKey, DataOutputBuffer buffer) throws IOException
@@ -649,7 +662,7 @@
{
long currentPosition = beforeAppend(decoratedKey);
dataWriter_.append(decoratedKey, value);
- afterAppend(decoratedKey, currentPosition, value.length );
+ afterAppend(decoratedKey, currentPosition, value.length);
}
/*
@@ -658,24 +671,24 @@
*/
public static Coordinate getCoordinates(String decoratedKey, IFileReader dataReader, IPartitioner partitioner) throws IOException
{
- List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataReader.getFileName());
+ List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataReader.getFileName());
assert indexInfo != null && indexInfo.size() > 0;
long start = 0L;
- long end;
+ long end;
int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(decoratedKey, partitioner));
- if ( index < 0 )
+ if (index < 0)
{
/*
* We are here which means that the requested
* key is not an index.
*/
- index = (++index)*(-1);
+ index = (++index) * (-1);
/*
* This means key is not present at all. Hence
* a scan is in order.
*/
start = (index == 0) ? 0 : indexInfo.get(index - 1).position();
- if ( index < indexInfo.size())
+ if (index < indexInfo.size())
{
end = indexInfo.get(index).position();
}
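The index lookup above leans on Collections.binarySearch's miss convention: on a miss it returns -(insertionPoint) - 1, so the (++index) * (-1) dance recovers the insertion point, and the scan is then bounded by the index entries on either side of it. A tiny sketch of that conversion:

    import java.util.Collections;
    import java.util.List;

    public class BinarySearchSketch
    {
        public static void main(String[] args)
        {
            List<String> index = List.of("apple", "mango", "zebra");
            int i = Collections.binarySearch(index, "banana");
            // binarySearch returns -(insertionPoint) - 1 on a miss,
            // so (++i) * (-1) recovers the insertion point: -2 becomes 1
            if (i < 0)
            {
                i = (++i) * (-1);
            }
            System.out.println("scan between entries " + (i - 1) + " and " + i);
        }
    }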
@@ -698,7 +711,7 @@
}
return new Coordinate(start, end);
}
-
+
public DataInputBuffer next(final String clientKey, String cfName, List<String> columnNames) throws IOException
{
return next(clientKey, cfName, columnNames, null);
@@ -723,9 +736,9 @@
DataInputBuffer bufIn = new DataInputBuffer();
long bytesRead = dataReader.next(decoratedKey, bufOut, cfName, columnNames, timeRange, fileCoordinate);
- if ( bytesRead != -1L )
+ if (bytesRead != -1L)
{
- if ( bufOut.getLength() > 0 )
+ if (bufOut.getLength() > 0)
{
bufIn.reset(bufOut.getData(), bufOut.getLength());
/* read the key even though we do not use it */
@@ -754,16 +767,16 @@
public void close() throws IOException
{
- close( new byte[0], 0 );
+ close(new byte[0], 0);
}
public void close(BloomFilter bf) throws IOException
{
/* Any remnants in the blockIndex should be added to the dump */
- blockIndexes_.add(blockIndex_);
- dumpBlockIndexes();
-
- /* reset the buffer and serialize the Bloom Filter. */
+ blockIndexes_.add(blockIndex_);
+ dumpBlockIndexes();
+
+ /* reset the buffer and serialize the Bloom Filter. */
DataOutputBuffer bufOut = new DataOutputBuffer();
BloomFilter.serializer().serialize(bf, bufOut);
close(bufOut.getData(), bufOut.getLength());
@@ -778,15 +791,15 @@
*/
public void closeRename(BloomFilter bf) throws IOException
{
- close(bf);
+ close(bf);
String tmpDataFile = dataFile_;
- String dataFileName = dataFile_.replace("-" + temporaryFile_,"");
- File dataFile = new File(dataFile_);
- dataFile.renameTo(new File(dataFileName));
+ String dataFileName = dataFile_.replace("-" + temporaryFile_, "");
+ File dataFile = new File(dataFile_);
+ dataFile.renameTo(new File(dataFileName));
dataFile_ = dataFileName;
/* Now repair the in memory index associated with the old name */
- List<KeyPositionInfo> keyPositionInfos = SSTable.indexMetadataMap_.remove(tmpDataFile);
- SSTable.indexMetadataMap_.put(dataFile_, keyPositionInfos);
+ List<KeyPositionInfo> keyPositionInfos = SSTable.indexMetadataMap_.remove(tmpDataFile);
+ SSTable.indexMetadataMap_.put(dataFile_, keyPositionInfos);
}
private void close(byte[] footer, int size) throws IOException
@@ -798,19 +811,19 @@
* block index and the last one is the position of
* the Bloom Filter.
*/
- if ( dataWriter_ != null )
- {
+ if (dataWriter_ != null)
+ {
long bloomFilterPosition = dataWriter_.getCurrentPosition();
dataWriter_.close(footer, size);
- /* write the version field into the SSTable */
+ /* write the version field into the SSTable */
dataWriter_.writeDirect(BasicUtilities.longToByteArray(version_));
/* write the relative position of the first block index from current position */
long blockPosition = dataWriter_.getCurrentPosition() - firstBlockPosition_;
dataWriter_.writeDirect(BasicUtilities.longToByteArray(blockPosition));
-
+
/* write the position of the bloom filter */
long bloomFilterRelativePosition = dataWriter_.getCurrentPosition() - bloomFilterPosition;
- dataWriter_.writeDirect(BasicUtilities.longToByteArray(bloomFilterRelativePosition));
+ dataWriter_.writeDirect(BasicUtilities.longToByteArray(bloomFilterRelativePosition));
dataWriter_.close();
}
}