You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/20 16:40:27 UTC

svn commit: r776714 - in /incubator/cassandra/branches/cassandra-0.3: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/db/

Author: jbellis
Date: Wed May 20 14:40:27 2009
New Revision: 776714

URL: http://svn.apache.org/viewvc?rev=776714&view=rev
Log:
OneCompactionTest is failing occasionally because 500 keys per CFS is actually triggering an automatic compaction (since test flush threshold is only 20) and we were doing a non-threadsafe doCompaction for convenience: the failure occurs when our manual compaction begins mid-run of an automatic one, and the automatic deletes the original sstable file first.  Fix by (a) dropping the number of keys so that OneCompactionTest lives up to its name (more are tested in "CompactionsTest") and (b) making the compactions call threadsafe by refactoring to allow a threshold parameter to MCM.submit.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-184

Modified:
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/MinorCompactionManager.java
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/branches/cassandra-0.3/test/unit/org/apache/cassandra/db/OneCompactionTest.java

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=776714&r1=776713&r2=776714&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed May 20 14:40:27 2009
@@ -56,7 +56,6 @@
 {
     private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
 
-    private static int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
     private static final int BUFSIZE = 128 * 1024 * 1024;
     private static final int COMPACTION_MEMORY_THRESHOLD = 1 << 30;
 
@@ -770,8 +769,8 @@
             lock_.writeLock().unlock();
         }
 
-        if ((ssTableSize >= COMPACTION_THRESHOLD && !isCompacting_.get())
-            || (isCompacting_.get() && ssTableSize % COMPACTION_THRESHOLD == 0))
+        if ((ssTableSize >= MinorCompactionManager.COMPACTION_THRESHOLD && !isCompacting_.get())
+            || (isCompacting_.get() && ssTableSize % MinorCompactionManager.COMPACTION_THRESHOLD == 0))
         {
             logger_.debug("Submitting for  compaction ...");
             MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
@@ -860,11 +859,6 @@
         return buckets.keySet();
     }
 
-    public int doCompaction() throws IOException
-    {
-        return doCompaction(COMPACTION_THRESHOLD);
-    }
-
     /*
      * Break the files into buckets and then compact.
      */
@@ -1294,6 +1288,7 @@
     */
     private int doFileCompaction(List<String> files, int minBufferSize) throws IOException
     {
+        logger_.info("Compacting [" + StringUtils.join(files, ",") + "]");
         String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(getExpectedCompactedFileSize(files));
         // If the compaction file path is null that means we have no space left for this compaction.
         // try again w/o the largest one.
@@ -1314,6 +1309,7 @@
 
         if (pq.isEmpty())
         {
+            logger_.warn("Nothing to compact (all files empty or corrupt)");
             // TODO clean out bad files, if any
             return 0;
         }

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=776714&r1=776713&r2=776714&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/MinorCompactionManager.java Wed May 20 14:40:27 2009
@@ -46,7 +46,8 @@
     private static MinorCompactionManager instance_;
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = Logger.getLogger(MinorCompactionManager.class);
-    final static long intervalInMins_ = 5;
+    private static final long intervalInMins_ = 5;
+    static final int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
 
     public static MinorCompactionManager instance()
     {
@@ -66,33 +67,6 @@
         return instance_;
     }
 
-    class FileCompactor implements Callable<Integer>
-    {
-        private ColumnFamilyStore columnFamilyStore_;
-
-        FileCompactor(ColumnFamilyStore columnFamilyStore)
-        {
-            columnFamilyStore_ = columnFamilyStore;
-        }
-
-        public Integer call()
-        {
-            logger_.debug("Started compaction ..." + columnFamilyStore_.columnFamily_);
-            try
-            {
-                return columnFamilyStore_.doCompaction();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            finally
-            {
-                logger_.debug("Finished compaction ..." + columnFamilyStore_.columnFamily_);
-            }
-        }
-    }
-
     class FileCompactor2 implements Callable<Boolean>
     {
         private ColumnFamilyStore columnFamilyStore_;
@@ -138,9 +112,9 @@
 
         public void run()
         {
-            logger_.debug("Started  Major compaction ..."+columnFamilyStore_.columnFamily_);
+            logger_.debug("Started  Major compaction for " + columnFamilyStore_.columnFamily_);
             columnFamilyStore_.doMajorCompaction(skip_);
-            logger_.debug("Finished Major compaction ..."+columnFamilyStore_.columnFamily_);
+            logger_.debug("Finished Major compaction for " + columnFamilyStore_.columnFamily_);
         }
     }
 
@@ -183,22 +157,41 @@
 
     public void submitPeriodicCompaction(final ColumnFamilyStore columnFamilyStore)
     {
-        Runnable runnable = new Runnable() // having to wrap Callable in Runnable is retarded but that's what the API insists on.
+        Runnable runnable = new Runnable()
         {
             public void run()
             {
-                new FileCompactor(columnFamilyStore).call();
+                try
+                {
+                    columnFamilyStore.doCompaction(COMPACTION_THRESHOLD);
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
             }
         };
     	compactor_.scheduleWithFixedDelay(runnable, MinorCompactionManager.intervalInMins_,
     			MinorCompactionManager.intervalInMins_, TimeUnit.MINUTES);       
     }
 
-    public Future<Integer> submit(ColumnFamilyStore columnFamilyStore)
+    public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
     {
-        return compactor_.submit(new FileCompactor(columnFamilyStore));
+        return submit(columnFamilyStore, COMPACTION_THRESHOLD);
     }
-    
+
+    Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int threshold)
+    {
+        Callable<Integer> callable = new Callable<Integer>()
+        {
+            public Integer call() throws IOException
+            {
+                return columnFamilyStore.doCompaction(threshold);
+            }
+        };
+        return compactor_.submit(callable);
+    }
+
     public void submitCleanup(ColumnFamilyStore columnFamilyStore)
     {
         compactor_.submit(new CleanupCompactor(columnFamilyStore));

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java?rev=776714&r1=776713&r2=776714&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java Wed May 20 14:40:27 2009
@@ -220,7 +220,7 @@
         }
 
         File file = new File(dataFile);
-        assert file.exists();
+        assert file.exists() : "attempted to delete non-existing file " + dataFile;
         /* delete the data file */
         if (!file.delete())
         {

Modified: incubator/cassandra/branches/cassandra-0.3/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=776714&r1=776713&r2=776714&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/test/unit/org/apache/cassandra/db/OneCompactionTest.java Wed May 20 14:40:27 2009
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.Set;
 import java.util.HashSet;
 
@@ -44,7 +45,9 @@
             store.forceBlockingFlush();
             assertEquals(table.getKeyRange("", "", 10000).size(), inserted.size());
         }
-        store.doCompaction(2);
+        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+        ft.get();
+        assertEquals(1, store.getSSTableFilenames().size());
         assertEquals(table.getKeyRange("", "", 10000).size(), inserted.size());
     }
 
@@ -57,6 +60,6 @@
     @Test
     public void testCompaction2() throws IOException, ExecutionException, InterruptedException
     {
-        testCompaction("Standard2", 500);
+        testCompaction("Standard2", 5);
     }
 }