Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/08 20:59:29 UTC
svn commit: r812641 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/db/
Author: jbellis
Date: Tue Sep 8 18:59:28 2009
New Revision: 812641
URL: http://svn.apache.org/viewvc?rev=812641&view=rev
Log:
make compaction code use SSTableReader objects. r/m SSTableReader.get static method
patch by jbellis; reviewed by Eric Evans for CASSANDRA-424
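In brief (a minimal sketch distilled from the hunks below, not code taken verbatim from the patch): compaction used to track sstables by filename and re-resolve each name through a static registry on every use; it now works directly with the SSTableReader references that ColumnFamilyStore already holds.

    // Before: filename-keyed, re-resolved through the static lookup
    // that this patch deletes.
    for (String file : files)
    {
        FileStruct fs = SSTableReader.get(file).getFileStruct();
        // ...
    }

    // After: callers receive SSTableReader objects directly,
    // typically from ColumnFamilyStore's ssTables_.values().
    for (SSTableReader sstable : sstables)
    {
        FileStruct fs = sstable.getFileStruct();
        // ...
    }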
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileNameComparator.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=812641&r1=812640&r2=812641&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Sep 8 18:59:28 2009
@@ -77,7 +77,7 @@
private AtomicReference<BinaryMemtable> binaryMemtable_;
/* SSTables on disk for this column family */
- private SortedMap<String, SSTableReader> ssTables_ = new TreeMap<String, SSTableReader>(new FileNameComparator(FileNameComparator.Descending));
+ private Map<String, SSTableReader> ssTables_ = new HashMap<String, SSTableReader>();
/* Modification lock used for protecting reads from compactions. */
private ReentrantReadWriteLock sstableLock_ = new ReentrantReadWriteLock(true);
@@ -309,7 +309,7 @@
return filename.split("-")[0];
}
- protected static int getGenerationFromFileName(String filename)
+ public static int getGenerationFromFileName(String filename)
{
/*
* File name is of the form <table>-<column family>-<index>-Data.db.
@@ -608,15 +608,15 @@
}
}
- private PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges) throws IOException
+ private PriorityQueue<FileStruct> initializePriorityQueue(List<SSTableReader> sstables, List<Range> ranges) throws IOException
{
PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
- if (files.size() > 1 || (ranges != null && files.size() > 0))
+ if (sstables.size() > 1 || (ranges != null && sstables.size() > 0))
{
FileStruct fs = null;
- for (String file : files)
+ for (SSTableReader sstable : sstables)
{
- fs = SSTableReader.get(file).getFileStruct();
+ fs = sstable.getFileStruct();
fs.advance(true);
if (fs.isExhausted())
{
@@ -631,19 +631,18 @@
/*
* Group files of similar size into buckets.
*/
- static Set<List<String>> getCompactionBuckets(Iterable<String> files, long min)
+ static Set<List<SSTableReader>> getCompactionBuckets(Iterable<SSTableReader> files, long min)
{
- Map<List<String>, Long> buckets = new ConcurrentHashMap<List<String>, Long>();
- for (String fname : files)
+ Map<List<SSTableReader>, Long> buckets = new HashMap<List<SSTableReader>, Long>();
+ for (SSTableReader sstable : files)
{
- File f = new File(fname);
- long size = f.length();
+ long size = sstable.length();
boolean bFound = false;
// look for a bucket containing similar-sized files:
// group in the same bucket if it's w/in 50% of the average for this bucket,
// or this file and the bucket are all considered "small" (less than `min`)
- for (List<String> bucket : buckets.keySet())
+ for (List<SSTableReader> bucket : buckets.keySet())
{
long averageSize = buckets.get(bucket);
if ((size > averageSize / 2 && size < 3 * averageSize / 2)
@@ -652,7 +651,7 @@
// remove and re-add because adding changes the hash
buckets.remove(bucket);
averageSize = (averageSize + size) / 2;
- bucket.add(fname);
+ bucket.add(sstable);
buckets.put(bucket, averageSize);
bFound = true;
break;
@@ -661,8 +660,8 @@
// no similar bucket found; put it in a new one
if (!bFound)
{
- ArrayList<String> bucket = new ArrayList<String>();
- bucket.add(fname);
+ ArrayList<SSTableReader> bucket = new ArrayList<SSTableReader>();
+ bucket.add(sstable);
buckets.put(bucket, size);
}
}
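The bucketing rule is easier to read outside the diff. Below is a self-contained sketch of the same grouping over plain long sizes (class and method names here are illustrative, not Cassandra's); its main() mirrors the unit test removed at the bottom of this commit.

    import java.util.*;

    public class BucketingSketch
    {
        // Same rule as getCompactionBuckets: join a bucket if within 50% of its
        // running average, or if this size and the bucket are both "small" (< min).
        static Set<List<Long>> buckets(List<Long> sizes, long min)
        {
            Map<List<Long>, Long> averages = new HashMap<List<Long>, Long>();
            for (long size : sizes)
            {
                boolean found = false;
                for (List<Long> bucket : new ArrayList<List<Long>>(averages.keySet()))
                {
                    long avg = averages.get(bucket);
                    if ((size > avg / 2 && size < 3 * avg / 2)
                        || (size < min && avg < min))
                    {
                        // remove before mutating: adding an element changes the list's hash
                        averages.remove(bucket);
                        bucket.add(size);
                        averages.put(bucket, (avg + size) / 2);
                        found = true;
                        break;
                    }
                }
                if (!found)
                {
                    List<Long> bucket = new ArrayList<Long>();
                    bucket.add(size);
                    averages.put(bucket, size);
                }
            }
            return averages.keySet();
        }

        public static void main(String[] args)
        {
            // 20 and 40 are "small" (< 50); 60..140 cluster around their running average
            List<Long> sizes = Arrays.asList(20L, 40L, 60L, 80L, 100L, 120L, 140L);
            System.out.println(buckets(sizes, 50));
            // expect two buckets: [20, 40] and [60, 80, 100, 120, 140]
        }
    }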
@@ -676,16 +675,16 @@
int doCompaction(int minThreshold, int maxThreshold) throws IOException
{
int filesCompacted = 0;
- for (List<String> files : getCompactionBuckets(ssTables_.keySet(), 50L * 1024L * 1024L))
+ for (List<SSTableReader> sstables : getCompactionBuckets(ssTables_.values(), 50L * 1024L * 1024L))
{
- if (files.size() < minThreshold)
+ if (sstables.size() < minThreshold)
{
continue;
}
// if we have too many to compact all at once, compact older ones first -- this avoids
// re-compacting files we just created.
- Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
- filesCompacted += doFileCompaction(files.subList(0, Math.min(files.size(), maxThreshold)));
+ Collections.sort(sstables);
+ filesCompacted += doFileCompaction(sstables.subList(0, Math.min(sstables.size(), maxThreshold)));
}
return filesCompacted;
}
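A worked reading of the thresholds (values hypothetical, not from this patch): with minThreshold = 4 a bucket of 3 sstables is skipped outright; with maxThreshold = 32 a bucket of 40 is sorted ascending by generation and only the 32 oldest are compacted this round, leaving the 8 newest for a later pass.

    int minThreshold = 4, maxThreshold = 32;       // hypothetical values
    int bucketSize = 40;
    // bucketSize < minThreshold  -> bucket skipped entirely
    int compactedThisRound = Math.min(bucketSize, maxThreshold);   // the 32 oldest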
@@ -703,38 +702,35 @@
*/
void doMajorCompactionInternal(long skip) throws IOException
{
- List<String> filesInternal = new ArrayList<String>(ssTables_.keySet());
- List<String> files;
+ List<SSTableReader> sstables;
if (skip > 0L)
{
- files = new ArrayList<String>();
- for (String file : filesInternal)
+ sstables = new ArrayList<SSTableReader>();
+ for (SSTableReader sstable : ssTables_.values())
{
- File f = new File(file);
- if (f.length() < skip * 1024L * 1024L * 1024L)
+ if (sstable.length() < skip * 1024L * 1024L * 1024L)
{
- files.add(file);
+ sstables.add(sstable);
}
}
}
else
{
- files = filesInternal;
+ sstables = new ArrayList<SSTableReader>(ssTables_.values());
}
- doFileCompaction(files);
+ doFileCompaction(sstables);
}
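Note that skip is measured in gibibytes (this is just a reading of the arithmetic above, not new behavior): a major compaction invoked with skip = 10 considers only sstables strictly smaller than 10 GiB.

    long skip = 10L;                              // hypothetical value
    long cutoff = skip * 1024L * 1024L * 1024L;   // 10 GiB = 10,737,418,240 bytes
    // sstable.length() < cutoff  =>  sstable participates in the major compaction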
/*
* Add up all the file sizes; this is the worst-case output
* size for compacting the given list of files.
*/
- long getExpectedCompactedFileSize(List<String> files)
+ long getExpectedCompactedFileSize(List<SSTableReader> sstables)
{
long expectedFileSize = 0;
- for (String file : files)
+ for (SSTableReader sstable : sstables)
{
- File f = new File(file);
- long size = f.length();
+ long size = sstable.length();
expectedFileSize = expectedFileSize + size;
}
return expectedFileSize;
@@ -743,17 +739,16 @@
/*
* Find the maximum-size file in the list.
*/
- String getMaxSizeFile(List<String> files)
+ SSTableReader getMaxSizeFile(List<SSTableReader> sstables)
{
long maxSize = 0L;
- String maxFile = null;
- for (String file : files)
+ SSTableReader maxFile = null;
+ for (SSTableReader sstable : sstables)
{
- File f = new File(file);
- if (f.length() > maxSize)
+ if (sstable.length() > maxSize)
{
- maxSize = f.length();
- maxFile = file;
+ maxSize = sstable.length();
+ maxFile = sstable;
}
}
return maxFile;
@@ -761,8 +756,7 @@
boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
{
- List<String> files = new ArrayList<String>(ssTables_.keySet());
- return doFileAntiCompaction(files, ranges, target, fileList);
+ return doFileAntiCompaction(new ArrayList<SSTableReader>(ssTables_.values()), ranges, target, fileList);
}
void forceCleanup()
@@ -778,48 +772,33 @@
*/
void doCleanupCompaction() throws IOException
{
- List<String> files = new ArrayList<String>(ssTables_.keySet());
- for (String file : files)
+ List<SSTableReader> sstables = new ArrayList<SSTableReader>(ssTables_.values());
+ for (SSTableReader sstable : sstables)
{
- doCleanup(file);
+ doCleanup(sstable);
}
}
/**
* cleans up one particular file by removing keys that this node is not responsible for.
- *
- * @param file
* @throws IOException
*/
/* TODO: Take care of the comments later. */
- void doCleanup(String file) throws IOException
+ void doCleanup(SSTableReader sstable) throws IOException
{
- if (file == null)
- {
- return;
- }
+ assert sstable != null;
List<Range> myRanges;
- List<String> files = new ArrayList<String>();
- files.add(file);
List<String> newFiles = new ArrayList<String>();
Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
- doFileAntiCompaction(files, myRanges, null, newFiles);
+ doFileAntiCompaction(Arrays.asList(sstable), myRanges, null, newFiles);
if (logger_.isDebugEnabled())
- logger_.debug("Original file : " + file + " of size " + new File(file).length());
+ logger_.debug("Original file : " + sstable + " of size " + sstable.length());
sstableLock_.writeLock().lock();
try
{
- ssTables_.remove(file);
- for (String newfile : newFiles)
- {
- if (logger_.isDebugEnabled())
- logger_.debug("New file : " + newfile + " of size " + new File(newfile).length());
- assert newfile != null;
- // TODO convert this to SSTableWriter.renameAndOpen
- ssTables_.put(newfile, SSTableReader.open(newfile));
- }
- SSTableReader.get(file).delete();
+ ssTables_.remove(sstable.getFilename());
+ sstable.delete();
}
finally
{
@@ -831,14 +810,14 @@
* Performs the anti-compaction process: it writes out a file containing the keys that belong to the given ranges.
* If no target is specified, it writes the output as a compacted file with the unnecessary ranges wiped out.
*
- * @param files
+ * @param sstables
* @param ranges
* @param target
* @param fileList
* @return
* @throws IOException
*/
- boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
+ boolean doFileAntiCompaction(List<SSTableReader> sstables, List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
{
boolean result = false;
long startTime = System.currentTimeMillis();
@@ -849,7 +828,7 @@
String rangeFileLocation;
String mergedFileName;
// Calculate the expected compacted filesize
- long expectedRangeFileSize = getExpectedCompactedFileSize(files);
+ long expectedRangeFileSize = getExpectedCompactedFileSize(sstables);
/* in the worst case a node will be giving out half of its data so we take a chance */
expectedRangeFileSize = expectedRangeFileSize / 2;
rangeFileLocation = DatabaseDescriptor.getDataFileLocationForTable(table_, expectedRangeFileSize);
@@ -860,7 +839,7 @@
+ expectedRangeFileSize + " is greater than the safe limit of the disk space available.");
return result;
}
- PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges);
+ PriorityQueue<FileStruct> pq = initializePriorityQueue(sstables, ranges);
if (pq.isEmpty())
{
return result;
@@ -871,7 +850,7 @@
String lastkey = null;
List<FileStruct> lfs = new ArrayList<FileStruct>();
DataOutputBuffer bufOut = new DataOutputBuffer();
- int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(files);
+ int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(sstables);
expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTableReader.indexInterval();
if (logger_.isDebugEnabled())
logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
@@ -1008,17 +987,17 @@
* to get the latest data.
*
*/
- private int doFileCompaction(List<String> files) throws IOException
+ private int doFileCompaction(List<SSTableReader> sstables) throws IOException
{
- logger_.info("Compacting [" + StringUtils.join(files, ",") + "]");
- String compactionFileLocation = DatabaseDescriptor.getDataFileLocationForTable(table_, getExpectedCompactedFileSize(files));
+ logger_.info("Compacting [" + StringUtils.join(sstables, ",") + "]");
+ String compactionFileLocation = DatabaseDescriptor.getDataFileLocationForTable(table_, getExpectedCompactedFileSize(sstables));
// If the compaction file path is null that means we have no space left for this compaction.
// try again w/o the largest one.
if (compactionFileLocation == null)
{
- String maxFile = getMaxSizeFile(files);
- files.remove(maxFile);
- return doFileCompaction(files);
+ SSTableReader maxFile = getMaxSizeFile(sstables);
+ sstables.remove(maxFile);
+ return doFileCompaction(sstables);
}
String newfile = null;
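The fallback above recurses with the largest input dropped until the worst-case output fits somewhere. A standalone restatement with plain sizes (assuming a single free-space figure, where the real code asks DatabaseDescriptor for a data directory per table):

    import java.util.*;

    class CompactionFitSketch
    {
        // Drop the largest input until the worst-case output size
        // (the sum of all inputs) fits in the available space.
        static long worstCaseFit(List<Long> sizes, long freeSpace)
        {
            long expected = 0;
            for (long s : sizes)
                expected += s;
            if (expected <= freeSpace || sizes.isEmpty())
                return expected;
            sizes.remove(Collections.max(sizes));   // getMaxSizeFile's role
            return worstCaseFit(sizes, freeSpace);
        }
    }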
@@ -1027,7 +1006,7 @@
long totalBytesWritten = 0;
long totalkeysRead = 0;
long totalkeysWritten = 0;
- PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null);
+ PriorityQueue<FileStruct> pq = initializePriorityQueue(sstables, null);
if (pq.isEmpty())
{
@@ -1042,7 +1021,7 @@
String lastkey = null;
List<FileStruct> lfs = new ArrayList<FileStruct>();
DataOutputBuffer bufOut = new DataOutputBuffer();
- int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(files);
+ int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(sstables);
expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTableReader.indexInterval();
if (logger_.isDebugEnabled())
logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
@@ -1131,18 +1110,15 @@
sstableLock_.writeLock().lock();
try
{
- for (String file : files)
- {
- ssTables_.remove(file);
- }
if (newfile != null)
{
ssTables_.put(newfile, ssTable);
totalBytesWritten += (new File(newfile)).length();
}
- for (String file : files)
+ for (SSTableReader sstable : sstables)
{
- SSTableReader.get(file).delete();
+ ssTables_.remove(sstable.getFilename());
+ sstable.delete();
}
}
finally
@@ -1153,7 +1129,7 @@
String format = "Compacted to %s. %d/%d bytes for %d/%d keys read/written. Time: %dms.";
long dTime = System.currentTimeMillis() - startTime;
logger_.info(String.format(format, newfile, totalBytesRead, totalBytesWritten, totalkeysRead, totalkeysWritten, dTime));
- return files.size();
+ return sstables.size();
}
public static List<Memtable> getUnflushedMemtables(String cfName)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=812641&r1=812640&r2=812641&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Tue Sep 8 18:59:28 2009
@@ -31,14 +31,15 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.marshal.AbstractType;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
/**
* SSTableReaders are open()ed by Table.onStart; after that they are created by SSTableWriter.renameAndOpen.
- * Do not use open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
+ * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
*/
-public class SSTableReader extends SSTable
+public class SSTableReader extends SSTable implements Comparable<SSTableReader>
{
private static final Logger logger = Logger.getLogger(SSTableReader.class);
@@ -49,19 +50,16 @@
return INDEX_INTERVAL;
}
- // todo can we refactor to take list of sstables?
- public static int getApproximateKeyCount(List<String> dataFiles)
+ public static int getApproximateKeyCount(List<SSTableReader> sstables)
{
int count = 0;
- for (String dataFileName : dataFiles)
+ for (SSTableReader sstable : sstables)
{
- SSTableReader sstable = openedFiles.get(dataFileName);
- assert sstable != null;
int indexKeyCount = sstable.getIndexPositions().size();
count = count + (indexKeyCount + 1) * INDEX_INTERVAL;
if (logger.isDebugEnabled())
- logger.debug("index size for bloom filter calc for file : " + dataFileName + " : " + count);
+ logger.debug("index size for bloom filter calc for file : " + sstable.getFilename() + " : " + count);
}
return count;
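The estimate works out to one sampled index position per INDEX_INTERVAL keys, rounded up per sstable. A worked example (the interval value is assumed here for illustration; see indexInterval() above):

    int indexInterval = 128;      // assumed value of INDEX_INTERVAL
    int sampledPositions = 40;    // hypothetical sstable with 40 in-memory index samples
    int upperBound = (sampledPositions + 1) * indexInterval;   // at most 5,248 keys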
@@ -106,14 +104,6 @@
return sstable;
}
- @Deprecated // move away from get() towards using the SSTR objects CFS knows about
- public static SSTableReader get(String dataFileName)
- {
- SSTableReader sstable = openedFiles.get(dataFileName);
- assert sstable != null : "No sstable opened for " + dataFileName + ": " + openedFiles;
- return sstable;
- }
-
SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> indexPositions, BloomFilter bloomFilter)
{
super(filename, partitioner);
@@ -264,6 +254,16 @@
}
}
+ public long length()
+ {
+ return new File(path).length();
+ }
+
+ public int compareTo(SSTableReader o)
+ {
+ return ColumnFamilyStore.getGenerationFromFileName(path) - ColumnFamilyStore.getGenerationFromFileName(o.path);
+ }
+
public void delete() throws IOException
{
FileUtils.deleteWithConfirm(new File(path));
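With Comparable in place, generation ordering is available wherever a reader list needs sorting; the doCompaction hunk above relies on exactly this. A usage sketch:

    // Ascending generation, i.e. oldest sstables first; this replaces
    // the deleted FileNameComparator.
    List<SSTableReader> sstables = new ArrayList<SSTableReader>(ssTables_.values());
    Collections.sort(sstables);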
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=812641&r1=812640&r2=812641&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Tue Sep 8 18:59:28 2009
@@ -48,59 +48,6 @@
}
@Test
- public void testGetCompactionBuckets() throws IOException
- {
- // create files 20 40 60 ... 140
- List<String> small = new ArrayList<String>();
- List<String> med = new ArrayList<String>();
- List<String> all = new ArrayList<String>();
-
- String fname;
- fname = createFile(20);
- small.add(fname);
- all.add(fname);
- fname = createFile(40);
- small.add(fname);
- all.add(fname);
-
- for (int i = 60; i <= 140; i += 20)
- {
- fname = createFile(i);
- med.add(fname);
- all.add(fname);
- }
-
- Set<List<String>> buckets = ColumnFamilyStore.getCompactionBuckets(all, 50);
- assert buckets.size() == 2 : bucketString(buckets);
- Iterator<List<String>> iter = buckets.iterator();
- List<String> bucket1 = iter.next();
- List<String> bucket2 = iter.next();
- assert bucket1.size() + bucket2.size() == all.size() : bucketString(buckets) + " does not match [" + StringUtils.join(all, ", ") + "]";
- assert buckets.contains(small) : bucketString(buckets) + " does not contain {" + StringUtils.join(small, ", ") + "}";
- assert buckets.contains(med) : bucketString(buckets) + " does not contain {" + StringUtils.join(med, ", ") + "}";
- }
-
- private static String bucketString(Set<List<String>> buckets)
- {
- ArrayList<String> pieces = new ArrayList<String>();
- for (List<String> bucket : buckets)
- {
- pieces.add("[" + StringUtils.join(bucket, ", ") + "]");
- }
- return "{" + StringUtils.join(pieces, ", ") + "}";
- }
-
- private String createFile(int nBytes) throws IOException
- {
- File f = File.createTempFile("bucket_test", "");
- FileOutputStream fos = new FileOutputStream(f);
- byte[] bytes = new byte[nBytes];
- fos.write(bytes);
- fos.close();
- return f.getAbsolutePath();
- }
-
- @Test
public void testGetColumnWithWrongBF() throws IOException, ExecutionException, InterruptedException
{
Table table = Table.open("Keyspace1");