You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/02 04:01:04 UTC

svn commit: r810334 - in /incubator/cassandra/branches/cassandra-0.4: src/java/org/apache/cassandra/db/ test/unit/org/apache/cassandra/db/

Author: jbellis
Date: Wed Sep  2 02:01:03 2009
New Revision: 810334

URL: http://svn.apache.org/viewvc?rev=810334&view=rev
Log:
Update non-major compaction logic to use two threshold values: the minimum number of sstables to compact (4) and the maximum number of sstables to compact at once (32). Previously, the same threshold (4) was used for both. The new logic lets the system recover faster from a bulk load that generates sstables faster than compaction can combine them. Patch by Chris Goffinet and jbellis for CASSANDRA-407.

Modified:
    incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/MinorCompactionManager.java
    incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/CompactionsTest.java
    incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/OneCompactionTest.java
    incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
    incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/TableTest.java

Modified: incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=810334&r1=810333&r2=810334&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep  2 02:01:03 2009
@@ -638,7 +638,7 @@
         /* it's ok if compaction gets submitted multiple times while one is already in process.
            worst that happens is, compactor will count the sstable files and decide there are
            not enough to bother with. */
-        if (ssTableCount >= MinorCompactionManager.COMPACTION_THRESHOLD)
+        if (ssTableCount >= MinorCompactionManager.MINCOMPACTION_THRESHOLD)
         {
             if (logger_.isDebugEnabled())
               logger_.debug("Submitting " + columnFamily_ + " for compaction");
@@ -669,7 +669,7 @@
     /*
      * Group files of similar size into buckets.
      */
-    static Set<List<String>> getCompactionBuckets(List<String> files, long min)
+    static Set<List<String>> getCompactionBuckets(Iterable<String> files, long min)
     {
         Map<List<String>, Long> buckets = new ConcurrentHashMap<List<String>, Long>();
         for (String fname : files)
@@ -711,32 +711,17 @@
     /*
      * Break the files into buckets and then compact.
      */
-    int doCompaction(int threshold) throws IOException
+    int doCompaction(int minThreshold, int maxThreshold) throws IOException
     {
-        List<String> files = new ArrayList<String>(ssTables_.keySet());
         int filesCompacted = 0;
-        Set<List<String>> buckets = getCompactionBuckets(files, 50L * 1024L * 1024L);
-        for (List<String> fileList : buckets)
+        for (List<String> files : getCompactionBuckets(ssTables_.keySet(), 50L * 1024L * 1024L))
         {
-            Collections.sort(fileList, new FileNameComparator(FileNameComparator.Ascending));
-            if (fileList.size() < threshold)
+            if (files.size() < minThreshold)
             {
                 continue;
             }
-            // For each bucket if it has crossed the threshhold do the compaction
-            // In case of range  compaction merge the counting bloom filters also.
-            files.clear();
-            int count = 0;
-            for (String file : fileList)
-            {
-                files.add(file);
-                count++;
-                if (count == threshold)
-                {
-                    filesCompacted += doFileCompaction(files, BUFSIZE);
-                    break;
-                }
-            }
+            Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
+            filesCompacted += doFileCompaction(files.subList(0, Math.min(files.size(), maxThreshold)), BUFSIZE);
         }
         return filesCompacted;
     }

Modified: incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=810334&r1=810333&r2=810334&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/MinorCompactionManager.java Wed Sep  2 02:01:03 2009
@@ -21,11 +21,8 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -33,7 +30,7 @@
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
+
 import org.apache.log4j.Logger;
 
 class MinorCompactionManager
@@ -42,7 +39,8 @@
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = Logger.getLogger(MinorCompactionManager.class);
     private static final long intervalInMins_ = 5;
-    static final int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
+    static final int MINCOMPACTION_THRESHOLD = 4; // compact this many sstables min at a time
+    static final int MAXCOMPACTION_THRESHOLD = 32; // compact this many sstables max at a time
 
     public static MinorCompactionManager instance()
     {
@@ -155,16 +153,16 @@
 
     public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
     {
-        return submit(columnFamilyStore, COMPACTION_THRESHOLD);
+        return submit(columnFamilyStore, MINCOMPACTION_THRESHOLD, MAXCOMPACTION_THRESHOLD);
     }
 
-    Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int threshold)
+    Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int minThreshold, final int maxThreshold)
     {
         Callable<Integer> callable = new Callable<Integer>()
         {
             public Integer call() throws IOException
             {
-                return columnFamilyStore.doCompaction(threshold);
+                return columnFamilyStore.doCompaction(minThreshold, maxThreshold);
             }
         };
         return compactor_.submit(callable);

Modified: incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=810334&r1=810333&r2=810334&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/CompactionsTest.java Wed Sep  2 02:01:03 2009
@@ -23,7 +23,6 @@
 import java.util.concurrent.Future;
 import java.util.Set;
 import java.util.HashSet;
-import java.util.Arrays;
 
 import org.junit.Test;
 
@@ -62,7 +61,7 @@
         }
         if (store.getSSTables().size() > 1)
         {
-            store.doCompaction(store.getSSTables().size());
+            store.doCompaction(2, store.getSSTables().size());
         }
         assertEquals(table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size(), inserted.size());
     }

Modified: incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=810334&r1=810333&r2=810334&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/OneCompactionTest.java Wed Sep  2 02:01:03 2009
@@ -23,7 +23,6 @@
 import java.util.concurrent.Future;
 import java.util.Set;
 import java.util.HashSet;
-import java.util.Arrays;
 
 import org.junit.Test;
 
@@ -47,7 +46,7 @@
             store.forceBlockingFlush();
             assertEquals(inserted.size(), table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size());
         }
-        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2, 32);
         ft.get();
         assertEquals(1, store.getSSTables().size());
         assertEquals(table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size(), inserted.size());

Modified: incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java?rev=810334&r1=810333&r2=810334&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java Wed Sep  2 02:01:03 2009
@@ -57,7 +57,7 @@
         store.forceBlockingFlush();
         validateRemoveTwoSources();
 
-        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2, 32);
         ft.get();
         assertEquals(1, store.getSSTables().size());
         validateRemoveCompacted();
@@ -109,7 +109,7 @@
         store.forceBlockingFlush();
         validateRemoveWithNewData();
 
-        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2, 32);
         ft.get();
         assertEquals(1, store.getSSTables().size());
         validateRemoveWithNewData();

Modified: incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/TableTest.java?rev=810334&r1=810333&r2=810334&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/TableTest.java Wed Sep  2 02:01:03 2009
@@ -304,7 +304,7 @@
         // compact so we have a big row with more than the minimum index count
         if (cfStore.getSSTables().size() > 1)
         {
-            cfStore.doCompaction(cfStore.getSSTables().size());
+            cfStore.doCompaction(2, cfStore.getSSTables().size());
         }
         SSTableReader sstable = cfStore.getSSTables().iterator().next();
         long position = sstable.getPosition(key);