You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/02 04:02:31 UTC

svn commit: r810335 - in /incubator/cassandra/trunk: ./ interface/gen-java/org/apache/cassandra/service/ src/java/org/ src/java/org/apache/cassandra/db/ test/unit/org/ test/unit/org/apache/cassandra/db/

Author: jbellis
Date: Wed Sep  2 02:02:30 2009
New Revision: 810335

URL: http://svn.apache.org/viewvc?rev=810335&view=rev
Log:
merge from 0.4 branch

Modified:
    incubator/cassandra/trunk/   (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java   (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java   (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java   (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java   (props changed)
    incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java   (props changed)
    incubator/cassandra/trunk/src/java/org/   (props changed)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
    incubator/cassandra/trunk/test/unit/org/   (props changed)
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java

Propchange: incubator/cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  2 02:02:30 2009
@@ -1,2 +1,2 @@
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
-/incubator/cassandra/branches/cassandra-0.4:810145-810281
+/incubator/cassandra/branches/cassandra-0.4:810145-810334

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  2 02:02:30 2009
@@ -1,3 +1,3 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
-/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-810281
+/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-810334
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  2 02:02:30 2009
@@ -1,4 +1,3 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
-/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-810281
-/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java:749219-794428
+/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-810334
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/column_t.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  2 02:02:30 2009
@@ -1,3 +1,3 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
-/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-810281
+/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-810334
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  2 02:02:30 2009
@@ -1,3 +1,3 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
-/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-810281
+/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-810334
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:749219-768588

Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  2 02:02:30 2009
@@ -1,4 +1,3 @@
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
-/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-810281
-/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:749219-794428
+/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-810334
 /incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:749219-768588

Propchange: incubator/cassandra/trunk/src/java/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  2 02:02:30 2009
@@ -1,3 +1,3 @@
 /incubator/cassandra/branches/cassandra-0.3/src/java/org:774578-796573
-/incubator/cassandra/branches/cassandra-0.4/src/java/org:810145-810281
+/incubator/cassandra/branches/cassandra-0.4/src/java/org:810145-810334
 /incubator/cassandra/trunk/src/java/org:749219-769885

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=810335&r1=810334&r2=810335&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep  2 02:02:30 2009
@@ -175,16 +175,17 @@
         for (File file : sstableFiles)
         {
             String filename = file.getAbsolutePath();
+            SSTableReader sstable;
             try
             {
-                SSTableReader sstable = SSTableReader.open(filename);
-                ssTables_.put(filename, sstable);
+                sstable = SSTableReader.open(filename);
             }
             catch (IOException ex)
             {
-                logger_.error("Corrupt file " + filename, ex);
-                FileUtils.delete(filename);
+                logger_.error("Corrupt file " + filename + "; skipped", ex);
+                continue;
             }
+            ssTables_.put(filename, sstable);
         }
 
         // submit initial check-for-compaction request
@@ -637,7 +638,7 @@
         /* it's ok if compaction gets submitted multiple times while one is already in process.
            worst that happens is, compactor will count the sstable files and decide there are
            not enough to bother with. */
-        if (ssTableCount >= MinorCompactionManager.COMPACTION_THRESHOLD)
+        if (ssTableCount >= MinorCompactionManager.MINCOMPACTION_THRESHOLD)
         {
             if (logger_.isDebugEnabled())
               logger_.debug("Submitting " + columnFamily_ + " for compaction");
@@ -668,7 +669,7 @@
     /*
      * Group files of similar size into buckets.
      */
-    static Set<List<String>> getCompactionBuckets(List<String> files, long min)
+    static Set<List<String>> getCompactionBuckets(Iterable<String> files, long min)
     {
         Map<List<String>, Long> buckets = new ConcurrentHashMap<List<String>, Long>();
         for (String fname : files)
@@ -710,32 +711,17 @@
     /*
      * Break the files into buckets and then compact.
      */
-    int doCompaction(int threshold) throws IOException
+    int doCompaction(int minThreshold, int maxThreshold) throws IOException
     {
-        List<String> files = new ArrayList<String>(ssTables_.keySet());
         int filesCompacted = 0;
-        Set<List<String>> buckets = getCompactionBuckets(files, 50L * 1024L * 1024L);
-        for (List<String> fileList : buckets)
+        for (List<String> files : getCompactionBuckets(ssTables_.keySet(), 50L * 1024L * 1024L))
         {
-            Collections.sort(fileList, new FileNameComparator(FileNameComparator.Ascending));
-            if (fileList.size() < threshold)
+            if (files.size() < minThreshold)
             {
                 continue;
             }
-            // For each bucket if it has crossed the threshhold do the compaction
-            // In case of range  compaction merge the counting bloom filters also.
-            files.clear();
-            int count = 0;
-            for (String file : fileList)
-            {
-                files.add(file);
-                count++;
-                if (count == threshold)
-                {
-                    filesCompacted += doFileCompaction(files, BUFSIZE);
-                    break;
-                }
-            }
+            Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
+            filesCompacted += doFileCompaction(files.subList(0, Math.min(files.size(), maxThreshold)), BUFSIZE);
         }
         return filesCompacted;
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=810335&r1=810334&r2=810335&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Wed Sep  2 02:02:30 2009
@@ -21,11 +21,8 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -33,7 +30,7 @@
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
+
 import org.apache.log4j.Logger;
 
 class MinorCompactionManager
@@ -42,7 +39,8 @@
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = Logger.getLogger(MinorCompactionManager.class);
     private static final long intervalInMins_ = 5;
-    static final int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
+    static final int MINCOMPACTION_THRESHOLD = 4; // compact this many sstables min at a time
+    static final int MAXCOMPACTION_THRESHOLD = 32; // compact this many sstables max at a time
 
     public static MinorCompactionManager instance()
     {
@@ -155,16 +153,16 @@
 
     public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
     {
-        return submit(columnFamilyStore, COMPACTION_THRESHOLD);
+        return submit(columnFamilyStore, MINCOMPACTION_THRESHOLD, MAXCOMPACTION_THRESHOLD);
     }
 
-    Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int threshold)
+    Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int minThreshold, final int maxThreshold)
     {
         Callable<Integer> callable = new Callable<Integer>()
         {
             public Integer call() throws IOException
             {
-                return columnFamilyStore.doCompaction(threshold);
+                return columnFamilyStore.doCompaction(minThreshold, maxThreshold);
             }
         };
         return compactor_.submit(callable);

Propchange: incubator/cassandra/trunk/test/unit/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  2 02:02:30 2009
@@ -1,3 +1,3 @@
 /incubator/cassandra/branches/cassandra-0.3/test/unit/org:774578-796573
-/incubator/cassandra/branches/cassandra-0.4/test/unit/org:810145-810281
+/incubator/cassandra/branches/cassandra-0.4/test/unit/org:810145-810334
 /incubator/cassandra/trunk/test/unit/org:749219-768583

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=810335&r1=810334&r2=810335&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Wed Sep  2 02:02:30 2009
@@ -23,7 +23,6 @@
 import java.util.concurrent.Future;
 import java.util.Set;
 import java.util.HashSet;
-import java.util.Arrays;
 
 import org.junit.Test;
 
@@ -62,7 +61,7 @@
         }
         if (store.getSSTables().size() > 1)
         {
-            store.doCompaction(store.getSSTables().size());
+            store.doCompaction(2, store.getSSTables().size());
         }
         assertEquals(table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size(), inserted.size());
     }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=810335&r1=810334&r2=810335&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java Wed Sep  2 02:02:30 2009
@@ -23,7 +23,6 @@
 import java.util.concurrent.Future;
 import java.util.Set;
 import java.util.HashSet;
-import java.util.Arrays;
 
 import org.junit.Test;
 
@@ -47,7 +46,7 @@
             store.forceBlockingFlush();
             assertEquals(inserted.size(), table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size());
         }
-        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2, 32);
         ft.get();
         assertEquals(1, store.getSSTables().size());
         assertEquals(table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size(), inserted.size());

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java?rev=810335&r1=810334&r2=810335&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java Wed Sep  2 02:02:30 2009
@@ -57,7 +57,7 @@
         store.forceBlockingFlush();
         validateRemoveTwoSources();
 
-        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2, 32);
         ft.get();
         assertEquals(1, store.getSSTables().size());
         validateRemoveCompacted();
@@ -109,7 +109,7 @@
         store.forceBlockingFlush();
         validateRemoveWithNewData();
 
-        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+        Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2, 32);
         ft.get();
         assertEquals(1, store.getSSTables().size());
         validateRemoveWithNewData();

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=810335&r1=810334&r2=810335&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Wed Sep  2 02:02:30 2009
@@ -304,7 +304,7 @@
         // compact so we have a big row with more than the minimum index count
         if (cfStore.getSSTables().size() > 1)
         {
-            cfStore.doCompaction(cfStore.getSSTables().size());
+            cfStore.doCompaction(2, cfStore.getSSTables().size());
         }
         SSTableReader sstable = cfStore.getSSTables().iterator().next();
         long position = sstable.getPosition(key);