You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/12/02 10:51:28 UTC
[1/2] cassandra git commit: Make sure we release sstable references after anticompaction
Repository: cassandra
Updated Branches:
refs/heads/trunk 25314c204 -> 06f626acd
Make sure we release sstable references after anticompaction
Patch by marcuse; reviewed by yukim for CASSANDRA-8386
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d15c9187
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d15c9187
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d15c9187
Branch: refs/heads/trunk
Commit: d15c9187a4b66645bf0575a7c3bfdbb9b10a263d
Parents: 9c0f575
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Nov 27 18:12:24 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Dec 2 10:10:33 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 66 +++++++++++---------
.../cassandra/io/sstable/SSTableReader.java | 2 +-
.../db/compaction/AntiCompactionTest.java | 55 ++++++++++++----
4 files changed, 82 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d454ba2..7df396d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Release sstable references after anticompaction (CASSANDRA-8386)
* Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
* Fix high size calculations for prepared statements (CASSANDRA-8231)
* Centralize shared executors (CASSANDRA-8055)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 61628ff..d85ffd7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -391,6 +391,8 @@ public class CompactionManager implements CompactionManagerMBean
/**
* Make sure the {validatedForRepair} are marked for compaction before calling this.
*
+ * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getAndReferenceSSTables(..)).
+ *
* @param cfs
* @param ranges Ranges that the repair was carried out on
* @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
@@ -407,40 +409,48 @@ public class CompactionManager implements CompactionManagerMBean
Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
Set<SSTableReader> nonAnticompacting = new HashSet<>();
Iterator<SSTableReader> sstableIterator = sstables.iterator();
- while (sstableIterator.hasNext())
+ try
{
- SSTableReader sstable = sstableIterator.next();
- for (Range<Token> r : Range.normalize(ranges))
+ while (sstableIterator.hasNext())
{
- Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
- if (r.contains(sstableRange))
+ SSTableReader sstable = sstableIterator.next();
+ for (Range<Token> r : Range.normalize(ranges))
{
- logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
- sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
- sstable.reloadSSTableMetadata();
- mutatedRepairStatuses.add(sstable);
- sstableIterator.remove();
- break;
- }
- else if (!sstableRange.intersects(r))
- {
- logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
- nonAnticompacting.add(sstable);
- sstableIterator.remove();
- break;
- }
- else
- {
- logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+ Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
+ if (r.contains(sstableRange))
+ {
+ logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
+ sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
+ sstable.reloadSSTableMetadata();
+ mutatedRepairStatuses.add(sstable);
+ sstableIterator.remove();
+ break;
+ }
+ else if (!sstableRange.intersects(r))
+ {
+ logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
+ nonAnticompacting.add(sstable);
+ sstableIterator.remove();
+ break;
+ }
+ else
+ {
+ logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+ }
}
}
+ cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
+ cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+ SSTableReader.releaseReferences(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+ if (!sstables.isEmpty())
+ doAntiCompaction(cfs, ranges, sstables, repairedAt);
}
- cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
- cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
- if (!sstables.isEmpty())
- doAntiCompaction(cfs, ranges, sstables, repairedAt);
- SSTableReader.releaseReferences(sstables);
- cfs.getDataTracker().unmarkCompacting(sstables);
+ finally
+ {
+ SSTableReader.releaseReferences(sstables);
+ cfs.getDataTracker().unmarkCompacting(sstables);
+ }
+
logger.info(String.format("Completed anticompaction successfully"));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 0a34b4a..0024f24 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1645,7 +1645,7 @@ public class SSTableReader extends SSTable
}
@VisibleForTesting
- int referenceCount()
+ public int referenceCount()
{
return references.get();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 5ed4f4a..090839e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -75,22 +75,30 @@ public class AntiCompactionTest extends SchemaLoader
int nonRepairedKeys = 0;
for (SSTableReader sstable : store.getSSTables())
{
- SSTableScanner scanner = sstable.getScanner();
- while (scanner.hasNext())
+ try (SSTableScanner scanner = sstable.getScanner())
{
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
- if (sstable.isRepaired())
+ while (scanner.hasNext())
{
- assertTrue(range.contains(row.getKey().getToken()));
- repairedKeys++;
- }
- else
- {
- assertFalse(range.contains(row.getKey().getToken()));
- nonRepairedKeys++;
+ SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ if (sstable.isRepaired())
+ {
+ assertTrue(range.contains(row.getKey().getToken()));
+ repairedKeys++;
+ }
+ else
+ {
+ assertFalse(range.contains(row.getKey().getToken()));
+ nonRepairedKeys++;
+ }
}
}
}
+ for (SSTableReader sstable : store.getSSTables())
+ {
+ assertFalse(sstable.isMarkedCompacted());
+ assertEquals(1, sstable.referenceCount());
+ }
+ assertEquals(0, store.getDataTracker().getCompacting().size());
assertEquals(repairedKeys, 4);
assertEquals(nonRepairedKeys, 6);
}
@@ -103,7 +111,6 @@ public class AntiCompactionTest extends SchemaLoader
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
long origSize = s.bytesOnDisk();
- System.out.println(cfs.metric.liveDiskSpaceUsed.count());
Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
Collection<SSTableReader> sstables = cfs.getSSTables();
SSTableReader.acquireReferences(sstables);
@@ -146,16 +153,38 @@ public class AntiCompactionTest extends SchemaLoader
List<Range<Token>> ranges = Arrays.asList(range);
SSTableReader.acquireReferences(sstables);
- CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0);
+ CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
assertThat(store.getSSTables().size(), is(1));
assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
+ assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+ assertThat(store.getDataTracker().getCompacting().size(), is(0));
}
+ @Test
+ public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException
+ {
+ ColumnFamilyStore store = prepareColumnFamilyStore();
+ Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+ assertEquals(store.getSSTables().size(), sstables.size());
+ Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes()));
+ List<Range<Token>> ranges = Arrays.asList(range);
+
+ SSTableReader.acquireReferences(sstables);
+ CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
+
+ assertThat(store.getSSTables().size(), is(1));
+ assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
+ assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+ assertThat(store.getDataTracker().getCompacting().size(), is(0));
+ }
+
+
private ColumnFamilyStore prepareColumnFamilyStore()
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.truncateBlocking();
store.disableAutoCompaction();
long timestamp = System.currentTimeMillis();
for (int i = 0; i < 10; i++)
[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionManager.java
test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06f626ac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06f626ac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06f626ac
Branch: refs/heads/trunk
Commit: 06f626acd27b051222616c0c91f7dd8d556b8d45
Parents: 25314c2 d15c918
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Dec 2 10:49:59 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Dec 2 10:50:18 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 66 +++++++------
.../db/compaction/AntiCompactionTest.java | 99 ++++++++++++++------
3 files changed, 108 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06f626ac/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 22cc598,7df396d..141c3a8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,7 +1,43 @@@
+3.0
+ * Support UDTs, tuples, and collections in user-defined
+ functions (CASSANDRA-7563)
+ * Fix aggregate fn results on empty selection, result column name,
+ and cqlsh parsing (CASSANDRA-8229)
+ * Mark sstables as repaired after full repair (CASSANDRA-7586)
+ * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
+ * Integrate JMH for microbenchmarks (CASSANDRA-8151)
+ * Keep sstable levels when bootstrapping (CASSANDRA-7460)
+ * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
+ * Support for aggregation functions (CASSANDRA-4914)
+ * Remove cassandra-cli (CASSANDRA-7920)
+ * Accept dollar quoted strings in CQL (CASSANDRA-7769)
+ * Make assassinate a first class command (CASSANDRA-7935)
+ * Support IN clause on any clustering column (CASSANDRA-4762)
+ * Improve compaction logging (CASSANDRA-7818)
+ * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
+ * Do anticompaction in groups (CASSANDRA-6851)
+ * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
+ 7924, 7812, 8063, 7813)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+ * Fail on very large batch sizes (CASSANDRA-8011)
+ * Improve concurrency of repair (CASSANDRA-6455, 8208)
+
+
2.1.3
+ * Release sstable references after anticompaction (CASSANDRA-8386)
* Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
- * Fix high size calculations for prepared statements (CASSANDRA-8231)
* Centralize shared executors (CASSANDRA-8055)
* Fix filtering for CONTAINS (KEY) relations on frozen collection
clustering columns when the query is restricted to a single
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06f626ac/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a9a4773,d85ffd7..ed875b8
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -407,40 -409,48 +409,48 @@@ public class CompactionManager implemen
Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
Set<SSTableReader> nonAnticompacting = new HashSet<>();
Iterator<SSTableReader> sstableIterator = sstables.iterator();
- while (sstableIterator.hasNext())
+ try
{
- SSTableReader sstable = sstableIterator.next();
- for (Range<Token> r : Range.normalize(ranges))
+ while (sstableIterator.hasNext())
{
- Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken());
- if (r.contains(sstableRange))
- {
- logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
- sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
- sstable.reloadSSTableMetadata();
- mutatedRepairStatuses.add(sstable);
- sstableIterator.remove();
- break;
- }
- else if (!sstableRange.intersects(r))
+ SSTableReader sstable = sstableIterator.next();
+ for (Range<Token> r : Range.normalize(ranges))
{
- logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
- nonAnticompacting.add(sstable);
- sstableIterator.remove();
- break;
- }
- else
- {
- logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
- Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
++ Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken());
+ if (r.contains(sstableRange))
+ {
+ logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
+ sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
+ sstable.reloadSSTableMetadata();
+ mutatedRepairStatuses.add(sstable);
+ sstableIterator.remove();
+ break;
+ }
+ else if (!sstableRange.intersects(r))
+ {
+ logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
+ nonAnticompacting.add(sstable);
+ sstableIterator.remove();
+ break;
+ }
+ else
+ {
+ logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+ }
}
}
+ cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
+ cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+ SSTableReader.releaseReferences(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+ if (!sstables.isEmpty())
+ doAntiCompaction(cfs, ranges, sstables, repairedAt);
}
- cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
- cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
- if (!sstables.isEmpty())
- doAntiCompaction(cfs, ranges, sstables, repairedAt);
- SSTableReader.releaseReferences(sstables);
- cfs.getDataTracker().unmarkCompacting(sstables);
+ finally
+ {
+ SSTableReader.releaseReferences(sstables);
+ cfs.getDataTracker().unmarkCompacting(sstables);
+ }
+
logger.info(String.format("Completed anticompaction successfully"));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/06f626ac/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 87e4315,090839e..2396acb
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -56,49 -50,17 +56,34 @@@ import org.apache.cassandra.utils.ByteB
import com.google.common.collect.Iterables;
-public class AntiCompactionTest extends SchemaLoader
+public class AntiCompactionTest
{
- private static final String KEYSPACE1 = "Keyspace1";
+ private static final String KEYSPACE1 = "AntiCompactionTest";
private static final String CF = "Standard1";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ SimpleStrategy.class,
+ KSMetaData.optsWithRF(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF));
+ }
+
+ @After
+ public void truncateCF()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.truncateBlocking();
+ }
+
@Test
-- public void antiCompactOne() throws InterruptedException, ExecutionException, IOException
++ public void antiCompactOne() throws Exception
{
- Keyspace keyspace = Keyspace.open(KEYSPACE1);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
- store.disableAutoCompaction();
- long timestamp = System.currentTimeMillis();
- for (int i = 0; i < 10; i++)
- {
- DecoratedKey key = Util.dk(Integer.toString(i));
- Mutation rm = new Mutation(KEYSPACE1, key.getKey());
- for (int j = 0; j < 10; j++)
- rm.add(CF, Util.cellname(Integer.toString(j)),
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- timestamp,
- 0);
- rm.applyUnsafe();
- }
- store.forceBlockingFlush();
+ ColumnFamilyStore store = prepareColumnFamilyStore();
Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
assertEquals(store.getSSTables().size(), sstables.size());
Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
@@@ -113,19 -75,21 +98,21 @@@
int nonRepairedKeys = 0;
for (SSTableReader sstable : store.getSSTables())
{
- ICompactionScanner scanner = sstable.getScanner();
- while (scanner.hasNext())
- try (SSTableScanner scanner = sstable.getScanner())
++ try (ICompactionScanner scanner = sstable.getScanner())
{
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
- if (sstable.isRepaired())
- {
- assertTrue(range.contains(row.getKey().getToken()));
- repairedKeys++;
- }
- else
+ while (scanner.hasNext())
{
- assertFalse(range.contains(row.getKey().getToken()));
- nonRepairedKeys++;
+ SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ if (sstable.isRepaired())
+ {
+ assertTrue(range.contains(row.getKey().getToken()));
+ repairedKeys++;
+ }
+ else
+ {
+ assertFalse(range.contains(row.getKey().getToken()));
+ nonRepairedKeys++;
+ }
}
}
}
@@@ -163,7 -131,12 +155,7 @@@
File dir = cfs.directories.getDirectoryForNewSSTables();
String filename = cfs.getTempSSTablePath(dir);
- SSTableWriter writer = SSTableWriter.create(filename,0,0);
- SSTableWriter writer = new SSTableWriter(filename,
- 0,
- 0,
- cfs.metadata,
- StorageService.getPartitioner(),
- new MetadataCollector(cfs.metadata.comparator));
++ SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
for (int i = 0; i < count * 5; i++)
writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
@@@ -215,60 -153,60 +207,107 @@@
List<Range<Token>> ranges = Arrays.asList(range);
SSTableReader.acquireReferences(sstables);
- CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
-
- assertThat(store.getSSTables().size(), is(1));
- assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
- assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
- assertThat(store.getDataTracker().getCompacting().size(), is(0));
+ long repairedAt = 1000;
+ CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
+ /*
+ Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
+ so there will be no net change in the number of sstables
+ */
+ assertEquals(10, store.getSSTables().size());
+ int repairedKeys = 0;
+ int nonRepairedKeys = 0;
+ for (SSTableReader sstable : store.getSSTables())
+ {
+ ICompactionScanner scanner = sstable.getScanner();
+ while (scanner.hasNext())
+ {
+ SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ if (sstable.isRepaired())
+ {
+ assertTrue(range.contains(row.getKey().getToken()));
+ assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt);
+ repairedKeys++;
+ }
+ else
+ {
+ assertFalse(range.contains(row.getKey().getToken()));
+ assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
+ nonRepairedKeys++;
+ }
+ }
+ }
+ assertEquals(repairedKeys, 40);
+ assertEquals(nonRepairedKeys, 60);
}
-
@Test
+ public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException
+ {
+ ColumnFamilyStore store = prepareColumnFamilyStore();
+ Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+ assertEquals(store.getSSTables().size(), sstables.size());
+ Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes()));
+ List<Range<Token>> ranges = Arrays.asList(range);
+
+ SSTableReader.acquireReferences(sstables);
+ CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
+
+ assertThat(store.getSSTables().size(), is(1));
+ assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
+ assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+ assertThat(store.getDataTracker().getCompacting().size(), is(0));
+ }
+
+
++ @Test
+ public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.disableAutoCompaction();
+
+ for (int table = 0; table < 10; table++)
+ {
+ generateSStable(store,Integer.toString(table));
+ }
+ Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+ assertEquals(store.getSSTables().size(), sstables.size());
+
+ Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
+ List<Range<Token>> ranges = Arrays.asList(range);
+
+ SSTableReader.acquireReferences(sstables);
+ CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0);
+
+ assertThat(store.getSSTables().size(), is(10));
+ assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
+ }
+
+ private ColumnFamilyStore prepareColumnFamilyStore()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
- store.truncateBlocking();
+ store.disableAutoCompaction();
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++)
+ {
+ DecoratedKey key = Util.dk(Integer.toString(i));
+ Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+ for (int j = 0; j < 10; j++)
+ rm.add("Standard1", Util.cellname(Integer.toString(j)),
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ timestamp,
+ 0);
+ rm.apply();
+ }
+ store.forceBlockingFlush();
+ return store;
+ }
-
++
+ @After
- public void truncateCF()
++ public void truncateCfs()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.truncateBlocking();
+ }
}