You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/11 00:13:55 UTC
svn commit: r1134460 [2/2] - in /cassandra/trunk: ./ interface/
interface/thrift/gen-java/org/apache/cassandra/thrift/ src/avro/
src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/db/ src/java/org/apa...
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Fri Jun 10 22:13:54 2011
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -34,7 +33,6 @@ import javax.management.ObjectName;
import org.apache.commons.collections.PredicateUtils;
import org.apache.commons.collections.iterators.CollatingIterator;
import org.apache.commons.collections.iterators.FilterIterator;
-import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +49,6 @@ import org.apache.cassandra.service.Anti
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.OperationType;
import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
/**
* A singleton which manages a private executor of ongoing compactions. A readwrite lock
@@ -85,7 +82,6 @@ public class CompactionManager implement
}
private CompactionExecutor executor = new CompactionExecutor();
- private Map<ColumnFamilyStore, Integer> estimatedCompactions = new NonBlockingHashMap<ColumnFamilyStore, Integer>();
/**
* @return A lock, for which acquisition means no compactions can run.
@@ -111,37 +107,20 @@ public class CompactionManager implement
{
if (cfs.isInvalid())
return 0;
- Integer minThreshold = cfs.getMinimumCompactionThreshold();
- Integer maxThreshold = cfs.getMaximumCompactionThreshold();
-
- if (minThreshold == 0 || maxThreshold == 0)
- {
- logger.debug("Compaction is currently disabled.");
- return 0;
- }
- logger.debug("Checking to see if compaction of " + cfs.columnFamily + " would be useful");
- Set<List<SSTableReader>> buckets = getBuckets(convertSSTablesToPairs(cfs.getSSTables()), 50L * 1024L * 1024L);
- updateEstimateFor(cfs, buckets);
- int gcBefore = getDefaultGcBefore(cfs);
-
- for (List<SSTableReader> sstables : buckets)
+
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+ for (AbstractCompactionTask task : strategy.getBackgroundTasks(getDefaultGcBefore(cfs)))
{
- if (sstables.size() < minThreshold)
- continue;
- // if we have too many to compact all at once, compact older ones first -- this avoids
- // re-compacting files we just created.
- Collections.sort(sstables);
- Collection<SSTableReader> tocompact = cfs.getDataTracker().markCompacting(sstables, minThreshold, maxThreshold);
- if (tocompact == null)
- // enough threads are busy in this bucket
+ if (!cfs.getDataTracker().markCompacting(task))
continue;
+
try
{
- return doCompaction(cfs, tocompact, gcBefore);
+ task.execute(executor);
}
finally
{
- cfs.getDataTracker().unmarkCompacting(tocompact);
+ cfs.getDataTracker().unmarkCompacting(task);
}
}
}
@@ -155,29 +134,6 @@ public class CompactionManager implement
return executor.submit(callable);
}
- private void updateEstimateFor(ColumnFamilyStore cfs, Set<List<SSTableReader>> buckets)
- {
- Integer minThreshold = cfs.getMinimumCompactionThreshold();
- Integer maxThreshold = cfs.getMaximumCompactionThreshold();
-
- if (minThreshold > 0 && maxThreshold > 0)
- {
- int n = 0;
- for (List<SSTableReader> sstables : buckets)
- {
- if (sstables.size() >= minThreshold)
- {
- n += Math.ceil((double)sstables.size() / maxThreshold);
- }
- }
- estimatedCompactions.put(cfs, n);
- }
- else
- {
- logger.debug("Compaction is currently disabled.");
- }
- }
-
public void performCleanup(final ColumnFamilyStore cfStore, final NodeId.OneShotRenewer renewer) throws InterruptedException, ExecutionException
{
Callable<Object> runnable = new Callable<Object>()
@@ -273,10 +229,10 @@ public class CompactionManager implement
public void performMajor(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException
{
- submitMajor(cfStore, 0, getDefaultGcBefore(cfStore)).get();
+ submitMajor(cfStore, getDefaultGcBefore(cfStore)).get();
}
- public Future<Object> submitMajor(final ColumnFamilyStore cfStore, final long skip, final int gcBefore)
+ public Future<Object> submitMajor(final ColumnFamilyStore cfStore, final int gcBefore)
{
Callable<Object> callable = new Callable<Object>()
{
@@ -288,45 +244,30 @@ public class CompactionManager implement
{
if (cfStore.isInvalid())
return this;
- Collection<SSTableReader> sstables;
- if (skip > 0)
+ AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
+ for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
{
- sstables = new ArrayList<SSTableReader>();
- for (SSTableReader sstable : cfStore.getSSTables())
+ if (!cfStore.getDataTracker().markCompacting(task, 0, Integer.MAX_VALUE))
+ return this;
+ try
{
- if (sstable.length() < skip * 1024L * 1024L * 1024L)
+ // downgrade the lock acquisition
+ compactionLock.readLock().lock();
+ compactionLock.writeLock().unlock();
+ try
{
- sstables.add(sstable);
+ return task.execute(executor);
+ }
+ finally
+ {
+ compactionLock.readLock().unlock();
}
- }
- }
- else
- {
- sstables = cfStore.getSSTables();
- }
-
- Collection<SSTableReader> tocompact = cfStore.getDataTracker().markCompacting(sstables, 0, Integer.MAX_VALUE);
- if (tocompact == null || tocompact.isEmpty())
- return this;
- try
- {
- // downgrade the lock acquisition
- compactionLock.readLock().lock();
- compactionLock.writeLock().unlock();
- try
- {
- doCompaction(cfStore, tocompact, gcBefore);
}
finally
{
- compactionLock.readLock().unlock();
+ cfStore.getDataTracker().unmarkCompacting(task);
}
}
- finally
- {
- cfStore.getDataTracker().unmarkCompacting(tocompact);
- }
- return this;
}
finally
{
@@ -334,6 +275,7 @@ public class CompactionManager implement
if (compactionLock.writeLock().isHeldByCurrentThread())
compactionLock.writeLock().unlock();
}
+ return this;
}
};
return executor.submit(callable);
@@ -408,11 +350,12 @@ public class CompactionManager implement
// attempt to schedule the set
else if ((sstables = cfs.getDataTracker().markCompacting(sstables, 1, Integer.MAX_VALUE)) != null)
{
- String location = cfs.table.getDataFileLocation(1);
// success: perform the compaction
try
{
- doCompactionWithoutSizeEstimation(cfs, sstables, gcBefore, location);
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+ AbstractCompactionTask task = strategy.getUserDefinedTask(sstables, gcBefore);
+ task.execute(executor);
}
finally
{
@@ -484,141 +427,6 @@ public class CompactionManager implement
}
}
- int doCompaction(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore) throws IOException
- {
- if (sstables.size() < 2)
- {
- logger.info("Nothing to compact in " + cfs.getColumnFamilyName() + "; use forceUserDefinedCompaction if you wish to force compaction of single sstables (e.g. for tombstone collection)");
- return 0;
- }
-
- Table table = cfs.table;
-
- // If the compaction file path is null that means we have no space left for this compaction.
- // try again w/o the largest one.
- Set<SSTableReader> smallerSSTables = new HashSet<SSTableReader>(sstables);
- while (smallerSSTables.size() > 1)
- {
- String compactionFileLocation = table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables));
- if (compactionFileLocation != null)
- return doCompactionWithoutSizeEstimation(cfs, smallerSSTables, gcBefore, compactionFileLocation);
-
- logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", "));
- smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables));
- }
-
- logger.error("insufficient space to compact even the two smallest files, aborting");
- return 0;
- }
-
- /**
- * For internal use and testing only. The rest of the system should go through the submit* methods,
- * which are properly serialized.
- */
- int doCompactionWithoutSizeEstimation(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, String compactionFileLocation) throws IOException
- {
- // The collection of sstables passed may be empty (but not null); even if
- // it is not empty, it may compact down to nothing if all rows are deleted.
- assert sstables != null;
-
- Table table = cfs.table;
- if (DatabaseDescriptor.isSnapshotBeforeCompaction())
- table.snapshot(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);
-
- // sanity check: all sstables must belong to the same cfs
- for (SSTableReader sstable : sstables)
- assert sstable.descriptor.cfname.equals(cfs.columnFamily);
-
- // compaction won't normally compact a single sstable, so if that's what we're doing
- // it must have been requested manually by the user, which probably means he wants to force
- // tombstone purge, which won't happen unless we force deserializing the rows.
- boolean forceDeserialize = sstables.size() == 1;
- CompactionController controller = new CompactionController(cfs, sstables, gcBefore, forceDeserialize);
- // new sstables from flush can be added during a compaction, but only the compaction can remove them,
- // so in our single-threaded compaction world this is a valid way of determining if we're compacting
- // all the sstables (that existed when we started)
- CompactionType type = controller.isMajor()
- ? CompactionType.MAJOR
- : CompactionType.MINOR;
- logger.info("Compacting {}: {}", type, sstables);
-
- long startTime = System.currentTimeMillis();
- long totalkeysWritten = 0;
-
- // TODO the int cast here is potentially buggy
- int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(sstables));
- if (logger.isDebugEnabled())
- logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
-
- SSTableWriter writer;
- CompactionIterator ci = new CompactionIterator(type, sstables, controller); // retain a handle so we can call close()
- Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
- Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
-
- executor.beginCompaction(ci);
- try
- {
- if (!nni.hasNext())
- {
- // don't mark compacted in the finally block, since if there _is_ nondeleted data,
- // we need to sync it (via closeAndOpen) first, so there is no period during which
- // a crash could cause data loss.
- cfs.markCompacted(sstables);
- return 0;
- }
-
- writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, sstables);
- while (nni.hasNext())
- {
- AbstractCompactedRow row = nni.next();
- long position = writer.append(row);
- totalkeysWritten++;
-
- if (DatabaseDescriptor.getPreheatKeyCache())
- {
- for (SSTableReader sstable : sstables)
- {
- if (sstable.getCachedPosition(row.key) != null)
- {
- cachedKeys.put(row.key, position);
- break;
- }
- }
- }
- }
- }
- finally
- {
- ci.close();
- executor.finishCompaction(ci);
- }
-
- SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(sstables));
- cfs.replaceCompactedSSTables(sstables, Arrays.asList(ssTable));
- for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
- ssTable.cacheKey(entry.getKey(), entry.getValue());
- submitMinorIfNeeded(cfs);
-
- long dTime = System.currentTimeMillis() - startTime;
- long startsize = SSTable.getTotalBytes(sstables);
- long endsize = ssTable.length();
- double ratio = (double)endsize / (double)startsize;
- logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.",
- writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
- return sstables.size();
- }
-
- private static long getMaxDataAge(Collection<SSTableReader> sstables)
- {
- long max = 0;
- for (SSTableReader sstable : sstables)
- {
- if (sstable.maxDataAge > max)
- max = sstable.maxDataAge;
- }
- return max;
- }
-
/**
* Deserialize everything in the CFS and re-serialize w/ the newest version. Also attempts to recover
* from bogus row keys / sizes using data from the index, and skips rows with garbage columns that resulted
@@ -975,70 +783,6 @@ public class CompactionManager implement
}
}
- /*
- * Group files of similar size into buckets.
- */
- static <T> Set<List<T>> getBuckets(Collection<Pair<T, Long>> files, long min)
- {
- // Sort the list in order to get deterministic results during the grouping below
- List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(files);
- Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>()
- {
- public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
- {
- return p1.right.compareTo(p2.right);
- }
- });
-
- Map<List<T>, Long> buckets = new HashMap<List<T>, Long>();
-
- for (Pair<T, Long> pair: sortedFiles)
- {
- long size = pair.right;
-
- boolean bFound = false;
- // look for a bucket containing similar-sized files:
- // group in the same bucket if it's w/in 50% of the average for this bucket,
- // or this file and the bucket are all considered "small" (less than `min`)
- for (Entry<List<T>, Long> entry : buckets.entrySet())
- {
- List<T> bucket = entry.getKey();
- long averageSize = entry.getValue();
- if ((size > (averageSize / 2) && size < (3 * averageSize) / 2)
- || (size < min && averageSize < min))
- {
- // remove and re-add because adding changes the hash
- buckets.remove(bucket);
- long totalSize = bucket.size() * averageSize;
- averageSize = (totalSize + size) / (bucket.size() + 1);
- bucket.add(pair.left);
- buckets.put(bucket, averageSize);
- bFound = true;
- break;
- }
- }
- // no similar bucket found; put it in a new one
- if (!bFound)
- {
- ArrayList<T> bucket = new ArrayList<T>();
- bucket.add(pair.left);
- buckets.put(bucket, size);
- }
- }
-
- return buckets.keySet();
- }
-
- private static Collection<Pair<SSTableReader, Long>> convertSSTablesToPairs(Collection<SSTableReader> collection)
- {
- Collection<Pair<SSTableReader, Long>> tablePairs = new ArrayList<Pair<SSTableReader, Long>>();
- for(SSTableReader table: collection)
- {
- tablePairs.add(new Pair<SSTableReader, Long>(table, table.length()));
- }
- return tablePairs;
- }
-
/**
* Is not scheduled, because it is performing disjoint work from sstable compaction.
*/
@@ -1169,7 +913,7 @@ public class CompactionManager implement
return executor.submit(runnable);
}
- private static int getDefaultGcBefore(ColumnFamilyStore cfs)
+ static int getDefaultGcBefore(ColumnFamilyStore cfs)
{
return cfs.isIndex()
? Integer.MAX_VALUE
@@ -1201,7 +945,7 @@ public class CompactionManager implement
return executor.getActiveCount();
}
- private static class CompactionExecutor extends DebuggableThreadPoolExecutor
+ private static class CompactionExecutor extends DebuggableThreadPoolExecutor implements CompactionExecutorStatsCollector
{
// a synchronized identity set of running tasks to their compaction info
private final Set<CompactionInfo.Holder> compactions;
@@ -1222,12 +966,12 @@ public class CompactionManager implement
return Math.max(1, DatabaseDescriptor.getConcurrentCompactors());
}
- void beginCompaction(CompactionInfo.Holder ci)
+ public void beginCompaction(CompactionInfo.Holder ci)
{
compactions.add(ci);
}
- void finishCompaction(CompactionInfo.Holder ci)
+ public void finishCompaction(CompactionInfo.Holder ci)
{
compactions.remove(ci);
}
@@ -1238,6 +982,12 @@ public class CompactionManager implement
}
}
+ public interface CompactionExecutorStatsCollector
+ {
+ void beginCompaction(CompactionInfo.Holder ci);
+ void finishCompaction(CompactionInfo.Holder ci);
+ }
+
public List<CompactionInfo> getCompactions()
{
List<CompactionInfo> out = new ArrayList<CompactionInfo>();
@@ -1257,8 +1007,13 @@ public class CompactionManager implement
public int getPendingTasks()
{
int n = 0;
- for (Integer i : estimatedCompactions.values())
- n += i;
+ for (String tableName : DatabaseDescriptor.getTables())
+ {
+ for (ColumnFamilyStore cfs : Table.open(tableName).getColumnFamilyStores())
+ {
+ n += cfs.getCompactionStrategy().getEstimatedRemainingTasks();
+ }
+ }
return (int) (executor.getTaskCount() - executor.getCompletedTaskCount()) + n;
}
Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1134460&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Fri Jun 10 22:13:54 2011
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.collections.PredicateUtils;
+import org.apache.commons.collections.iterators.FilterIterator;
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
+
+/**
+ * Compacts a set of sstables belonging to one column family into a single new sstable.
+ */
+public class CompactionTask extends AbstractCompactionTask
+{
+    private static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
+    protected String compactionFileLocation;
+    protected final int gcBefore;
+    protected boolean isUserDefined;
+
+    public CompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, final int gcBefore)
+    {
+        super(cfs, sstables);
+        compactionFileLocation = null;
+        this.gcBefore = gcBefore;
+        this.isUserDefined = false;
+    }
+
+    /**
+     * Compacts this task's sstables synchronously, in the calling thread.
+     *
+     * Unless the task is user-defined, at least two sstables are required and, if no output
+     * location was set, one is chosen from the expected compacted size. When there is not
+     * enough space for the whole set, the largest sstables are dropped one at a time until
+     * the rest fit, and only that remaining subset is compacted.
+     *
+     * For internal use and testing only. The rest of the system should go through the submit* methods,
+     * which are properly serialized.
+     *
+     * @param collector notified when the compaction begins and finishes; may be null
+     * @return the number of sstables compacted, or 0 if no new sstable was written
+     */
+    public int execute(CompactionExecutorStatsCollector collector) throws IOException
+    {
+        // The collection of sstables passed may be empty (but not null); even if
+        // it is not empty, it may compact down to nothing if all rows are deleted.
+        assert sstables != null;
+
+        // the sstables actually compacted: all of them, unless disk space forces a subset
+        Collection<SSTableReader> toCompact = sstables;
+        if (!isUserDefined)
+        {
+            if (toCompact.size() < 2)
+            {
+                logger.info("Nothing to compact in " + cfs.getColumnFamilyName() + ". " +
+                            "Use forceUserDefinedCompaction if you wish to force compaction of single sstables " +
+                            "(e.g. for tombstone collection)");
+                return 0;
+            }
+
+            if (compactionFileLocation == null)
+            {
+                // A null location means we have no space left for this compaction.
+                // Try again w/o the largest one, until a location is found or fewer than two remain.
+                Set<SSTableReader> smallerSSTables = new HashSet<SSTableReader>(toCompact);
+                while (smallerSSTables.size() > 1)
+                {
+                    compactionFileLocation = cfs.table.getDataFileLocation(cfs.getExpectedCompactedFileSize(smallerSSTables));
+                    if (compactionFileLocation != null)
+                        break;
+                    logger.warn("insufficient space to compact all requested files " + StringUtils.join(smallerSSTables, ", "));
+                    smallerSSTables.remove(cfs.getMaxSizeFile(smallerSSTables));
+                }
+
+                if (compactionFileLocation == null)
+                {
+                    logger.warn("insufficient space to compact even the two smallest files, aborting");
+                    return 0;
+                }
+
+                // compact only the subset that fits, not the full set we started with
+                if (smallerSSTables.size() < toCompact.size())
+                    toCompact = smallerSSTables;
+            }
+        }
+
+        if (DatabaseDescriptor.isSnapshotBeforeCompaction())
+            cfs.table.snapshot(System.currentTimeMillis() + "-" + "compact-" + cfs.columnFamily);
+
+        // sanity check: all sstables must belong to the same cfs
+        for (SSTableReader sstable : toCompact)
+            assert sstable.descriptor.cfname.equals(cfs.columnFamily);
+
+        // The last argument forces row deserialization; it is set for user-defined compactions,
+        // which are typically requested to purge tombstones (not done unless rows are deserialized).
+        CompactionController controller = new CompactionController(cfs, toCompact, gcBefore, isUserDefined);
+        // new sstables from flush can be added during a compaction, but only the compaction can remove them,
+        // so in our single-threaded compaction world this is a valid way of determining if we're compacting
+        // all the sstables (that existed when we started)
+        CompactionType type = controller.isMajor()
+                            ? CompactionType.MAJOR
+                            : CompactionType.MINOR;
+        logger.info("Compacting {}: {}", type, toCompact);
+
+        long startTime = System.currentTimeMillis();
+        long totalkeysWritten = 0;
+
+        // TODO the int cast here is potentially buggy
+        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)SSTableReader.getApproximateKeyCount(toCompact));
+        if (logger.isDebugEnabled())
+            logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+
+        SSTableWriter writer;
+        CompactionIterator ci = new CompactionIterator(type, toCompact, controller); // retain a handle so we can call close()
+        Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
+        Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
+
+        if (collector != null)
+            collector.beginCompaction(ci);
+        try
+        {
+            if (!nni.hasNext())
+            {
+                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                // we need to sync it (via closeAndOpen) first, so there is no period during which
+                // a crash could cause data loss.
+                cfs.markCompacted(toCompact);
+                return 0;
+            }
+
+            writer = cfs.createCompactionWriter(expectedBloomFilterSize, compactionFileLocation, toCompact);
+            while (nni.hasNext())
+            {
+                AbstractCompactedRow row = nni.next();
+                long position = writer.append(row);
+                totalkeysWritten++;
+
+                if (DatabaseDescriptor.getPreheatKeyCache())
+                {
+                    for (SSTableReader sstable : toCompact)
+                    {
+                        if (sstable.getCachedPosition(row.key) != null)
+                        {
+                            cachedKeys.put(row.key, position);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        finally
+        {
+            ci.close();
+            if (collector != null)
+                collector.finishCompaction(ci);
+        }
+
+        SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
+        cfs.replaceCompactedSSTables(toCompact, Arrays.asList(ssTable));
+        for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
+            ssTable.cacheKey(entry.getKey(), entry.getValue());
+        CompactionManager.instance.submitMinorIfNeeded(cfs);
+
+        long dTime = System.currentTimeMillis() - startTime;
+        long startsize = SSTable.getTotalBytes(toCompact);
+        long endsize = ssTable.length();
+        double ratio = (double)endsize / (double)startsize;
+        logger.info(String.format("Compacted to %s. %,d to %,d (~%d%% of original) bytes for %,d keys. Time: %,dms.",
+                                  writer.getFilename(), startsize, endsize, (int) (ratio * 100), totalkeysWritten, dTime));
+        return toCompact.size();
+    }
+
+    /**
+     * @return the largest maxDataAge among the given sstables (0 for an empty collection)
+     */
+    public static long getMaxDataAge(Collection<SSTableReader> sstables)
+    {
+        long max = 0;
+        for (SSTableReader sstable : sstables)
+        {
+            if (sstable.maxDataAge > max)
+                max = sstable.maxDataAge;
+        }
+        return max;
+    }
+
+    /**
+     * Sets the location the compacted sstable is written to. If it is left null on a task
+     * that is not user-defined, execute() chooses one from the expected compacted size.
+     */
+    public CompactionTask compactionFileLocation(String compactionFileLocation)
+    {
+        this.compactionFileLocation = compactionFileLocation;
+        return this;
+    }
+
+    /**
+     * Marks this task as user-defined: execute() then skips the two-sstable minimum and the
+     * disk-space check and forces row deserialization, so the output location is expected to
+     * be set via compactionFileLocation(String).
+     */
+    public CompactionTask isUserDefined(boolean isUserDefined)
+    {
+        this.isUserDefined = isUserDefined;
+        return this;
+    }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java?rev=1134460&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java Fri Jun 10 22:13:54 2011
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * Compaction strategy that groups sstables of similar size into buckets and compacts
+ * buckets holding at least the column family's minimum compaction threshold of sstables.
+ */
+public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
+{
+    private static final Logger logger = LoggerFactory.getLogger(SizeTieredCompactionStrategy.class);
+    protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L;
+    protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
+    // an instance field, not static: each strategy is configured from its own options map
+    protected final long minSSTableSize;
+    protected volatile int estimatedRemainingTasks;
+
+    public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
+    {
+        super(cfs, options);
+        this.estimatedRemainingTasks = 0;
+        String optionValue = options.get(MIN_SSTABLE_SIZE_KEY);
+        minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) : DEFAULT_MIN_SSTABLE_SIZE;
+    }
+
+    /**
+     * Buckets the column family's sstables by size and returns one task per bucket holding at
+     * least the minimum compaction threshold of sstables; each task covers at most the maximum
+     * threshold of them, oldest first. Also refreshes the remaining-compactions estimate.
+     *
+     * @param gcBefore passed through to each CompactionTask
+     * @return the tasks to run; empty if compaction is disabled for the column family
+     */
+    public List<AbstractCompactionTask> getBackgroundTasks(final int gcBefore)
+    {
+        if (cfs.isCompactionDisabled())
+        {
+            logger.debug("Compaction is currently disabled.");
+            return Collections.<AbstractCompactionTask>emptyList();
+        }
+
+        List<AbstractCompactionTask> tasks = new LinkedList<AbstractCompactionTask>();
+        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(cfs.getSSTables()), minSSTableSize);
+
+        for (List<SSTableReader> bucket : buckets)
+        {
+            if (bucket.size() < cfs.getMinimumCompactionThreshold())
+                continue;
+
+            // if we have too many to compact all at once, compact older ones first -- this avoids
+            // re-compacting files we just created.
+            Collections.sort(bucket);
+            tasks.add(new CompactionTask(cfs, bucket.subList(0, Math.min(bucket.size(), cfs.getMaximumCompactionThreshold())), gcBefore));
+        }
+
+        // Estimate from the full buckets rather than the tasks: every task is capped at the
+        // maximum threshold, so counting tasks would under-count oversized buckets.
+        updateEstimatedCompactionsByBuckets(buckets);
+        return tasks;
+    }
+
+    /**
+     * @return one task compacting all of the column family's sstables, or none if it has no sstables
+     */
+    public List<AbstractCompactionTask> getMaximalTasks(final int gcBefore)
+    {
+        List<AbstractCompactionTask> tasks = new LinkedList<AbstractCompactionTask>();
+        // call getSSTables() once so the emptiness check and the task use the same result
+        Collection<SSTableReader> sstables = cfs.getSSTables();
+        if (!sstables.isEmpty())
+            tasks.add(new CompactionTask(cfs, sstables, gcBefore));
+        return tasks;
+    }
+
+    /**
+     * @return a user-defined task over exactly the given sstables; its output location is
+     *         chosen for a nominal expected size of 1 rather than a real size estimate
+     */
+    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
+    {
+        return new CompactionTask(cfs, sstables, gcBefore)
+               .isUserDefined(true)
+               .compactionFileLocation(cfs.table.getDataFileLocation(1));
+    }
+
+    /**
+     * @return the number of compactions estimated by the most recent getBackgroundTasks call
+     */
+    public int getEstimatedRemainingTasks()
+    {
+        return estimatedRemainingTasks;
+    }
+
+    // pairs each sstable with its length(), the size that getBuckets groups by
+    private static List<Pair<SSTableReader, Long>> createSSTableAndLengthPairs(Collection<SSTableReader> collection)
+    {
+        List<Pair<SSTableReader, Long>> tableLengthPairs = new ArrayList<Pair<SSTableReader, Long>>();
+        for(SSTableReader table: collection)
+            tableLengthPairs.add(new Pair<SSTableReader, Long>(table, table.length()));
+        return tableLengthPairs;
+    }
+
+    /*
+     * Group files of similar size into buckets.
+     *
+     * Files are visited smallest first. Each joins the first bucket (in creation order) whose
+     * average size is within 50% of its own, or whose average is below minSSTableSize when the
+     * file is too; otherwise it starts a new bucket. Buckets are returned in creation order.
+     */
+    <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, long minSSTableSize)
+    {
+        // Sort the list in order to get deterministic results during the grouping below
+        List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(files);
+        Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>()
+        {
+            public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
+            {
+                return p1.right.compareTo(p2.right);
+            }
+        });
+
+        // buckets.get(i) holds the files of bucket i and averageSizes.get(i) their average size.
+        // Parallel lists (instead of a HashMap keyed by the mutable bucket lists) keep the scan
+        // order, and therefore the grouping, deterministic.
+        List<List<T>> buckets = new ArrayList<List<T>>();
+        List<Long> averageSizes = new ArrayList<Long>();
+
+        for (Pair<T, Long> pair: sortedFiles)
+        {
+            long size = pair.right;
+
+            // look for a bucket containing similar-sized files:
+            // group in the same bucket if it's w/in 50% of the average for this bucket,
+            // or this file and the bucket are all considered "small" (less than `minSSTableSize`)
+            int match = -1;
+            for (int i = 0; i < buckets.size() && match < 0; i++)
+            {
+                long averageSize = averageSizes.get(i);
+                if ((size > (averageSize / 2) && size < (3 * averageSize) / 2)
+                    || (size < minSSTableSize && averageSize < minSSTableSize))
+                    match = i;
+            }
+
+            if (match >= 0)
+            {
+                List<T> bucket = buckets.get(match);
+                long totalSize = bucket.size() * averageSizes.get(match);
+                averageSizes.set(match, (totalSize + size) / (bucket.size() + 1));
+                bucket.add(pair.left);
+            }
+            else
+            {
+                // no similar bucket found; put it in a new one
+                List<T> bucket = new ArrayList<T>();
+                bucket.add(pair.left);
+                buckets.add(bucket);
+                averageSizes.add(size);
+            }
+        }
+
+        return buckets;
+    }
+
+    /*
+     * Estimates how many compactions are needed to process every bucket that reaches the
+     * minimum threshold, given that one compaction takes at most the maximum threshold of sstables.
+     */
+    private void updateEstimatedCompactionsByBuckets(List<List<SSTableReader>> buckets)
+    {
+        int n = 0;
+        for (List<SSTableReader> bucket : buckets)
+        {
+            if (bucket.size() >= cfs.getMinimumCompactionThreshold())
+                n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold());
+        }
+        estimatedRemainingTasks = n;
+    }
+
+    public long getMinSSTableSize()
+    {
+        return minSSTableSize;
+    }
+
+    public String toString()
+    {
+        return String.format("SizeTieredCompactionStrategy[%s/%s]",
+                             cfs.getMinimumCompactionThreshold(),
+                             cfs.getMaximumCompactionThreshold());
+    }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java Fri Jun 10 22:13:54 2011
@@ -39,7 +39,6 @@ import org.apache.cassandra.config.CFMet
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
Modified: cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java (original)
+++ cassandra/trunk/test/long/org/apache/cassandra/db/compaction/LongCompactionSpeedTest.java Fri Jun 10 22:13:54 2011
@@ -131,7 +131,8 @@ public class LongCompactionSpeedTest ext
Thread.sleep(1000);
long start = System.currentTimeMillis();
- CompactionManager.instance.doCompaction(store, sstables, (int) (System.currentTimeMillis() / 1000) - DatabaseDescriptor.getCFMetaData(TABLE1, "Standard1").getGcGraceSeconds());
+ final int gcBefore = (int) (System.currentTimeMillis() / 1000) - DatabaseDescriptor.getCFMetaData(TABLE1, "Standard1").getGcGraceSeconds();
+ new CompactionTask(store, sstables, gcBefore).execute(null);
System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
this.getClass().getName(),
sstableCount,
Modified: cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/cli/CliTest.java Fri Jun 10 22:13:54 2011
@@ -144,6 +144,8 @@ public class CliTest extends CleanupHelp
"create column family Countries with comparator=UTF8Type and column_metadata=[ {column_name: name, validation_class: UTF8Type} ];",
"set Countries[1][name] = USA;",
"get Countries[1][name];",
+ "update column family Countries with compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy';",
+ "create column family Cities with compaction_strategy = 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' and compaction_strategy_options = [{min_sstable_size:1024}];",
"set myCF['key']['scName']['firstname'] = 'John';",
"get myCF['key']['scName']",
"assume CF3 keys as utf8;",
@@ -264,4 +266,4 @@ public class CliTest extends CleanupHelp
assertEquals(unescaped, CliUtils.unescapeSQLString("'" + escaped + "'"));
assertEquals(escaped, CliUtils.escapeSQLString(unescaped));
}
-}
\ No newline at end of file
+}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Fri Jun 10 22:13:54 2011
@@ -77,7 +77,7 @@ public class DefsTest extends CleanupHel
assert cd2.min_compaction_threshold == null;
assert cd.row_cache_save_period_in_seconds == null;
assert cd2.row_cache_save_period_in_seconds == null;
-
+ assert cd.compaction_strategy == null;
}
@Test
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java Fri Jun 10 22:13:54 2011
@@ -19,9 +19,7 @@
package org.apache.cassandra.db.compaction;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
@@ -35,7 +33,6 @@ import org.apache.cassandra.db.ColumnFam
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.Util;
@@ -86,7 +83,7 @@ public class CompactionsPurgeTest extend
cfs.forceBlockingFlush();
// major compact and test that all columns but the resurrected one is completely gone
- CompactionManager.instance.submitMajor(cfs, 0, Integer.MAX_VALUE).get();
+ CompactionManager.instance.submitMajor(cfs, Integer.MAX_VALUE).get();
cfs.invalidateCachedRow(key);
ColumnFamily cf = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, new QueryPath(cfName)));
assertColumns(cf, "5");
@@ -136,7 +133,7 @@ public class CompactionsPurgeTest extend
rm.add(new QueryPath(cfName, null, ByteBufferUtil.bytes(String.valueOf(5))), ByteBufferUtil.EMPTY_BYTE_BUFFER, 2);
rm.apply();
cfs.forceBlockingFlush();
- CompactionManager.instance.doCompaction(cfs, sstablesIncomplete, Integer.MAX_VALUE);
+ new CompactionTask(cfs, sstablesIncomplete, Integer.MAX_VALUE).execute(null);
// verify that minor compaction does not GC when key is present
// in a non-compacted sstable
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Fri Jun 10 22:13:54 2011
@@ -19,38 +19,29 @@
package org.apache.cassandra.db.compaction;
import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.List;
import java.util.ArrayList;
-import java.util.Set;
import java.util.HashSet;
-
-import org.apache.cassandra.Util;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.junit.Test;
+import static junit.framework.Assert.assertEquals;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.db.Table;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
-import static junit.framework.Assert.assertEquals;
public class CompactionsTest extends CleanupHelper
{
public static final String TABLE1 = "Keyspace1";
- public static final String TABLE2 = "Keyspace2";
- public static final InetAddress LOCAL = FBUtilities.getLocalAddress();
-
- public static final int MIN_COMPACTION_THRESHOLD = 2;
@Test
public void testCompactions() throws IOException, ExecutionException, InterruptedException
@@ -103,61 +94,6 @@ public class CompactionsTest extends Cle
}
@Test
- public void testGetBuckets()
- {
- List<Pair<String, Long>> pairs = new ArrayList<Pair<String, Long>>();
- String[] strings = { "a", "bbbb", "cccccccc", "cccccccc", "bbbb", "a" };
- for (String st : strings)
- {
- Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length()));
- pairs.add(pair);
- }
-
- Set<List<String>> buckets = CompactionManager.getBuckets(pairs, 2);
- assertEquals(3, buckets.size());
-
- for (List<String> bucket : buckets)
- {
- assertEquals(2, bucket.size());
- assertEquals(bucket.get(0).length(), bucket.get(1).length());
- assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0));
- }
-
- pairs.clear();
- buckets.clear();
-
- String[] strings2 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" };
- for (String st : strings2)
- {
- Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length()));
- pairs.add(pair);
- }
-
- buckets = CompactionManager.getBuckets(pairs, 2);
- assertEquals(2, buckets.size());
-
- for (List<String> bucket : buckets)
- {
- assertEquals(3, bucket.size());
- assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0));
- assertEquals(bucket.get(1).charAt(0), bucket.get(2).charAt(0));
- }
-
- // Test the "min" functionality
- pairs.clear();
- buckets.clear();
-
- String[] strings3 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" };
- for (String st : strings3)
- {
- Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length()));
- pairs.add(pair);
- }
-
- buckets = CompactionManager.getBuckets(pairs, 10); // notice the min is 10
- assertEquals(1, buckets.size());
- }
- @Test
public void testEchoedRow() throws IOException, ExecutionException, InterruptedException
{
// This test check that EchoedRow doesn't skipp rows: see CASSANDRA-2653
Added: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java?rev=1134460&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java Fri Jun 10 22:13:54 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.utils.Pair;
+
+public class SizeTieredCompactionStrategyTest {
+ @Test
+ public void testGetBuckets()
+ {
+ List<Pair<String, Long>> pairs = new ArrayList<Pair<String, Long>>();
+ String[] strings = { "a", "bbbb", "cccccccc", "cccccccc", "bbbb", "a" };
+ for (String st : strings)
+ {
+ Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length()));
+ pairs.add(pair);
+ }
+
+ Map<String, String> emptyOptions = new HashMap<String, String>();
+ SizeTieredCompactionStrategy strategy = new SizeTieredCompactionStrategy(mock(ColumnFamilyStore.class), emptyOptions);
+ List<List<String>> buckets = strategy.getBuckets(pairs, 2);
+ assertEquals(3, buckets.size());
+
+ for (List<String> bucket : buckets)
+ {
+ assertEquals(2, bucket.size());
+ assertEquals(bucket.get(0).length(), bucket.get(1).length());
+ assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0));
+ }
+
+ pairs.clear();
+ buckets.clear();
+
+ String[] strings2 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" };
+ for (String st : strings2)
+ {
+ Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length()));
+ pairs.add(pair);
+ }
+
+ buckets = strategy.getBuckets(pairs, 2);
+ assertEquals(2, buckets.size());
+
+ for (List<String> bucket : buckets)
+ {
+ assertEquals(3, bucket.size());
+ assertEquals(bucket.get(0).charAt(0), bucket.get(1).charAt(0));
+ assertEquals(bucket.get(1).charAt(0), bucket.get(2).charAt(0));
+ }
+
+ // Test the "min" functionality
+ pairs.clear();
+ buckets.clear();
+
+ String[] strings3 = { "aaa", "bbbbbbbb", "aaa", "bbbbbbbb", "bbbbbbbb", "aaa" };
+ for (String st : strings3)
+ {
+ Pair<String, Long> pair = new Pair<String, Long>(st, new Long(st.length()));
+ pairs.add(pair);
+ }
+
+ buckets = strategy.getBuckets(pairs, 10); // notice the min is 10
+ assertEquals(1, buckets.size());
+ }
+}
Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1134460&r1=1134459&r2=1134460&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Fri Jun 10 22:13:54 2011
@@ -28,7 +28,6 @@ import java.util.*;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.Util;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.filter.QueryFilter;