You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/12/02 10:51:14 UTC

cassandra git commit: Make sure we release sstable references after anticompaction

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 9c0f5753f -> d15c9187a


Make sure we release sstable references after anticompaction

Patch by marcuse; reviewed by yukim for CASSANDRA-8386


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d15c9187
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d15c9187
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d15c9187

Branch: refs/heads/cassandra-2.1
Commit: d15c9187a4b66645bf0575a7c3bfdbb9b10a263d
Parents: 9c0f575
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Nov 27 18:12:24 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Dec 2 10:10:33 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 66 +++++++++++---------
 .../cassandra/io/sstable/SSTableReader.java     |  2 +-
 .../db/compaction/AntiCompactionTest.java       | 55 ++++++++++++----
 4 files changed, 82 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d454ba2..7df396d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Release sstable references after anticompaction (CASSANDRA-8386)
  * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
  * Fix high size calculations for prepared statements (CASSANDRA-8231)
  * Centralize shared executors (CASSANDRA-8055)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 61628ff..d85ffd7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -391,6 +391,8 @@ public class CompactionManager implements CompactionManagerMBean
     /**
      * Make sure the {validatedForRepair} are marked for compaction before calling this.
      *
+     * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getAndReferenceSSTables(..)).
+     *
      * @param cfs
      * @param ranges Ranges that the repair was carried out on
      * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
@@ -407,40 +409,48 @@ public class CompactionManager implements CompactionManagerMBean
         Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
         Set<SSTableReader> nonAnticompacting = new HashSet<>();
         Iterator<SSTableReader> sstableIterator = sstables.iterator();
-        while (sstableIterator.hasNext())
+        try
         {
-            SSTableReader sstable = sstableIterator.next();
-            for (Range<Token> r : Range.normalize(ranges))
+            while (sstableIterator.hasNext())
             {
-                Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
-                if (r.contains(sstableRange))
+                SSTableReader sstable = sstableIterator.next();
+                for (Range<Token> r : Range.normalize(ranges))
                 {
-                    logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
-                    sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
-                    sstable.reloadSSTableMetadata();
-                    mutatedRepairStatuses.add(sstable);
-                    sstableIterator.remove();
-                    break;
-                }
-                else if (!sstableRange.intersects(r))
-                {
-                    logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
-                    nonAnticompacting.add(sstable);
-                    sstableIterator.remove();
-                    break;
-                }
-                else
-                {
-                    logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+                    Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
+                    if (r.contains(sstableRange))
+                    {
+                        logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
+                        sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
+                        sstable.reloadSSTableMetadata();
+                        mutatedRepairStatuses.add(sstable);
+                        sstableIterator.remove();
+                        break;
+                    }
+                    else if (!sstableRange.intersects(r))
+                    {
+                        logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
+                        nonAnticompacting.add(sstable);
+                        sstableIterator.remove();
+                        break;
+                    }
+                    else
+                    {
+                        logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+                    }
                 }
             }
+            cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
+            cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+            SSTableReader.releaseReferences(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+            if (!sstables.isEmpty())
+                doAntiCompaction(cfs, ranges, sstables, repairedAt);
         }
-        cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
-        cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
-        if (!sstables.isEmpty())
-            doAntiCompaction(cfs, ranges, sstables, repairedAt);
-        SSTableReader.releaseReferences(sstables);
-        cfs.getDataTracker().unmarkCompacting(sstables);
+        finally
+        {
+            SSTableReader.releaseReferences(sstables);
+            cfs.getDataTracker().unmarkCompacting(sstables);
+        }
+
         logger.info(String.format("Completed anticompaction successfully"));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 0a34b4a..0024f24 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1645,7 +1645,7 @@ public class SSTableReader extends SSTable
     }
 
     @VisibleForTesting
-    int referenceCount()
+    public int referenceCount()
     {
         return references.get();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 5ed4f4a..090839e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -75,22 +75,30 @@ public class AntiCompactionTest extends SchemaLoader
         int nonRepairedKeys = 0;
         for (SSTableReader sstable : store.getSSTables())
         {
-            SSTableScanner scanner = sstable.getScanner();
-            while (scanner.hasNext())
+            try (SSTableScanner scanner = sstable.getScanner())
             {
-                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
-                if (sstable.isRepaired())
+                while (scanner.hasNext())
                 {
-                    assertTrue(range.contains(row.getKey().getToken()));
-                    repairedKeys++;
-                }
-                else
-                {
-                    assertFalse(range.contains(row.getKey().getToken()));
-                    nonRepairedKeys++;
+                    SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+                    if (sstable.isRepaired())
+                    {
+                        assertTrue(range.contains(row.getKey().getToken()));
+                        repairedKeys++;
+                    }
+                    else
+                    {
+                        assertFalse(range.contains(row.getKey().getToken()));
+                        nonRepairedKeys++;
+                    }
                 }
             }
         }
+        for (SSTableReader sstable : store.getSSTables())
+        {
+            assertFalse(sstable.isMarkedCompacted());
+            assertEquals(1, sstable.referenceCount());
+        }
+        assertEquals(0, store.getDataTracker().getCompacting().size());
         assertEquals(repairedKeys, 4);
         assertEquals(nonRepairedKeys, 6);
     }
@@ -103,7 +111,6 @@ public class AntiCompactionTest extends SchemaLoader
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
         long origSize = s.bytesOnDisk();
-        System.out.println(cfs.metric.liveDiskSpaceUsed.count());
         Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
         Collection<SSTableReader> sstables = cfs.getSSTables();
         SSTableReader.acquireReferences(sstables);
@@ -146,16 +153,38 @@ public class AntiCompactionTest extends SchemaLoader
         List<Range<Token>> ranges = Arrays.asList(range);
 
         SSTableReader.acquireReferences(sstables);
-        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0);
+        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
 
         assertThat(store.getSSTables().size(), is(1));
         assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
+        assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+        assertThat(store.getDataTracker().getCompacting().size(), is(0));
     }
 
+    @Test
+    public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException
+    {
+        ColumnFamilyStore store = prepareColumnFamilyStore();
+        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+        assertEquals(store.getSSTables().size(), sstables.size());
+        Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes()));
+        List<Range<Token>> ranges = Arrays.asList(range);
+
+        SSTableReader.acquireReferences(sstables);
+        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
+
+        assertThat(store.getSSTables().size(), is(1));
+        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
+        assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+        assertThat(store.getDataTracker().getCompacting().size(), is(0));
+    }
+
+
     private ColumnFamilyStore prepareColumnFamilyStore()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+        store.truncateBlocking();
         store.disableAutoCompaction();
         long timestamp = System.currentTimeMillis();
         for (int i = 0; i < 10; i++)