You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/08/20 13:28:26 UTC
git commit: Anticompact sstables as groups
Repository: cassandra
Updated Branches:
refs/heads/trunk 060c7961d -> 37f517593
Anticompact sstables as groups
Patch by Russell Spitzer; reviewed by marcuse for CASSANDRA-6851
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/37f51759
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/37f51759
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/37f51759
Branch: refs/heads/trunk
Commit: 37f5175935a37ce2c005335c2f486efb827b6eba
Parents: 060c796
Author: Russell Spitzer <Ru...@gmail.com>
Authored: Wed Aug 20 13:27:52 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Aug 20 13:27:52 2014 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compaction/AbstractCompactionStrategy.java | 31 ++++
.../db/compaction/CompactionManager.java | 174 +++++++++++++------
.../compaction/LeveledCompactionStrategy.java | 57 +++++-
.../db/compaction/AntiCompactionTest.java | 96 +++++++++-
.../LeveledCompactionStrategyTest.java | 61 ++++++-
6 files changed, 358 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d1dedbf..80ddddc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0
+ * Do anticompaction in groups (CASSANDRA-6851)
* Verify that UDF class methods are static (CASSANDRA-7781)
* Support pure user-defined functions (CASSANDRA-7395)
* Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 1b7786e..28ab84e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -399,4 +399,35 @@ public abstract class AbstractCompactionStrategy
return optionValue == null || Boolean.parseBoolean(optionValue);
}
+
+
+ /**
+ * Method for grouping similar SSTables together. This will be used by
+ * anti-compaction to determine which SSTables should be anticompacted
+ * as a group. If a given compaction strategy creates sstables which
+ * cannot be merged due to some constraint, it must override this method.
+ */
+ public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
+ {
+ int groupSize = 2;
+ List<SSTableReader> sortedSSTablesToGroup = new ArrayList<>(sstablesToGroup);
+ Collections.sort(sortedSSTablesToGroup, SSTableReader.sstableComparator);
+
+ Collection<Collection<SSTableReader>> groupedSSTables = new ArrayList<>();
+ Collection<SSTableReader> currGroup = new ArrayList<>();
+
+ for (SSTableReader sstable : sortedSSTablesToGroup)
+ {
+ currGroup.add(sstable);
+ if (currGroup.size() == groupSize)
+ {
+ groupedSSTables.add(currGroup);
+ currGroup = new ArrayList<>();
+ }
+ }
+
+ if (currGroup.size() != 0)
+ groupedSSTables.add(currGroup);
+ return groupedSSTables;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e578ddf..5af7139 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -52,6 +52,7 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -397,7 +398,7 @@ public class CompactionManager implements CompactionManagerMBean
Collection<SSTableReader> validatedForRepair,
long repairedAt) throws InterruptedException, ExecutionException, IOException
{
- logger.info("Starting anticompaction");
+ logger.info("Starting anticompaction for {}/{}", cfs.keyspace.getName(), cfs.getColumnFamilyName());
logger.debug("Starting anticompaction for ranges {}", ranges);
Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
@@ -847,6 +848,37 @@ public class CompactionManager implements CompactionManagerMBean
new MetadataCollector(Collections.singleton(sstable), cfs.metadata.comparator, sstable.getSSTableLevel()));
}
+ public static SSTableWriter createWriterForAntiCompaction(ColumnFamilyStore cfs,
+ File compactionFileLocation,
+ int expectedBloomFilterSize,
+ long repairedAt,
+ Collection<SSTableReader> sstables)
+ {
+ FileUtils.createDirectory(compactionFileLocation);
+ int minLevel = Integer.MAX_VALUE;
+ // if all sstables have the same level, we can compact them together without creating overlap during anticompaction
+ // note that we only anticompact from unrepaired sstables, which are not leveled, but we still keep the original level
+ // after first migration to be able to drop the sstables back in their original place in the repaired sstable manifest
+ for (SSTableReader sstable : sstables)
+ {
+ if (minLevel == Integer.MAX_VALUE)
+ minLevel = sstable.getSSTableLevel();
+
+ if (minLevel != sstable.getSSTableLevel())
+ {
+ minLevel = 0;
+ break;
+ }
+ }
+ return new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
+ expectedBloomFilterSize,
+ repairedAt,
+ cfs.metadata,
+ cfs.partitioner,
+ new MetadataCollector(sstables, cfs.metadata.comparator, minLevel));
+ }
+
+
/**
* Performs a readonly "compaction" of all sstables in order to validate complete rows,
* but without writing the merge result
@@ -947,6 +979,8 @@ public class CompactionManager implements CompactionManagerMBean
}
}
+
+
/**
* Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
* will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted
@@ -956,83 +990,111 @@ public class CompactionManager implements CompactionManagerMBean
* @param ranges Repaired ranges to be placed into one of the new sstables. The repaired table will be tracked via
* the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field.
*/
- private Collection<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, Collection<SSTableReader> repairedSSTables, long repairedAt)
+ private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,
+ Collection<SSTableReader> repairedSSTables, long repairedAt)
{
- List<SSTableReader> anticompactedSSTables = new ArrayList<>();
- int repairedKeyCount = 0;
- int unrepairedKeyCount = 0;
// TODO(5351): we can do better here:
- int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(repairedSSTables)));
logger.info("Performing anticompaction on {} sstables", repairedSSTables.size());
+
+ //Group SSTables
+ Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repairedSSTables);
// iterate over sstables to check if the repaired / unrepaired ranges intersect them.
- for (SSTableReader sstable : repairedSSTables)
+ int antiCompactedSSTableCount = 0;
+ for (Collection<SSTableReader> sstableGroup : groupedSSTables)
{
- // check that compaction hasn't stolen any sstables used in previous repair sessions
- // if we need to skip the anticompaction, it will be carried out by the next repair
+ int antiCompacted = antiCompactGroup(cfs, ranges, sstableGroup, repairedAt);
+ antiCompactedSSTableCount += antiCompacted;
+ }
+
+ String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
+ logger.info(format, repairedSSTables.size(), antiCompactedSSTableCount);
+ }
+
+ private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,
+ Collection<SSTableReader> anticompactionGroup, long repairedAt)
+ {
+ long groupMaxDataAge = -1;
+
+ // check that compaction hasn't stolen any sstables used in previous repair sessions
+ // if we need to skip the anticompaction, it will be carried out by the next repair
+ for (Iterator<SSTableReader> i = anticompactionGroup.iterator(); i.hasNext();)
+ {
+ SSTableReader sstable = i.next();
if (!new File(sstable.getFilename()).exists())
{
logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
+ i.remove();
continue;
}
+ if (groupMaxDataAge < sstable.maxDataAge)
+ groupMaxDataAge = sstable.maxDataAge;
+ }
+
+ if (anticompactionGroup.size() == 0)
+ {
+ logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
+ return 0;
+ }
- logger.info("Anticompacting {}", sstable);
- Set<SSTableReader> sstableAsSet = new HashSet<>();
- sstableAsSet.add(sstable);
+ logger.info("Anticompacting {}", anticompactionGroup);
+ Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
- File destination = cfs.directories.getDirectoryForNewSSTables();
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
- SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+ File destination = cfs.directories.getDirectoryForNewSSTables();
+ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
- AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
+ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+ List<ICompactionScanner> scanners = strategy.getScanners(anticompactionGroup);
- try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)), CFMetaData.DEFAULT_GC_GRACE_SECONDS))
- {
- repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
- unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
+ int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
- CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
+ long repairedKeyCount = 0;
+ long unrepairedKeyCount = 0;
+ try (CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+ {
+ repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
+ unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
- try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+ CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
+
+ try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+ {
+ while(iter.hasNext())
{
- while(iter.hasNext())
+ AbstractCompactedRow row = iter.next();
+ // if current range from sstable is repaired, save it into the new repaired sstable
+ if (Range.isInRanges(row.key.getToken(), ranges))
+ {
+ repairedSSTableWriter.append(row);
+ repairedKeyCount++;
+ }
+ // otherwise save into the new 'non-repaired' table
+ else
{
- AbstractCompactedRow row = iter.next();
- // if current range from sstable is repaired, save it into the new repaired sstable
- if (Range.isInRanges(row.key.getToken(), ranges))
- {
- repairedSSTableWriter.append(row);
- repairedKeyCount++;
- }
- // otherwise save into the new 'non-repaired' table
- else
- {
- unRepairedSSTableWriter.append(row);
- unrepairedKeyCount++;
- }
+ unRepairedSSTableWriter.append(row);
+ unrepairedKeyCount++;
}
}
- // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
- // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
- repairedSSTableWriter.finish(false, repairedAt);
- unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
- // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
- anticompactedSSTables.addAll(repairedSSTableWriter.finished());
- anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
- }
- catch (Throwable e)
- {
- logger.error("Error anticompacting " + sstable, e);
- repairedSSTableWriter.abort();
- unRepairedSSTableWriter.abort();
}
+ // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
+ // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
+ repairedSSTableWriter.finish(false, repairedAt);
+ unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
+ // report how many keys of this group were written to the repaired sstable versus the total keys processed
+ logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
+ repairedKeyCount + unrepairedKeyCount,
+ cfs.keyspace.getName(),
+ cfs.getColumnFamilyName(),
+ anticompactionGroup);
+ return repairedSSTableWriter.finished().size() + unRepairedSSTableWriter.finished().size();
}
- String format = "Repaired {} keys of {} for {}/{}";
- logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
- String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
- logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
-
- return anticompactedSSTables;
+ catch (Throwable e)
+ {
+ logger.error("Error anticompacting " + anticompactionGroup, e);
+ repairedSSTableWriter.abort();
+ unRepairedSSTableWriter.abort();
+ }
+ return 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3ee59ad..b179b3a 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -18,7 +18,18 @@
package org.apache.cassandra.db.compaction;
import java.io.IOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@@ -170,6 +181,50 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
return new LeveledCompactionTask(cfs, sstables, level, gcBefore, maxSSTableBytes);
}
+ /**
+ * Leveled compaction strategy has guarantees on the data contained within each level so we
+ * have to make sure we only create groups of SSTables with members from the same level.
+ * This way we won't end up creating invalid sstables during anti-compaction.
+ * @param ssTablesToGroup the sstables to split into same-level groups
+ * @return Groups of sstables from the same level
+ */
+ @Override
+ public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> ssTablesToGroup)
+ {
+ int groupSize = 2;
+ Map<Integer, Collection<SSTableReader>> sstablesByLevel = new HashMap<>();
+ for (SSTableReader sstable : ssTablesToGroup)
+ {
+ Integer level = sstable.getSSTableLevel();
+ if (!sstablesByLevel.containsKey(level))
+ {
+ sstablesByLevel.put(level, new ArrayList<SSTableReader>());
+ }
+ sstablesByLevel.get(level).add(sstable);
+ }
+
+ Collection<Collection<SSTableReader>> groupedSSTables = new ArrayList<>();
+
+ for (Collection<SSTableReader> levelOfSSTables : sstablesByLevel.values())
+ {
+ Collection<SSTableReader> currGroup = new ArrayList<>();
+ for (SSTableReader sstable : levelOfSSTables)
+ {
+ currGroup.add(sstable);
+ if (currGroup.size() == groupSize)
+ {
+ groupedSSTables.add(currGroup);
+ currGroup = new ArrayList<>();
+ }
+ }
+
+ if (currGroup.size() != 0)
+ groupedSSTables.add(currGroup);
+ }
+ return groupedSSTables;
+
+ }
+
public int getEstimatedRemainingTasks()
{
return manifest.getEstimatedTasks();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index e47f0e9..f632a65 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.SimpleStrategy;
import org.junit.BeforeClass;
+import org.junit.After;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
@@ -41,6 +42,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import static junit.framework.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
@@ -51,14 +53,23 @@ public class AntiCompactionTest
private static final String KEYSPACE1 = "AntiCompactionTest";
private static final String CF = "Standard1";
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE1,
- SimpleStrategy.class,
- KSMetaData.optsWithRF(1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF));
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF));
+ }
+
+ @After
+ public void truncateCF()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.truncateBlocking();
}
@Test
@@ -113,4 +124,83 @@ public class AntiCompactionTest
assertEquals(repairedKeys, 4);
assertEquals(nonRepairedKeys, 6);
}
+
+
+ public void generateSStable(ColumnFamilyStore store, String Suffix)
+ {
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++)
+ {
+ DecoratedKey key = Util.dk(Integer.toString(i) + "-" + Suffix);
+ Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+ for (int j = 0; j < 10; j++)
+ rm.add("Standard1", Util.cellname(Integer.toString(j)),
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ timestamp,
+ 0);
+ rm.apply();
+ }
+ store.forceBlockingFlush();
+ }
+
+ @Test
+ public void antiCompactTenSTC() throws InterruptedException, ExecutionException, IOException{
+ antiCompactTen("SizeTieredCompactionStrategy");
+ }
+
+ @Test
+ public void antiCompactTenLC() throws InterruptedException, ExecutionException, IOException{
+ antiCompactTen("LeveledCompactionStrategy");
+ }
+
+ public void antiCompactTen(String compactionStrategy) throws InterruptedException, ExecutionException, IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.setCompactionStrategyClass(compactionStrategy);
+ store.disableAutoCompaction();
+
+ for (int table = 0; table < 10; table++)
+ {
+ generateSStable(store,Integer.toString(table));
+ }
+ Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+ assertEquals(store.getSSTables().size(), sstables.size());
+
+ Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
+ List<Range<Token>> ranges = Arrays.asList(range);
+
+ SSTableReader.acquireReferences(sstables);
+ long repairedAt = 1000;
+ CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
+ /*
+ Anticompaction processes the 10 SSTables in groups of two, and each group is rewritten into
+ one repaired and one unrepaired sstable, so there is no net change in the number of sstables
+ */
+ assertEquals(10, store.getSSTables().size());
+ int repairedKeys = 0;
+ int nonRepairedKeys = 0;
+ for (SSTableReader sstable : store.getSSTables())
+ {
+ SSTableScanner scanner = sstable.getScanner();
+ while (scanner.hasNext())
+ {
+ SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ if (sstable.isRepaired())
+ {
+ assertTrue(range.contains(row.getKey().getToken()));
+ assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt);
+ repairedKeys++;
+ }
+ else
+ {
+ assertFalse(range.contains(row.getKey().getToken()));
+ assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
+ nonRepairedKeys++;
+ }
+ }
+ }
+ assertEquals(repairedKeys, 40);
+ assertEquals(nonRepairedKeys, 60);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 5f9b72b..7eec449 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -18,7 +18,13 @@
package org.apache.cassandra.db.compaction;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import org.junit.After;
import org.junit.Before;
@@ -30,7 +36,10 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
@@ -84,6 +93,54 @@ public class LeveledCompactionStrategyTest
cfs.truncateBlocking();
}
+ /**
+ * Ensure that the grouping operation preserves the levels of grouped tables
+ */
+ @Test
+ public void testGrouperLevels() throws Exception{
+ ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
+
+ // Enough data to have a level 1 and 2
+ int rows = 20;
+ int columns = 10;
+
+ // Adds enough data to trigger multiple sstables per level
+ for (int r = 0; r < rows; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+ for (int c = 0; c < columns; c++)
+ {
+ rm.add(CF_STANDARDDLEVELED, Util.cellname("column" + c), value, 0);
+ }
+ rm.apply();
+ cfs.forceBlockingFlush();
+ }
+
+ waitForLeveling(cfs);
+ LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategy();
+ // Checking we're not completely bad at math
+ assert strategy.getLevelSize(1) > 0;
+ assert strategy.getLevelSize(2) > 0;
+
+ Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(cfs.getSSTables());
+ for (Collection<SSTableReader> sstableGroup : groupedSSTables)
+ {
+ int groupLevel = -1;
+ Iterator<SSTableReader> it = sstableGroup.iterator();
+ while (it.hasNext())
+ {
+
+ SSTableReader sstable = it.next();
+ int tableLevel = sstable.getSSTableLevel();
+ if (groupLevel == -1)
+ groupLevel = tableLevel;
+ assert groupLevel == tableLevel;
+ }
+ }
+
+ }
+
/*
* This exercises in particular the code of #4142
*/