You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/08 20:59:29 UTC

svn commit: r812641 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/db/

Author: jbellis
Date: Tue Sep  8 18:59:28 2009
New Revision: 812641

URL: http://svn.apache.org/viewvc?rev=812641&view=rev
Log:
make compaction code use SSTableReader objects.  r/m SSTableReader.get static method
patch by jbellis; reviewed by Eric Evans for CASSANDRA-424

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileNameComparator.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=812641&r1=812640&r2=812641&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Sep  8 18:59:28 2009
@@ -77,7 +77,7 @@
     private AtomicReference<BinaryMemtable> binaryMemtable_;
 
     /* SSTables on disk for this column family */
-    private SortedMap<String, SSTableReader> ssTables_ = new TreeMap<String, SSTableReader>(new FileNameComparator(FileNameComparator.Descending));
+    private Map<String, SSTableReader> ssTables_ = new HashMap<String, SSTableReader>();
 
     /* Modification lock used for protecting reads from compactions. */
     private ReentrantReadWriteLock sstableLock_ = new ReentrantReadWriteLock(true);
@@ -309,7 +309,7 @@
         return filename.split("-")[0];
     }
 
-    protected static int getGenerationFromFileName(String filename)
+    public static int getGenerationFromFileName(String filename)
     {
         /*
          * File name is of the form <table>-<column family>-<index>-Data.db.
@@ -608,15 +608,15 @@
         }
     }
 
-    private PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges) throws IOException
+    private PriorityQueue<FileStruct> initializePriorityQueue(List<SSTableReader> sstables, List<Range> ranges) throws IOException
     {
         PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
-        if (files.size() > 1 || (ranges != null && files.size() > 0))
+        if (sstables.size() > 1 || (ranges != null && sstables.size() > 0))
         {
             FileStruct fs = null;
-            for (String file : files)
+            for (SSTableReader sstable : sstables)
             {
-                fs = SSTableReader.get(file).getFileStruct();
+                fs = sstable.getFileStruct();
                 fs.advance(true);
                 if (fs.isExhausted())
                 {
@@ -631,19 +631,18 @@
     /*
      * Group files of similar size into buckets.
      */
-    static Set<List<String>> getCompactionBuckets(Iterable<String> files, long min)
+    static Set<List<SSTableReader>> getCompactionBuckets(Iterable<SSTableReader> files, long min)
     {
-        Map<List<String>, Long> buckets = new ConcurrentHashMap<List<String>, Long>();
-        for (String fname : files)
+        Map<List<SSTableReader>, Long> buckets = new HashMap<List<SSTableReader>, Long>();
+        for (SSTableReader sstable : files)
         {
-            File f = new File(fname);
-            long size = f.length();
+            long size = sstable.length();
 
             boolean bFound = false;
             // look for a bucket containing similar-sized files:
             // group in the same bucket if it's w/in 50% of the average for this bucket,
             // or this file and the bucket are all considered "small" (less than `min`)
-            for (List<String> bucket : buckets.keySet())
+            for (List<SSTableReader> bucket : buckets.keySet())
             {
                 long averageSize = buckets.get(bucket);
                 if ((size > averageSize / 2 && size < 3 * averageSize / 2)
@@ -652,7 +651,7 @@
                     // remove and re-add because adding changes the hash
                     buckets.remove(bucket);
                     averageSize = (averageSize + size) / 2;
-                    bucket.add(fname);
+                    bucket.add(sstable);
                     buckets.put(bucket, averageSize);
                     bFound = true;
                     break;
@@ -661,8 +660,8 @@
             // no similar bucket found; put it in a new one
             if (!bFound)
             {
-                ArrayList<String> bucket = new ArrayList<String>();
-                bucket.add(fname);
+                ArrayList<SSTableReader> bucket = new ArrayList<SSTableReader>();
+                bucket.add(sstable);
                 buckets.put(bucket, size);
             }
         }
@@ -676,16 +675,16 @@
     int doCompaction(int minThreshold, int maxThreshold) throws IOException
     {
         int filesCompacted = 0;
-        for (List<String> files : getCompactionBuckets(ssTables_.keySet(), 50L * 1024L * 1024L))
+        for (List<SSTableReader> sstables : getCompactionBuckets(ssTables_.values(), 50L * 1024L * 1024L))
         {
-            if (files.size() < minThreshold)
+            if (sstables.size() < minThreshold)
             {
                 continue;
             }
             // if we have too many to compact all at once, compact older ones first -- this avoids
             // re-compacting files we just created.
-            Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
-            filesCompacted += doFileCompaction(files.subList(0, Math.min(files.size(), maxThreshold)));
+            Collections.sort(sstables);
+            filesCompacted += doFileCompaction(sstables.subList(0, Math.min(sstables.size(), maxThreshold)));
         }
         return filesCompacted;
     }
@@ -703,38 +702,35 @@
      */
     void doMajorCompactionInternal(long skip) throws IOException
     {
-        List<String> filesInternal = new ArrayList<String>(ssTables_.keySet());
-        List<String> files;
+        List<SSTableReader> sstables;
         if (skip > 0L)
         {
-            files = new ArrayList<String>();
-            for (String file : filesInternal)
+            sstables = new ArrayList<SSTableReader>();
+            for (SSTableReader sstable : ssTables_.values())
             {
-                File f = new File(file);
-                if (f.length() < skip * 1024L * 1024L * 1024L)
+                if (sstable.length() < skip * 1024L * 1024L * 1024L)
                 {
-                    files.add(file);
+                    sstables.add(sstable);
                 }
             }
         }
         else
         {
-            files = filesInternal;
+            sstables = new ArrayList<SSTableReader>(ssTables_.values());
         }
-        doFileCompaction(files);
+        doFileCompaction(sstables);
     }
 
     /*
      * Add up all the files sizes this is the worst case file
      * size for compaction of all the list of files given.
      */
-    long getExpectedCompactedFileSize(List<String> files)
+    long getExpectedCompactedFileSize(List<SSTableReader> sstables)
     {
         long expectedFileSize = 0;
-        for (String file : files)
+        for (SSTableReader sstable : sstables)
         {
-            File f = new File(file);
-            long size = f.length();
+            long size = sstable.length();
             expectedFileSize = expectedFileSize + size;
         }
         return expectedFileSize;
@@ -743,17 +739,16 @@
     /*
      *  Find the maximum size file in the list .
      */
-    String getMaxSizeFile(List<String> files)
+    SSTableReader getMaxSizeFile(List<SSTableReader> sstables)
     {
         long maxSize = 0L;
-        String maxFile = null;
-        for (String file : files)
+        SSTableReader maxFile = null;
+        for (SSTableReader sstable : sstables)
         {
-            File f = new File(file);
-            if (f.length() > maxSize)
+            if (sstable.length() > maxSize)
             {
-                maxSize = f.length();
-                maxFile = file;
+                maxSize = sstable.length();
+                maxFile = sstable;
             }
         }
         return maxFile;
@@ -761,8 +756,7 @@
 
     boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
     {
-        List<String> files = new ArrayList<String>(ssTables_.keySet());
-        return doFileAntiCompaction(files, ranges, target, fileList);
+        return doFileAntiCompaction(new ArrayList<SSTableReader>(ssTables_.values()), ranges, target, fileList);
     }
 
     void forceCleanup()
@@ -778,48 +772,33 @@
      */
     void doCleanupCompaction() throws IOException
     {
-        List<String> files = new ArrayList<String>(ssTables_.keySet());
-        for (String file : files)
+        List<SSTableReader> sstables = new ArrayList<SSTableReader>(ssTables_.values());
+        for (SSTableReader sstable : sstables)
         {
-            doCleanup(file);
+            doCleanup(sstable);
         }
     }
 
     /**
      * cleans up one particular file by removing keys that this node is not responsible for.
-     *
-     * @param file
      * @throws IOException
      */
     /* TODO: Take care of the comments later. */
-    void doCleanup(String file) throws IOException
+    void doCleanup(SSTableReader sstable) throws IOException
     {
-        if (file == null)
-        {
-            return;
-        }
+        assert sstable != null;
         List<Range> myRanges;
-        List<String> files = new ArrayList<String>();
-        files.add(file);
         List<String> newFiles = new ArrayList<String>();
         Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
         myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
-        doFileAntiCompaction(files, myRanges, null, newFiles);
+        doFileAntiCompaction(Arrays.asList(sstable), myRanges, null, newFiles);
         if (logger_.isDebugEnabled())
-          logger_.debug("Original file : " + file + " of size " + new File(file).length());
+          logger_.debug("Original file : " + sstable + " of size " + sstable.length());
         sstableLock_.writeLock().lock();
         try
         {
-            ssTables_.remove(file);
-            for (String newfile : newFiles)
-            {
-                if (logger_.isDebugEnabled())
-                  logger_.debug("New file : " + newfile + " of size " + new File(newfile).length());
-                assert newfile != null;
-                // TODO convert this to SSTableWriter.renameAndOpen
-                ssTables_.put(newfile, SSTableReader.open(newfile));
-            }
-            SSTableReader.get(file).delete();
+            ssTables_.remove(sstable.getFilename());
+            sstable.delete();
         }
         finally
         {
@@ -831,14 +810,14 @@
      * This function is used to do the anti compaction process , it spits out the file which has keys that belong to a given range
      * If the target is not specified it spits out the file as a compacted file with the unecessary ranges wiped out.
      *
-     * @param files
+     * @param sstables
      * @param ranges
      * @param target
      * @param fileList
      * @return
      * @throws IOException
      */
-    boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
+    boolean doFileAntiCompaction(List<SSTableReader> sstables, List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
     {
         boolean result = false;
         long startTime = System.currentTimeMillis();
@@ -849,7 +828,7 @@
         String rangeFileLocation;
         String mergedFileName;
         // Calculate the expected compacted filesize
-        long expectedRangeFileSize = getExpectedCompactedFileSize(files);
+        long expectedRangeFileSize = getExpectedCompactedFileSize(sstables);
         /* in the worst case a node will be giving out half of its data so we take a chance */
         expectedRangeFileSize = expectedRangeFileSize / 2;
         rangeFileLocation = DatabaseDescriptor.getDataFileLocationForTable(table_, expectedRangeFileSize);
@@ -860,7 +839,7 @@
                           + expectedRangeFileSize + "   is greater than the safe limit of the disk space available.");
             return result;
         }
-        PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges);
+        PriorityQueue<FileStruct> pq = initializePriorityQueue(sstables, ranges);
         if (pq.isEmpty())
         {
             return result;
@@ -871,7 +850,7 @@
         String lastkey = null;
         List<FileStruct> lfs = new ArrayList<FileStruct>();
         DataOutputBuffer bufOut = new DataOutputBuffer();
-        int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(files);
+        int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(sstables);
         expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTableReader.indexInterval();
         if (logger_.isDebugEnabled())
           logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
@@ -1008,17 +987,17 @@
     * to get the latest data.
     *
     */
-    private int doFileCompaction(List<String> files) throws IOException
+    private int doFileCompaction(List<SSTableReader> sstables) throws IOException
     {
-        logger_.info("Compacting [" + StringUtils.join(files, ",") + "]");
-        String compactionFileLocation = DatabaseDescriptor.getDataFileLocationForTable(table_, getExpectedCompactedFileSize(files));
+        logger_.info("Compacting [" + StringUtils.join(sstables, ",") + "]");
+        String compactionFileLocation = DatabaseDescriptor.getDataFileLocationForTable(table_, getExpectedCompactedFileSize(sstables));
         // If the compaction file path is null that means we have no space left for this compaction.
         // try again w/o the largest one.
         if (compactionFileLocation == null)
         {
-            String maxFile = getMaxSizeFile(files);
-            files.remove(maxFile);
-            return doFileCompaction(files);
+            SSTableReader maxFile = getMaxSizeFile(sstables);
+            sstables.remove(maxFile);
+            return doFileCompaction(sstables);
         }
 
         String newfile = null;
@@ -1027,7 +1006,7 @@
         long totalBytesWritten = 0;
         long totalkeysRead = 0;
         long totalkeysWritten = 0;
-        PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null);
+        PriorityQueue<FileStruct> pq = initializePriorityQueue(sstables, null);
 
         if (pq.isEmpty())
         {
@@ -1042,7 +1021,7 @@
         String lastkey = null;
         List<FileStruct> lfs = new ArrayList<FileStruct>();
         DataOutputBuffer bufOut = new DataOutputBuffer();
-        int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(files);
+        int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(sstables);
         expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTableReader.indexInterval();
         if (logger_.isDebugEnabled())
           logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
@@ -1131,18 +1110,15 @@
         sstableLock_.writeLock().lock();
         try
         {
-            for (String file : files)
-            {
-                ssTables_.remove(file);
-            }
             if (newfile != null)
             {
                 ssTables_.put(newfile, ssTable);
                 totalBytesWritten += (new File(newfile)).length();
             }
-            for (String file : files)
+            for (SSTableReader sstable : sstables)
             {
-                SSTableReader.get(file).delete();
+                ssTables_.remove(sstable.getFilename());
+                sstable.delete();
             }
         }
         finally
@@ -1153,7 +1129,7 @@
         String format = "Compacted to %s.  %d/%d bytes for %d/%d keys read/written.  Time: %dms.";
         long dTime = System.currentTimeMillis() - startTime;
         logger_.info(String.format(format, newfile, totalBytesRead, totalBytesWritten, totalkeysRead, totalkeysWritten, dTime));
-        return files.size();
+        return sstables.size();
     }
 
     public static List<Memtable> getUnflushedMemtables(String cfName)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=812641&r1=812640&r2=812641&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Tue Sep  8 18:59:28 2009
@@ -31,14 +31,15 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
  * SSTableReaders are open()ed by Table.onStart; after that they are created by SSTableWriter.renameAndOpen.
- * Do not use open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
+ * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
  */
-public class SSTableReader extends SSTable
+public class SSTableReader extends SSTable implements Comparable<SSTableReader>
 {
     private static final Logger logger = Logger.getLogger(SSTableReader.class);
 
@@ -49,19 +50,16 @@
         return INDEX_INTERVAL;
     }
 
-    // todo can we refactor to take list of sstables?
-    public static int getApproximateKeyCount(List<String> dataFiles)
+    public static int getApproximateKeyCount(List<SSTableReader> sstables)
     {
         int count = 0;
 
-        for (String dataFileName : dataFiles)
+        for (SSTableReader sstable : sstables)
         {
-            SSTableReader sstable = openedFiles.get(dataFileName);
-            assert sstable != null;
             int indexKeyCount = sstable.getIndexPositions().size();
             count = count + (indexKeyCount + 1) * INDEX_INTERVAL;
             if (logger.isDebugEnabled())
-                logger.debug("index size for bloom filter calc for file  : " + dataFileName + "   : " + count);
+                logger.debug("index size for bloom filter calc for file  : " + sstable.getFilename() + "   : " + count);
         }
 
         return count;
@@ -106,14 +104,6 @@
         return sstable;
     }
 
-    @Deprecated // move away from get() towards using the SSTR objects CFS knows about
-    public static SSTableReader get(String dataFileName)
-    {
-        SSTableReader sstable = openedFiles.get(dataFileName);
-        assert sstable != null : "No sstable opened for " + dataFileName + ": " + openedFiles;
-        return sstable;
-    }
-
     SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> indexPositions, BloomFilter bloomFilter)
     {
         super(filename, partitioner);
@@ -264,6 +254,16 @@
         }
     }
 
+    public long length()
+    {
+        return new File(path).length();
+    }
+
+    public int compareTo(SSTableReader o)
+    {
+        return ColumnFamilyStore.getGenerationFromFileName(path) - ColumnFamilyStore.getGenerationFromFileName(o.path);
+    }
+
     public void delete() throws IOException
     {
         FileUtils.deleteWithConfirm(new File(path));

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=812641&r1=812640&r2=812641&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Tue Sep  8 18:59:28 2009
@@ -48,59 +48,6 @@
     }
 
     @Test
-    public void testGetCompactionBuckets() throws IOException
-    {
-        // create files 20 40 60 ... 180
-        List<String> small = new ArrayList<String>();
-        List<String> med = new ArrayList<String>();
-        List<String> all = new ArrayList<String>();
-
-        String fname;
-        fname = createFile(20);
-        small.add(fname);
-        all.add(fname);
-        fname = createFile(40);
-        small.add(fname);
-        all.add(fname);
-
-        for (int i = 60; i <= 140; i += 20)
-        {
-            fname = createFile(i);
-            med.add(fname);
-            all.add(fname);
-        }
-
-        Set<List<String>> buckets = ColumnFamilyStore.getCompactionBuckets(all, 50);
-        assert buckets.size() == 2 : bucketString(buckets);
-        Iterator<List<String>> iter = buckets.iterator();
-        List<String> bucket1 = iter.next();
-        List<String> bucket2 = iter.next();
-        assert bucket1.size() + bucket2.size() == all.size() : bucketString(buckets) + " does not match [" + StringUtils.join(all, ", ") + "]";
-        assert buckets.contains(small) : bucketString(buckets) + " does not contain {" + StringUtils.join(small, ", ") + "}";
-        assert buckets.contains(med) : bucketString(buckets) + " does not contain {" + StringUtils.join(med, ", ") + "}";
-    }
-
-    private static String bucketString(Set<List<String>> buckets)
-    {
-        ArrayList<String> pieces = new ArrayList<String>();
-        for (List<String> bucket : buckets)
-        {
-            pieces.add("[" + StringUtils.join(bucket, ", ") + "]");
-        }
-        return "{" + StringUtils.join(pieces, ", ") + "}";
-    }
-
-    private String createFile(int nBytes) throws IOException
-    {
-        File f = File.createTempFile("bucket_test", "");
-        FileOutputStream fos = new FileOutputStream(f);
-        byte[] bytes = new byte[nBytes];
-        fos.write(bytes);
-        fos.close();
-        return f.getAbsolutePath();
-    }
-
-    @Test
     public void testGetColumnWithWrongBF() throws IOException, ExecutionException, InterruptedException
     {
         Table table = Table.open("Keyspace1");