You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/12/02 10:51:14 UTC
cassandra git commit: Make sure we release sstable references after
anticompaction
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 9c0f5753f -> d15c9187a
Make sure we release sstable references after anticompaction
Patch by marcuse; reviewed by yukim for CASSANDRA-8386
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d15c9187
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d15c9187
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d15c9187
Branch: refs/heads/cassandra-2.1
Commit: d15c9187a4b66645bf0575a7c3bfdbb9b10a263d
Parents: 9c0f575
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Nov 27 18:12:24 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Dec 2 10:10:33 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 66 +++++++++++---------
.../cassandra/io/sstable/SSTableReader.java | 2 +-
.../db/compaction/AntiCompactionTest.java | 55 ++++++++++++----
4 files changed, 82 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d454ba2..7df396d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Release sstable references after anticompaction (CASSANDRA-8386)
* Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
* Fix high size calculations for prepared statements (CASSANDRA-8231)
* Centralize shared executors (CASSANDRA-8055)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 61628ff..d85ffd7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -391,6 +391,8 @@ public class CompactionManager implements CompactionManagerMBean
/**
* Make sure the {validatedForRepair} are marked for compaction before calling this.
*
+ * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getAndReferenceSSTables(..)).
+ *
* @param cfs
* @param ranges Ranges that the repair was carried out on
* @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
@@ -407,40 +409,48 @@ public class CompactionManager implements CompactionManagerMBean
Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
Set<SSTableReader> nonAnticompacting = new HashSet<>();
Iterator<SSTableReader> sstableIterator = sstables.iterator();
- while (sstableIterator.hasNext())
+ try
{
- SSTableReader sstable = sstableIterator.next();
- for (Range<Token> r : Range.normalize(ranges))
+ while (sstableIterator.hasNext())
{
- Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
- if (r.contains(sstableRange))
+ SSTableReader sstable = sstableIterator.next();
+ for (Range<Token> r : Range.normalize(ranges))
{
- logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
- sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
- sstable.reloadSSTableMetadata();
- mutatedRepairStatuses.add(sstable);
- sstableIterator.remove();
- break;
- }
- else if (!sstableRange.intersects(r))
- {
- logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
- nonAnticompacting.add(sstable);
- sstableIterator.remove();
- break;
- }
- else
- {
- logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+ Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
+ if (r.contains(sstableRange))
+ {
+ logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
+ sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
+ sstable.reloadSSTableMetadata();
+ mutatedRepairStatuses.add(sstable);
+ sstableIterator.remove();
+ break;
+ }
+ else if (!sstableRange.intersects(r))
+ {
+ logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
+ nonAnticompacting.add(sstable);
+ sstableIterator.remove();
+ break;
+ }
+ else
+ {
+ logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+ }
}
}
+ cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
+ cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+ SSTableReader.releaseReferences(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+ if (!sstables.isEmpty())
+ doAntiCompaction(cfs, ranges, sstables, repairedAt);
}
- cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
- cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
- if (!sstables.isEmpty())
- doAntiCompaction(cfs, ranges, sstables, repairedAt);
- SSTableReader.releaseReferences(sstables);
- cfs.getDataTracker().unmarkCompacting(sstables);
+ finally
+ {
+ SSTableReader.releaseReferences(sstables);
+ cfs.getDataTracker().unmarkCompacting(sstables);
+ }
+
logger.info(String.format("Completed anticompaction successfully"));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 0a34b4a..0024f24 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1645,7 +1645,7 @@ public class SSTableReader extends SSTable
}
@VisibleForTesting
- int referenceCount()
+ public int referenceCount()
{
return references.get();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 5ed4f4a..090839e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -75,22 +75,30 @@ public class AntiCompactionTest extends SchemaLoader
int nonRepairedKeys = 0;
for (SSTableReader sstable : store.getSSTables())
{
- SSTableScanner scanner = sstable.getScanner();
- while (scanner.hasNext())
+ try (SSTableScanner scanner = sstable.getScanner())
{
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
- if (sstable.isRepaired())
+ while (scanner.hasNext())
{
- assertTrue(range.contains(row.getKey().getToken()));
- repairedKeys++;
- }
- else
- {
- assertFalse(range.contains(row.getKey().getToken()));
- nonRepairedKeys++;
+ SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ if (sstable.isRepaired())
+ {
+ assertTrue(range.contains(row.getKey().getToken()));
+ repairedKeys++;
+ }
+ else
+ {
+ assertFalse(range.contains(row.getKey().getToken()));
+ nonRepairedKeys++;
+ }
}
}
}
+ for (SSTableReader sstable : store.getSSTables())
+ {
+ assertFalse(sstable.isMarkedCompacted());
+ assertEquals(1, sstable.referenceCount());
+ }
+ assertEquals(0, store.getDataTracker().getCompacting().size());
assertEquals(repairedKeys, 4);
assertEquals(nonRepairedKeys, 6);
}
@@ -103,7 +111,6 @@ public class AntiCompactionTest extends SchemaLoader
SSTableReader s = writeFile(cfs, 1000);
cfs.addSSTable(s);
long origSize = s.bytesOnDisk();
- System.out.println(cfs.metric.liveDiskSpaceUsed.count());
Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
Collection<SSTableReader> sstables = cfs.getSSTables();
SSTableReader.acquireReferences(sstables);
@@ -146,16 +153,38 @@ public class AntiCompactionTest extends SchemaLoader
List<Range<Token>> ranges = Arrays.asList(range);
SSTableReader.acquireReferences(sstables);
- CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0);
+ CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
assertThat(store.getSSTables().size(), is(1));
assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
+ assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+ assertThat(store.getDataTracker().getCompacting().size(), is(0));
}
+ @Test
+ public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException
+ {
+ ColumnFamilyStore store = prepareColumnFamilyStore();
+ Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+ assertEquals(store.getSSTables().size(), sstables.size());
+ Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes()));
+ List<Range<Token>> ranges = Arrays.asList(range);
+
+ SSTableReader.acquireReferences(sstables);
+ CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
+
+ assertThat(store.getSSTables().size(), is(1));
+ assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
+ assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+ assertThat(store.getDataTracker().getCompacting().size(), is(0));
+ }
+
+
private ColumnFamilyStore prepareColumnFamilyStore()
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.truncateBlocking();
store.disableAutoCompaction();
long timestamp = System.currentTimeMillis();
for (int i = 0; i < 10; i++)