Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/11 00:13:55 UTC

svn commit: r1134460 [2/2] - in /cassandra/trunk: ./ interface/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/avro/ src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apa...

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Fri Jun 10 22:13:54 2011
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -34,7 +33,6 @@ import javax.management.ObjectName;
 import org.apache.commons.collections.PredicateUtils;
 import org.apache.commons.collections.iterators.CollatingIterator;
 import org.apache.commons.collections.iterators.FilterIterator;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +49,6 @@ import org.apache.cassandra.service.Anti
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
  * A singleton which manages a private executor of ongoing compactions. A readwrite lock
@@ -85,7 +82,6 @@ public class CompactionManager implement
     }
 
     private CompactionExecutor executor = new CompactionExecutor();
-    private Map<ColumnFamilyStore, Integer> estimatedCompactions = new NonBlockingHashMap<ColumnFamilyStore, Integer>();
 
     /**
      * @return A lock, for which acquisition means no compactions can run.
@@ -111,37 +107,20 @@ public class CompactionManager implement
                 {
                     if (cfs.isInvalid())
                         return 0;
-                    Integer minThreshold = cfs.getMinimumCompactionThreshold();
-                    Integer maxThreshold = cfs.getMaximumCompactionThreshold();
-    
-                    if (minThreshold == 0 || maxThreshold == 0)
-                    {
-                        logger.debug("Compaction is currently disabled.");
-                        return 0;
-                    }
-                    logger.debug("Checking to see if compaction of " + cfs.columnFamily + " would be useful");
-                    Set<List<SSTableReader>> buckets = getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L);
-                    updateEstimateFor(cfs, buckets);
-                    int gcBefore = getDefaultGcBefore(cfs);
-                    
-                    for (List<SSTableReader> sstables : buckets)
+
+                    AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+                    for (AbstractCompactionTask task : strategy.getBackgroundTasks(getDefaultGcBefore(cfs)))
                     {
-                        if (sstables.size() < minThreshold)
-                            continue;
-                        // if we have too many to compact all at once, compact older ones first -- this avoids
-                        // re-compacting files we just created.
-                        Collections.sort(sstables);
-                        Collection<SSTableReader> tocompact = cfs.getDataTracker().markCompacting(sstables, minThreshold, maxThreshold);
-                        if (tocompact == null)
-                            // enough threads are busy in this bucket
+                        if (!cfs.getDataTracker().markCompacting(task))
                             continue;
+
                         try
                         {
-                            return doCompaction(cfs, tocompact, gcBefore);
+                            task.execute(executor);
                         }
                         finally
                         {
-                            cfs.getDataTracker().unmarkCompacting(tocompact);
+                            cfs.getDataTracker().unmarkCompacting(task);
                         }
                     }
                 }
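
The hunk above is the core of the refactoring: choosing what to compact moves out of CompactionManager and behind the per-CFS compaction strategy. Read as applied code rather than as a diff, the submission pattern is roughly (a sketch using only names visible in this commit):

    AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
    for (AbstractCompactionTask task : strategy.getBackgroundTasks(getDefaultGcBefore(cfs)))
    {
        // skip tasks whose sstables another thread is already compacting
        if (!cfs.getDataTracker().markCompacting(task))
            continue;
        try
        {
            task.execute(executor); // the executor doubles as the stats collector, see below
        }
        finally
        {
            cfs.getDataTracker().unmarkCompacting(task);
        }
    }
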
@@ -155,29 +134,6 @@ public class CompactionManager implement
         return executor.submit(callable);
     }
 
-    private void updateEstimateFor(ColumnFamilyStore cfs, Set<List<SSTableReader>> buckets)
-    {
-        Integer minThreshold = cfs.getMinimumCompactionThreshold();
-        Integer maxThreshold = cfs.getMaximumCompactionThreshold();
-
-        if (minThreshold > 0 && maxThreshold > 0)
-        {
-            int n = 0;
-            for (List<SSTableReader> sstables : buckets)
-            {
-                if (sstables.size() >= minThreshold)
-                {
-                    n += Math.ceil((double)sstables.size() / maxThreshold);
-                }
-            }
-            estimatedCompactions.put(cfs, n);
-        }
-        else
-        {
-            logger.debug("Compaction is currently disabled.");
-        }
-    }
-
     public void performCleanup(final ColumnFamilyStore cfStore, final NodeId.OneShotRenewer renewer) throws InterruptedException, ExecutionException
     {
         Callable<Object> runnable = new Callable<Object>()
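
The estimate bookkeeping deleted here is not lost: it reappears below as SizeTieredCompactionStrategy.updateEstimatedCompactionsByTasks with the same arithmetic, one estimated compaction per max-threshold-sized slice of each oversized bucket. As a sketch, using the threshold names from the removed code:

    int n = 0;
    for (List<SSTableReader> bucket : buckets)
        if (bucket.size() >= minThreshold)
            n += Math.ceil((double) bucket.size() / maxThreshold); // e.g. 12 sstables, max 4 -> 3 tasks
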
@@ -273,10 +229,10 @@ public class CompactionManager implement
 
     public void performMajor(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
     {
-        submitMajor(cfStore, 0, getDefaultGcBefore(cfStore)).get();
+        submitMajor(cfStore, getDefaultGcBefore(cfStore)).get();
     }
 
-    public Future<Object> submitMajor(final ColumnFamilyStore cfStore, final long skip, final int gcBefore)
+    public Future<Object> submitMajor(final ColumnFamilyStore cfStore, final int gcBefore)
     {
         Callable<Object> callable = new Callable<Object>()
         {
@@ -288,45 +244,30 @@ public class CompactionManager implement
                 {
                     if (cfStore.isInvalid())
                         return this;
-                    Collection<SSTableReader> sstables;
-                    if (skip > 0)
+                    AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
+                    for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
                     {
-                        sstables = new ArrayList<SSTableReader>();
-                        for (SSTableReader sstable : cfStore.getSSTables())
+                        if (!cfStore.getDataTracker().markCompacting(task, 0, Integer.MAX_VALUE))
+                            return this;
+                        try
                         {
-                            if (sstable.length() < skip * 1024L * 1024L * 1024L)
+                            // downgrade the lock acquisition
+                            compactionLock.readLock().lock();
+                            compactionLock.writeLock().unlock();
+                            try
                             {
-                                sstables.add(sstable);
+                                return task.execute(executor);
+                            }
+                            finally
+                            {
+                                compactionLock.readLock().unlock();
                             }
-                        }
-                    }
-                    else
-                    {
-                        sstables = cfStore.getSSTables();
-                    }
-
-                    Collection<SSTableReader> tocompact = cfStore.getDataTracker().markCompacting(sstables, 0, Integer.MAX_VALUE);
-                    if (tocompact == null || tocompact.isEmpty())
-                        return this;
-                    try
-                    {
-                        // downgrade the lock acquisition
-                        compactionLock.readLock().lock();
-                        compactionLock.writeLock().unlock();
-                        try
-                        {
-                            doCompaction(cfStore, tocompact, gcBefore);
                         }
                         finally
                         {
-                            compactionLock.readLock().unlock();
+                            cfStore.getDataTracker().unmarkCompacting(task);
                         }
                     }
-                    finally
-                    {
-                        cfStore.getDataTracker().unmarkCompacting(tocompact);
-                    }
-                    return this;
                 }
                 finally
                 {
@@ -334,6 +275,7 @@ public class CompactionManager implement
                     if (compactionLock.writeLock().isHeldByCurrentThread())
                         compactionLock.writeLock().unlock();
                 }
+                return this;
             }
         };
         return executor.submit(callable);
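
The "downgrade the lock acquisition" block above is the standard ReentrantReadWriteLock idiom: take the read lock while still holding the write lock, then release the write lock, so no other writer can slip in between the two. A self-contained sketch of the idiom (class and method names here are illustrative, not from this commit):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class LockDowngradeSketch
    {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        void runExclusiveThenShared(Runnable exclusive, Runnable shared)
        {
            lock.writeLock().lock();
            try
            {
                exclusive.run();
                lock.readLock().lock(); // acquire read lock BEFORE releasing write lock
            }
            finally
            {
                lock.writeLock().unlock(); // downgraded: still holding the read lock
            }
            try
            {
                shared.run();
            }
            finally
            {
                lock.readLock().unlock();
            }
        }
    }
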
@@ -408,11 +350,12 @@ public class CompactionManager implement
                     // attempt to schedule the set
                     else if ((sstables = cfs.getDataTracker().markCompacting(sstables, 1, Integer.MAX_VALUE)) != null)
                     {
-                        String location = cfs.table.getDataFileLocation(1);
                         // success: perform the compaction
                         try
                         {
-                            doCompactionWithoutSizeEstimation(cfs, sstables, gcBefore, location);
+                            AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+                            AbstractCompactionTask task = strategy.getUserDefinedTask(sstables, gcBefore);
+                            task.execute(executor);
                         }
                         finally
                         {
@@ -484,141 +427,6 @@ public class CompactionManager implement
         }
     }
 
-    int doCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore) throws IOException
-    {
-        if (sstables.size() < 2)
-        {
-            logger.info("Nothing to compact in " + cfs.getColumnFamilyName() + "; use forceUserDefinedCompaction if you wish to force compaction of single sstables (e.g. for tombstone collection)");
-            return 0;
-        }
-
-        Table table = cfs.table;
-
-        // If the compaction file path is null that means we have no space left for this compaction.
-        // try again w/o the largest one.
-        Set<SSTableReader> smallerSSTables = new HashSet<SSTableReader>(sstables);
-        while (smallerSSTables.size() > 1)
-        {
-            String compactionFileLocation = table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables));
-            if (compactionFileLocation != null)
-                return doCompactionWithoutSizeEstimation(cfs, smallerSSTables, gcBefore, compactionFileLocation);
-
-            logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", "));
-            smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables));
-        }
-
-        logger.error("insufficient space to compact even the two smallest files, aborting");
-        return 0;
-    }
-
-    /**
-     * For internal use and testing only.  The rest of the system should go through the submit* methods,
-     * which are properly serialized.
-     */
-    int doCompactionWithoutSizeEstimation(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, String compactionFileLocation) throws IOException
-    {
-        // The collection of sstables passed may be empty (but not null); even if
-        // it is not empty, it may compact down to nothing if all rows are deleted.
-        assert sstables != null;
-
-        Table table = cfs.table;
-        if (DatabaseDescriptor.isSnapshotBeforeCompaction())
-            table.snapshot(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);
-
-        // sanity check: all sstables must belong to the same cfs
-        for (SSTableReader sstable : sstables)
-            assert sstable.descriptor.cfname.equals(cfs.columnFamily);
-
-        // compaction won't normally compact a single sstable, so if that's what we're doing
-        // it must have been requested manually by the user, which probably means he wants to force
-        // tombstone purge, which won't happen unless we force deserializing the rows.
-        boolean forceDeserialize = sstables.size() == 1;
-        CompactionController controller = new CompactionController(cfs, sstables, gcBefore, forceDeserialize);
-        // new sstables from flush can be added during a compaction, but only the compaction can remove them,
-        // so in our single-threaded compaction world this is a valid way of determining if we're compacting
-        // all the sstables (that existed when we started)
-        CompactionType type = controller.isMajor()
-                            ? CompactionType.MAJOR
-                            : CompactionType.MINOR;
-        logger.info("Compacting {}: {}", type, sstables);
-
-        long startTime = System.currentTimeMillis();
-        long totalkeysWritten = 0;
-
-        // TODO the int cast here is potentially buggy
-        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables));
-        if (logger.isDebugEnabled())
-          logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
-
-        SSTableWriter writer;
-        CompactionIterator ci = new CompactionIterator(type, sstables, controller); // retain a handle so we can call close()
-        Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
-        Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
-
-        executor.beginCompaction(ci);
-        try
-        {
-            if (!nni.hasNext())
-            {
-                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                // we need to sync it (via closeAndOpen) first, so there is no period during which
-                // a crash could cause data loss.
-                cfs.markCompacted(sstables);
-                return 0;
-            }
-
-            writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, sstables);
-            while (nni.hasNext())
-            {
-                AbstractCompactedRow row = nni.next();
-                long position = writer.append(row);
-                totalkeysWritten++;
-
-                if (DatabaseDescriptor.getPreheatKeyCache())
-                {
-                    for (SSTableReader sstable : sstables)
-                    {
-                        if (sstable.getCachedPosition(row.key) != null)
-                        {
-                            cachedKeys.put(row.key, position);
-                            break;
-                        }
-                    }
-                }
-            }
-        }
-        finally
-        {
-            ci.close();
-            executor.finishCompaction(ci);
-        }
-
-        SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(sstables));
-        cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
-        for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
-            ssTable.cacheKey(entry.getKey(), entry.getValue());
-        submitMinorIfNeeded(cfs);
-
-        long dTime = System.currentTimeMillis() - startTime;
-        long startsize = SSTable.getTotalBytes(sstables);
-        long endsize = ssTable.length();
-        double ratio = (double)endsize / (double)startsize;
-        logger.info(String.format("Compacted to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.",
-                                  writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
-        return sstables.size();
-    }
-
-    private static long getMaxDataAge(Collection<SSTableReader> sstables)
-    {
-        long max = 0;
-        for (SSTableReader sstable : sstables)
-        {
-            if (sstable.maxDataAge > max)
-                max = sstable.maxDataAge;
-        }
-        return max;
-    }
-
     /**
      * Deserialize everything in the CFS and re-serialize w/ the newest version.  Also attempts to recover
      * from bogus row keys / sizes using data from the index, and skips rows with garbage columns that resulted
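
The doCompaction body removed above survives nearly verbatim in CompactionTask.execute() (added below). In particular the disk-space fallback keeps the same shape: while no data directory can hold the expected compacted size, drop the largest sstable from the set and retry, giving up only when fewer than two files remain. A sketch using the accessors from the removed code:

    Set<SSTableReader> smaller = new HashSet<SSTableReader>(sstables);
    String location = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smaller));
    while (location == null && smaller.size() > 1)
    {
        smaller.remove(cfs.getMaxSizeFile(smaller)); // retry without the largest file
        location = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smaller));
    }
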
@@ -975,70 +783,6 @@ public class CompactionManager implement
         }
     }
 
-    /*
-    * Group files of similar size into buckets.
-    */
-    static <T> Set<List<T>> getBuckets(Collection<Pair<T, Long>> files, long min)
-    {
-        // Sort the list in order to get deterministic results during the grouping below
-        List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(files);
-        Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>()
-        {
-            public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
-            {
-                return p1.right.compareTo(p2.right);
-            }
-        });
-
-        Map<List<T>, Long> buckets = new HashMap<List<T>, Long>();
-
-        for (Pair<T, Long> pair: sortedFiles)
-        {
-            long size = pair.right;
-
-            boolean bFound = false;
-            // look for a bucket containing similar-sized files:
-            // group in the same bucket if it's w/in 50% of the average for this bucket,
-            // or this file and the bucket are all considered "small" (less than `min`)
-            for (Entry<List<T>, Long> entry : buckets.entrySet())
-            {
-                List<T> bucket = entry.getKey();
-                long averageSize = entry.getValue();
-                if ((size > (averageSize / 2) && size < (3 * averageSize) / 2)
-                    || (size < min && averageSize < min))
-                {
-                    // remove and re-add because adding changes the hash
-                    buckets.remove(bucket);
-                    long totalSize = bucket.size() * averageSize;
-                    averageSize = (totalSize + size) / (bucket.size() + 1);
-                    bucket.add(pair.left);
-                    buckets.put(bucket, averageSize);
-                    bFound = true;
-                    break;
-                }
-            }
-            // no similar bucket found; put it in a new one
-            if (!bFound)
-            {
-                ArrayList<T> bucket = new ArrayList<T>();
-                bucket.add(pair.left);
-                buckets.put(bucket, size);
-            }
-        }
-
-        return buckets.keySet();
-    }
-
-    private static Collection<Pair<SSTableReader, Long>> convertSSTablesToPairs(Collection<SSTableReader> collection)
-    {
-        Collection<Pair<SSTableReader, Long>> tablePairs = new ArrayList<Pair<SSTableReader, Long>>();
-        for(SSTableReader table: collection)
-        {
-            tablePairs.add(new Pair<SSTableReader, Long>(table, table.length()));
-        }
-        return tablePairs;
-    }
-    
     /**
      * Is not scheduled, because it is performing disjoint work from sstable compaction.
      */
@@ -1169,7 +913,7 @@ public class CompactionManager implement
         return executor.submit(runnable);
     }
 
-    private static int getDefaultGcBefore(ColumnFamilyStore cfs)
+    static int getDefaultGcBefore(ColumnFamilyStore cfs)
     {
         return cfs.isIndex()
                ? Integer.MAX_VALUE
@@ -1201,7 +945,7 @@ public class CompactionManager implement
         return executor.getActiveCount();
     }
 
-    private static class CompactionExecutor extends DebuggableThreadPoolExecutor
+    private static class CompactionExecutor extends DebuggableThreadPoolExecutor implements CompactionExecutorStatsCollector
     {
         // a synchronized identity set of running tasks to their compaction info
         private final Set<CompactionInfo.Holder> compactions;
@@ -1222,12 +966,12 @@ public class CompactionManager implement
             return Math.max(1, DatabaseDescriptor.getConcurrentCompactors());
         }
 
-        void beginCompaction(CompactionInfo.Holder ci)
+        public void beginCompaction(CompactionInfo.Holder ci)
         {
             compactions.add(ci);
         }
 
-        void finishCompaction(CompactionInfo.Holder ci)
+        public void finishCompaction(CompactionInfo.Holder ci)
         {
             compactions.remove(ci);
         }
@@ -1238,6 +982,12 @@ public class CompactionManager implement
         }
     }
 
+    public interface CompactionExecutorStatsCollector
+    {
+        void beginCompaction(CompactionInfo.Holder ci);
+        void finishCompaction(CompactionInfo.Holder ci);
+    }
+
     public List<CompactionInfo> getCompactions()
     {
         List<CompactionInfo> out = new ArrayList<CompactionInfo>();
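
Promoting beginCompaction/finishCompaction to public and extracting the CompactionExecutorStatsCollector interface is what lets CompactionTask.execute() take either the live executor or null (the updated tests below pass null). Any other collector only needs the two callbacks; a hypothetical no-op variant, not part of this commit, would look like:

    class NoOpStatsCollector implements CompactionManager.CompactionExecutorStatsCollector
    {
        public void beginCompaction(CompactionInfo.Holder ci) { /* not tracked */ }
        public void finishCompaction(CompactionInfo.Holder ci) { /* not tracked */ }
    }
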
@@ -1257,8 +1007,13 @@ public class CompactionManager implement
     public int getPendingTasks()
     {
         int n = 0;
-        for (Integer i : estimatedCompactions.values())
-            n += i;
+        for (String tableName : DatabaseDescriptor.getTables())
+        {
+            for (ColumnFamilyStore cfs : Table.open(tableName).getColumnFamilyStores())
+            {
+                n += cfs.getCompactionStrategy().getEstimatedRemainingTasks();
+            }
+        }
         return (int) (executor.getTaskCount() - executor.getCompletedTaskCount()) + n;
     }
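
With the estimatedCompactions map gone, getPendingTasks derives its answer on demand instead of from cached state; the two terms of the sum are worth separating (a sketch of the same computation as above, with explanatory names):

    // work already queued on the executor but not yet finished...
    int queued = (int) (executor.getTaskCount() - executor.getCompletedTaskCount());
    // ...plus each strategy's estimate of compactions not yet submitted
    int estimated = 0;
    for (String tableName : DatabaseDescriptor.getTables())
        for (ColumnFamilyStore cfs : Table.open(tableName).getColumnFamilyStores())
            estimated += cfs.getCompactionStrategy().getEstimatedRemainingTasks();
    return queued + estimated;
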
 

Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1134460&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Fri Jun 10 22:13:54 2011
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.collections.PredicateUtils;
+import org.apache.commons.collections.iterators.FilterIterator;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+
+public class CompactionTask extends AbstractCompactionTask
+{
+    private static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
+    protected String compactionFileLocation;
+    protected final int gcBefore;
+    protected boolean isUserDefined;
+
+    public CompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, final int gcBefore)
+    {
+        super(cfs, sstables);
+        compactionFileLocation = null;
+        this.gcBefore = gcBefore;
+        this.isUserDefined = false;
+    }
+
+    /**
+     * For internal use and testing only.  The rest of the system should go through the submit* methods,
+     * which are properly serialized.
+     */
+    public int execute(CompactionExecutorStatsCollector collector) throws IOException
+    {
+        if (!isUserDefined)
+        {
+            if (sstables.size() < 2)
+            {
+                logger.info("Nothing to compact in " + cfs.getColumnFamilyName() + "." +
+                            "Use forceUserDefinedCompaction if you wish to force compaction of single sstables " +
+                            "(e.g. for tombstone collection)");
+                return 0;
+            }
+
+            if (compactionFileLocation == null)
+                compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(sstables));
+
+            // If the compaction file path is null that means we have no space left for this compaction.
+            // Try again w/o the largest one.
+            if (compactionFileLocation == null)
+            {
+                Set<SSTableReader> smallerSSTables = new HashSet<SSTableReader>(sstables);
+                while (compactionFileLocation == null && smallerSSTables.size() > 1)
+                {
+                    logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", "));
+                    smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables));
+                    compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables));
+                }
+            }
+
+            if (compactionFileLocation == null)
+            {
+                logger.warn("insufficient space to compact even the two smallest files, aborting");
+                return 0;
+            }
+        }
+
+        // The collection of sstables passed may be empty (but not null); even if
+        // it is not empty, it may compact down to nothing if all rows are deleted.
+        assert sstables != null;
+
+        if (DatabaseDescriptor.isSnapshotBeforeCompaction())
+            cfs.table.snapshot(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);
+
+        // sanity check: all sstables must belong to the same cfs
+        for (SSTableReader sstable : sstables)
+            assert sstable.descriptor.cfname.equals(cfs.columnFamily);
+
+        CompactionController controller = new CompactionController(cfs, sstables, gcBefore, isUserDefined);
+        // new sstables from flush can be added during a compaction, but only the compaction can remove them,
+        // so in our single-threaded compaction world this is a valid way of determining if we're compacting
+        // all the sstables (that existed when we started)
+        CompactionType type = controller.isMajor()
+                            ? CompactionType.MAJOR
+                            : CompactionType.MINOR;
+        logger.info("Compacting {}: {}", type, sstables);
+
+        long startTime = System.currentTimeMillis();
+        long totalkeysWritten = 0;
+
+        // TODO the int cast here is potentially buggy
+        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables));
+        if (logger.isDebugEnabled())
+            logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+
+        SSTableWriter writer;
+        CompactionIterator ci = new CompactionIterator(type, sstables, controller); // retain a handle so we can call close()
+        Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
+        Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
+
+        if (collector != null)
+            collector.beginCompaction(ci);
+        try
+        {
+            if (!nni.hasNext())
+            {
+                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                // we need to sync it (via closeAndOpen) first, so there is no period during which
+                // a crash could cause data loss.
+                cfs.markCompacted(sstables);
+                return 0;
+            }
+
+            writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, sstables);
+            while (nni.hasNext())
+            {
+                AbstractCompactedRow row = nni.next();
+                long position = writer.append(row);
+                totalkeysWritten++;
+
+                if (DatabaseDescriptor.getPreheatKeyCache())
+                {
+                    for (SSTableReader sstable : sstables)
+                    {
+                        if (sstable.getCachedPosition(row.key) != null)
+                        {
+                            cachedKeys.put(row.key, position);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        finally
+        {
+            ci.close();
+            if (collector != null)
+                collector.finishCompaction(ci);
+        }
+
+        SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(sstables));
+        cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
+        for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
+            ssTable.cacheKey(entry.getKey(), entry.getValue());
+        CompactionManager.instance.submitMinorIfNeeded(cfs);
+
+        long dTime = System.currentTimeMillis() - startTime;
+        long startsize = SSTable.getTotalBytes(sstables);
+        long endsize = ssTable.length();
+        double ratio = (double)endsize / (double)startsize;
+        logger.info(String.format("Compacted to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.",
+                writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+        return sstables.size();
+    }
+
+    public static long getMaxDataAge(Collection<SSTableReader> sstables)
+    {
+        long max = 0;
+        for (SSTableReader sstable : sstables)
+        {
+            if (sstable.maxDataAge > max)
+                max = sstable.maxDataAge;
+        }
+        return max;
+    }
+
+    public CompactionTask compactionFileLocation(String compactionFileLocation)
+    {
+        this.compactionFileLocation = compactionFileLocation;
+        return this;
+    }
+
+    public CompactionTask isUserDefined(boolean isUserDefined)
+    {
+        this.isUserDefined = isUserDefined;
+        return this;
+    }
+}
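
The two fluent setters at the bottom return this, so a task can be configured inline. That is exactly how the strategy below builds its user-defined task, and how the updated tests drive a one-off compaction synchronously; both usages, lifted from elsewhere in this commit:

    // from the updated tests: run a compaction directly, with no stats collector
    new CompactionTask(cfs, sstables, gcBefore).execute(null);

    // from SizeTieredCompactionStrategy.getUserDefinedTask below
    AbstractCompactionTask task = new CompactionTask(cfs, sstables, gcBefore)
            .isUserDefined(true)
            .compactionFileLocation(cfs.table.getDataFileLocation(1));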

Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java?rev=1134460&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java Fri Jun 10 22:13:54 2011
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.Pair;
+
+public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
+{
+    private static final Logger logger = LoggerFactory.getLogger(SizeTieredCompactionStrategy.class);
+    protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L;
+    protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
+    protected static long minSSTableSize;
+    protected volatile int estimatedRemainingTasks;
+
+    public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+    {
+       super(cfs, options);
+       this.estimatedRemainingTasks = 0;
+       String optionValue = options.get(MIN_SSTABLE_SIZE_KEY);
+       minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) : DEFAULT_MIN_SSTABLE_SIZE;
+    }
+
+    public List<AbstractCompactionTask> getBackgroundTasks(final int gcBefore)
+    {
+        if (cfs.isCompactionDisabled())
+        {
+            logger.debug("Compaction is currently disabled.");
+            return Collections.<AbstractCompactionTask>emptyList();
+        }
+
+        List<AbstractCompactionTask> tasks = new LinkedList<AbstractCompactionTask>();
+        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(cfs.getSSTables()), minSSTableSize);
+
+        for (List<SSTableReader> bucket : buckets)
+        {
+            if (bucket.size() < cfs.getMinimumCompactionThreshold())
+                continue;
+
+            Collections.sort(bucket);
+            tasks.add(new CompactionTask(cfs, bucket.subList(0, Math.min(bucket.size(), cfs.getMaximumCompactionThreshold())), gcBefore));
+        }
+
+        updateEstimatedCompactionsByTasks(tasks);
+        return tasks;
+    }
+
+    public List<AbstractCompactionTask> getMaximalTasks(final int gcBefore)
+    {
+        List<AbstractCompactionTask> tasks = new LinkedList<AbstractCompactionTask>();
+        if (!cfs.getSSTables().isEmpty())
+            tasks.add(new CompactionTask(cfs, cfs.getSSTables(), gcBefore));
+        return tasks;
+    }
+
+    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
+    {
+        return new CompactionTask(cfs, sstables, gcBefore)
+                .isUserDefined(true)
+                .compactionFileLocation(cfs.table.getDataFileLocation(1));
+    }
+
+    public int getEstimatedRemainingTasks()
+    {
+        return estimatedRemainingTasks;
+    }
+
+    private static List<Pair<SSTableReader, Long>> createSSTableAndLengthPairs(Collection<SSTableReader> collection)
+    {
+        List<Pair<SSTableReader, Long>> tableLengthPairs = new ArrayList<Pair<SSTableReader, Long>>();
+        for(SSTableReader table: collection)
+            tableLengthPairs.add(new Pair<SSTableReader, Long>(table, table.length()));
+        return tableLengthPairs;
+    }
+
+    /*
+     * Group files of similar size into buckets.
+     */
+    <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, long minSSTableSize)
+    {
+        // Sort the list in order to get deterministic results during the grouping below
+        List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(files);
+        Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>()
+        {
+            public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
+            {
+                return p1.right.compareTo(p2.right);
+            }
+        });
+
+        Map<List<T>, Long> buckets = new HashMap<List<T>, Long>();
+
+        for (Pair<T, Long> pair: sortedFiles)
+        {
+            long size = pair.right;
+
+            boolean bFound = false;
+            // look for a bucket containing similar-sized files:
+            // group in the same bucket if it's w/in 50% of the average for this bucket,
+            // or this file and the bucket are all considered "small" (less than `minSSTableSize`)
+            for (Entry<List<T>, Long> entry : buckets.entrySet())
+            {
+                List<T> bucket = entry.getKey();
+                long averageSize = entry.getValue();
+                if ((size > (averageSize / 2) && size < (3 * averageSize) / 2)
+                    || (size < minSSTableSize && averageSize < minSSTableSize))
+                {
+                    // remove and re-add because adding changes the hash
+                    buckets.remove(bucket);
+                    long totalSize = bucket.size() * averageSize;
+                    averageSize = (totalSize + size) / (bucket.size() + 1);
+                    bucket.add(pair.left);
+                    buckets.put(bucket, averageSize);
+                    bFound = true;
+                    break;
+                }
+            }
+            // no similar bucket found; put it in a new one
+            if (!bFound)
+            {
+                ArrayList<T> bucket = new ArrayList<T>();
+                bucket.add(pair.left);
+                buckets.put(bucket, size);
+            }
+        }
+
+        return new LinkedList<List<T>>(buckets.keySet());
+    }
+
+    private void updateEstimatedCompactionsByTasks(List<AbstractCompactionTask> tasks)
+    {
+        int n = 0;
+        for (AbstractCompactionTask task: tasks)
+        {
+            if (!(task instanceof CompactionTask))
+                continue;
+
+            Collection<SSTableReader> sstablesToBeCompacted = task.getSSTables();
+            if (sstablesToBeCompacted.size() >= cfs.getMinimumCompactionThreshold())
+                n += Math.ceil((double)sstablesToBeCompacted.size() / cfs.getMaximumCompactionThreshold());
+        }
+        estimatedRemainingTasks = n;
+    }
+
+    public long getMinSSTableSize()
+    {
+        return minSSTableSize;
+    }
+
+    public String toString()
+    {
+        return String.format("SizeTieredCompactionStrategy[%s/%s]",
+            cfs.getMinimumCompactionThreshold(),
+            cfs.getMaximumCompactionThreshold());
+    }
+}
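
To make the bucketing rule concrete: a file joins an existing bucket when its size falls within 50% of the bucket's running average, or when both the file and the bucket average are below minSSTableSize; otherwise it seeds a new bucket. A sketch, assuming a strategy instance constructed the way the new unit test further down does (mocked ColumnFamilyStore, empty options), with illustrative sizes:

    List<Pair<String, Long>> files = new ArrayList<Pair<String, Long>>();
    for (long size : new long[] { 1L, 2L, 50L, 60L, 70L, 500L })
        files.add(new Pair<String, Long>("sstable-" + size, size));
    // 1 and 2 are both under min (50): one "small files" bucket.
    // 50, 60 and 70 each land within 50% of the running average: one bucket.
    // 500 is close to nothing else: a bucket of its own.
    List<List<String>> buckets = strategy.getBuckets(files, 50L); // 3 buckets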

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Fri Jun 10 22:13:54 2011
@@ -39,7 +39,6 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;

Modified: cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java (original)
+++ cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java Fri Jun 10 22:13:54 2011
@@ -131,7 +131,8 @@ public class LongCompactionSpeedTest ext
         Thread.sleep(1000);
 
         long start = System.currentTimeMillis();
-        CompactionManager.instance.doCompaction(store, sstables, (int) (System.currentTimeMillis() / 1000) - DatabaseDescriptor.getCFMetaData(TABLE1, "Standard1").getGcGraceSeconds());
+        final int gcBefore = (int) (System.currentTimeMillis() / 1000) - DatabaseDescriptor.getCFMetaData(TABLE1, "Standard1").getGcGraceSeconds();
+        new CompactionTask(store, sstables, gcBefore).execute(null);
         System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
                                          this.getClass().getName(),
                                          sstableCount,

Modified: cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java Fri Jun 10 22:13:54 2011
@@ -144,6 +144,8 @@ public class CliTest extends CleanupHelp
         "create column family Countries with comparator=UTF8Type and column_metadata=[ {column_name: name, validation_class: UTF8Type} ];",
         "set Countries[1][name] = USA;",
         "get Countries[1][name];",
+        "update column family Countries with compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy';",
+        "create column family Cities with compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' and compaction_strategy_options = [{min_sstable_size:1024}];",
         "set myCF['key']['scName']['firstname'] = 'John';",
         "get myCF['key']['scName']",
         "assume CF3 keys as utf8;",
@@ -264,4 +266,4 @@ public class CliTest extends CleanupHelp
         assertEquals(unescaped, CliUtils.unescapeSQLString("'" + escaped + "'"));
         assertEquals(escaped, CliUtils.escapeSQLString(unescaped));
     }
-}
\ No newline at end of file
+}

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Fri Jun 10 22:13:54 2011
@@ -77,7 +77,7 @@ public class DefsTest extends CleanupHel
         assert cd2.min_compaction_threshold == null;
         assert cd.row_cache_save_period_in_seconds == null;
         assert cd2.row_cache_save_period_in_seconds == null;
-        
+        assert cd.compaction_strategy == null;
     }
     
     @Test

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java Fri Jun 10 22:13:54 2011
@@ -19,9 +19,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 
 import org.junit.Test;
@@ -35,7 +33,6 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.Util;
 
@@ -86,7 +83,7 @@ public class CompactionsPurgeTest extend
         cfs.forceBlockingFlush();
 
         // major compact and test that all columns but the resurrected one is completely gone
-        CompactionManager.instance.submitMajor(cfs, 0, Integer.MAX_VALUE).get();
+        CompactionManager.instance.submitMajor(cfs, Integer.MAX_VALUE).get();
         cfs.invalidateCachedRow(key);
         ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
         assertColumns(cf, "5");
@@ -136,7 +133,7 @@ public class CompactionsPurgeTest extend
         rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(5))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2);
         rm.apply();
         cfs.forceBlockingFlush();
-        CompactionManager.instance.doCompaction(cfs, sstablesIncomplete, Integer.MAX_VALUE);
+        new CompactionTask(cfs, sstablesIncomplete, Integer.MAX_VALUE).execute(null);
 
         // verify that minor compaction does not GC when key is present
         // in a non-compacted sstable

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Fri Jun 10 22:13:54 2011
@@ -19,38 +19,29 @@
 package org.apache.cassandra.db.compaction;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.List;
 import java.util.ArrayList;
-import java.util.Set;
 import java.util.HashSet;
-
-import org.apache.cassandra.Util;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.junit.Test;
+import static junit.framework.Assert.assertEquals;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-import static junit.framework.Assert.assertEquals;
 
 public class CompactionsTest extends CleanupHelper
 {
     public static final String TABLE1 = "Keyspace1";
-    public static final String TABLE2 = "Keyspace2";
-    public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
-
-    public static final int MIN_COMPACTION_THRESHOLD = 2;
 
     @Test
     public void testCompactions() throws IOException, ExecutionException, InterruptedException
@@ -103,61 +94,6 @@ public class CompactionsTest extends Cle
     }
 
     @Test
-    public void testGetBuckets()
-    {
-        List<Pair<String, Long>> pairs = new ArrayList<Pair<String, Long>>();
-        String[] strings = { "a", "bbbb", "cccccccc", "cccccccc", "bbbb", "a" };
-        for (String st : strings)
-        {
-            Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length()));
-            pairs.add(pair);
-        }
-
-        Set<List<String>> buckets = CompactionManager.getBuckets(pairs, 2);
-        assertEquals(3, buckets.size());
-
-        for (List<String> bucket : buckets)
-        {
-            assertEquals(2, bucket.size());
-            assertEquals(bucket.get(0).length(), bucket.get(1).length());
-            assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0));
-        }
-
-        pairs.clear();
-        buckets.clear();
-
-        String[] strings2 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" };
-        for (String st : strings2)
-        {
-            Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length()));
-            pairs.add(pair);
-        }
-
-        buckets = CompactionManager.getBuckets(pairs, 2);
-        assertEquals(2, buckets.size());
-
-        for (List<String> bucket : buckets)
-        {
-            assertEquals(3, bucket.size());
-            assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0));
-            assertEquals(bucket.get(1).charAt(0), bucket.get(2).charAt(0));
-        }
-
-        // Test the "min" functionality
-        pairs.clear();
-        buckets.clear();
-
-        String[] strings3 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" };
-        for (String st : strings3)
-        {
-            Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length()));
-            pairs.add(pair);
-        }
-
-        buckets = CompactionManager.getBuckets(pairs, 10); // notice the min is 10
-        assertEquals(1, buckets.size());
-    }
-    @Test
     public void testEchoedRow() throws IOException, ExecutionException, InterruptedException
     {
         // This test checks that EchoedRow doesn't skip rows: see CASSANDRA-2653

Added: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java?rev=1134460&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java Fri Jun 10 22:13:54 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.utils.Pair;
+
+public class SizeTieredCompactionStrategyTest {
+    @Test
+    public void testGetBuckets()
+    {
+        List<Pair<String, Long>> pairs = new ArrayList<Pair<String, Long>>();
+        String[] strings = { "a", "bbbb", "cccccccc", "cccccccc", "bbbb", "a" };
+        for (String st : strings)
+        {
+            Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length()));
+            pairs.add(pair);
+        }
+
+        Map<String, String> emptyOptions = new HashMap<String, String>();
+        SizeTieredCompactionStrategy strategy = new SizeTieredCompactionStrategy(mock(ColumnFamilyStore.class), emptyOptions);
+        List<List<String>> buckets = strategy.getBuckets(pairs, 2);
+        assertEquals(3, buckets.size());
+
+        for (List<String> bucket : buckets)
+        {
+            assertEquals(2, bucket.size());
+            assertEquals(bucket.get(0).length(), bucket.get(1).length());
+            assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0));
+        }
+
+        pairs.clear();
+        buckets.clear();
+
+        String[] strings2 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" };
+        for (String st : strings2)
+        {
+            Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length()));
+            pairs.add(pair);
+        }
+
+        buckets = strategy.getBuckets(pairs, 2);
+        assertEquals(2, buckets.size());
+
+        for (List<String> bucket : buckets)
+        {
+            assertEquals(3, bucket.size());
+            assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0));
+            assertEquals(bucket.get(1).charAt(0), bucket.get(2).charAt(0));
+        }
+
+        // Test the "min" functionality
+        pairs.clear();
+        buckets.clear();
+
+        String[] strings3 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" };
+        for (String st : strings3)
+        {
+            Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length()));
+            pairs.add(pair);
+        }
+
+        buckets = strategy.getBuckets(pairs, 10); // notice the min is 10
+        assertEquals(1, buckets.size());
+    }
+}

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Fri Jun 10 22:13:54 2011
@@ -28,7 +28,6 @@ import java.util.*;
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.QueryFilter;