You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/20 16:58:39 UTC

svn commit: r776720 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ColumnFamilyStore.java db/MinorCompactionManager.java io/SSTable.java

Author: jbellis
Date: Wed May 20 14:58:39 2009
New Revision: 776720

URL: http://svn.apache.org/viewvc?rev=776720&view=rev
Log:
more cleanup of compaction code.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-184

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=776720&r1=776719&r2=776720&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed May 20 14:58:39 2009
@@ -88,9 +88,6 @@
     /* Modification lock used for protecting reads from compactions. */
     private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
 
-    /* Flag indicates if a compaction is in process */
-    private AtomicBoolean isCompacting_ = new AtomicBoolean(false);
-
     private TimedStatsDeque readStats_ = new TimedStatsDeque(60000);
     private TimedStatsDeque diskReadStats_ = new TimedStatsDeque(60000);
 
@@ -756,25 +753,26 @@
     */
     void storeLocation(String filename, BloomFilter bf)
     {
-        int ssTableSize = 0;
+        int ssTableCount;
         lock_.writeLock().lock();
         try
         {
             ssTables_.add(filename);
             SSTable.storeBloomFilter(filename, bf);
-            ssTableSize = ssTables_.size();
+            ssTableCount = ssTables_.size();
         }
         finally
         {
             lock_.writeLock().unlock();
         }
 
-        if ((ssTableSize >= MinorCompactionManager.COMPACTION_THRESHOLD && !isCompacting_.get())
-            || (isCompacting_.get() && ssTableSize % MinorCompactionManager.COMPACTION_THRESHOLD == 0))
+        /* it's ok if compaction gets submitted multiple times while one is already in process.
+           worst that happens is, compactor will count the sstable files and decide there are
+           not enough to bother with. */
+        if (ssTableCount >= MinorCompactionManager.COMPACTION_THRESHOLD)
         {
-            logger_.debug("Submitting for  compaction ...");
-            MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
-            logger_.debug("Submitted for compaction ...");
+            logger_.debug("Submitting " + columnFamily_ + " for compaction");
+            MinorCompactionManager.instance().submit(this);
         }
     }
 
@@ -862,45 +860,37 @@
     /*
      * Break the files into buckets and then compact.
      */
-    public int doCompaction(int threshold) throws IOException
+    int doCompaction(int threshold) throws IOException
     {
-        isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
         int filesCompacted = 0;
-        try
+        Set<List<String>> buckets = getCompactionBuckets(files, 50L * 1024L * 1024L);
+        for (List<String> fileList : buckets)
         {
-            Set<List<String>> buckets = getCompactionBuckets(files, 50L * 1024L * 1024L);
-            for (List<String> fileList : buckets)
+            Collections.sort(fileList, new FileNameComparator(FileNameComparator.Ascending));
+            if (fileList.size() < threshold)
             {
-                Collections.sort(fileList, new FileNameComparator(FileNameComparator.Ascending));
-                if (fileList.size() < threshold)
-                {
-                    continue;
-                }
-                // For each bucket if it has crossed the threshhold do the compaction
-                // In case of range  compaction merge the counting bloom filters also.
-                files.clear();
-                int count = 0;
-                for (String file : fileList)
+                continue;
+            }
+            // For each bucket if it has crossed the threshhold do the compaction
+            // In case of range  compaction merge the counting bloom filters also.
+            files.clear();
+            int count = 0;
+            for (String file : fileList)
+            {
+                files.add(file);
+                count++;
+                if (count == threshold)
                 {
-                    files.add(file);
-                    count++;
-                    if (count == threshold)
-                    {
-                        filesCompacted += doFileCompaction(files, BUFSIZE);
-                        break;
-                    }
+                    filesCompacted += doFileCompaction(files, BUFSIZE);
+                    break;
                 }
             }
         }
-        finally
-        {
-            isCompacting_.set(false);
-        }
         return filesCompacted;
     }
 
-    void doMajorCompaction(long skip)
+    void doMajorCompaction(long skip) throws IOException
     {
         doMajorCompactionInternal(skip);
     }
@@ -911,39 +901,27 @@
      * all files greater than skip GB are skipped for this compaction.
      * Except if skip is 0 , in that case this is ignored and all files are taken.
      */
-    void doMajorCompactionInternal(long skip)
+    void doMajorCompactionInternal(long skip) throws IOException
     {
-        isCompacting_.set(true);
         List<String> filesInternal = new ArrayList<String>(ssTables_);
         List<String> files;
-        try
+        if (skip > 0L)
         {
-            if (skip > 0L)
+            files = new ArrayList<String>();
+            for (String file : filesInternal)
             {
-                files = new ArrayList<String>();
-                for (String file : filesInternal)
+                File f = new File(file);
+                if (f.length() < skip * 1024L * 1024L * 1024L)
                 {
-                    File f = new File(file);
-                    if (f.length() < skip * 1024L * 1024L * 1024L)
-                    {
-                        files.add(file);
-                    }
+                    files.add(file);
                 }
             }
-            else
-            {
-                files = filesInternal;
-            }
-            doFileCompaction(files, BUFSIZE);
         }
-        catch (IOException ex)
-        {
-            logger_.error(ex);
-        }
-        finally
+        else
         {
-            isCompacting_.set(false);
+            files = filesInternal;
         }
+        doFileCompaction(files, BUFSIZE);
     }
 
     /*
@@ -983,19 +961,8 @@
 
     boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
     {
-        isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
-        boolean result = true;
-        try
-        {
-            result = doFileAntiCompaction(files, ranges, target, fileList, null);
-        }
-        finally
-        {
-            isCompacting_.set(false);
-        }
-        return result;
-
+        return doFileAntiCompaction(files, ranges, target, fileList, null);
     }
 
     void forceCleanup()
@@ -1011,18 +978,10 @@
      */
     void doCleanupCompaction() throws IOException
     {
-        isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
-        try
-        {
-            for (String file : files)
-            {
-                doCleanup(file);
-            }
-        }
-        finally
+        for (String file : files)
         {
-            isCompacting_.set(false);
+            doCleanup(file);
         }
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=776720&r1=776719&r2=776720&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Wed May 20 14:58:39 2009
@@ -113,7 +113,14 @@
         public void run()
         {
             logger_.debug("Started  Major compaction for " + columnFamilyStore_.columnFamily_);
-            columnFamilyStore_.doMajorCompaction(skip_);
+            try
+            {
+                columnFamilyStore_.doMajorCompaction(skip_);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
             logger_.debug("Finished Major compaction for " + columnFamilyStore_.columnFamily_);
         }
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=776720&r1=776719&r2=776720&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Wed May 20 14:58:39 2009
@@ -203,28 +203,28 @@
      * This method deletes both the specified data file
      * and the associated index file
      *
-     * @param dataFile - data file associated with the SSTable
+     * @param dataFileName - data file associated with the SSTable
      */
-    public static void delete(String dataFile)
+    public static void delete(String dataFileName)
     {
         /* remove the cached index table from memory */
-        indexMetadataMap_.remove(dataFile);
+        indexMetadataMap_.remove(dataFileName);
         /* Delete the checksum file associated with this data file */
         try
         {
-            ChecksumManager.onFileDelete(dataFile);
+            ChecksumManager.onFileDelete(dataFileName);
         }
         catch (IOException ex)
         {
             logger_.info(LogUtil.throwableToString(ex));
         }
 
-        File file = new File(dataFile);
-        assert file.exists() : "attempted to delete non-existing file " + dataFile;
+        File file = new File(dataFileName);
+        assert file.exists() : "attempted to delete non-existing file " + dataFileName;
         /* delete the data file */
         if (!file.delete())
         {
-            logger_.error("Failed to delete " + file.getName());
+            logger_.error("Failed to delete " + dataFileName);
         }
     }