You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/12/02 10:51:28 UTC

[1/2] cassandra git commit: Make sure we release sstable references after anticompaction

Repository: cassandra
Updated Branches:
  refs/heads/trunk 25314c204 -> 06f626acd


Make sure we release sstable references after anticompaction

Patch by marcuse; reviewed by yukim for CASSANDRA-8386


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d15c9187
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d15c9187
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d15c9187

Branch: refs/heads/trunk
Commit: d15c9187a4b66645bf0575a7c3bfdbb9b10a263d
Parents: 9c0f575
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Nov 27 18:12:24 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Dec 2 10:10:33 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 66 +++++++++++---------
 .../cassandra/io/sstable/SSTableReader.java     |  2 +-
 .../db/compaction/AntiCompactionTest.java       | 55 ++++++++++++----
 4 files changed, 82 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d454ba2..7df396d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Release sstable references after anticompaction (CASSANDRA-8386)
  * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
  * Fix high size calculations for prepared statements (CASSANDRA-8231)
  * Centralize shared executors (CASSANDRA-8055)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 61628ff..d85ffd7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -391,6 +391,8 @@ public class CompactionManager implements CompactionManagerMBean
     /**
      * Make sure the {validatedForRepair} are marked for compaction before calling this.
      *
+     * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getAndReferenceSSTables(..)).
+     *
      * @param cfs
      * @param ranges Ranges that the repair was carried out on
      * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
@@ -407,40 +409,48 @@ public class CompactionManager implements CompactionManagerMBean
         Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
         Set<SSTableReader> nonAnticompacting = new HashSet<>();
         Iterator<SSTableReader> sstableIterator = sstables.iterator();
-        while (sstableIterator.hasNext())
+        try
         {
-            SSTableReader sstable = sstableIterator.next();
-            for (Range<Token> r : Range.normalize(ranges))
+            while (sstableIterator.hasNext())
             {
-                Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
-                if (r.contains(sstableRange))
+                SSTableReader sstable = sstableIterator.next();
+                for (Range<Token> r : Range.normalize(ranges))
                 {
-                    logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
-                    sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
-                    sstable.reloadSSTableMetadata();
-                    mutatedRepairStatuses.add(sstable);
-                    sstableIterator.remove();
-                    break;
-                }
-                else if (!sstableRange.intersects(r))
-                {
-                    logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
-                    nonAnticompacting.add(sstable);
-                    sstableIterator.remove();
-                    break;
-                }
-                else
-                {
-                    logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+                    Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
+                    if (r.contains(sstableRange))
+                    {
+                        logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
+                        sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
+                        sstable.reloadSSTableMetadata();
+                        mutatedRepairStatuses.add(sstable);
+                        sstableIterator.remove();
+                        break;
+                    }
+                    else if (!sstableRange.intersects(r))
+                    {
+                        logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
+                        nonAnticompacting.add(sstable);
+                        sstableIterator.remove();
+                        break;
+                    }
+                    else
+                    {
+                        logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+                    }
                 }
             }
+            cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
+            cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+            SSTableReader.releaseReferences(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+            if (!sstables.isEmpty())
+                doAntiCompaction(cfs, ranges, sstables, repairedAt);
         }
-        cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
-        cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
-        if (!sstables.isEmpty())
-            doAntiCompaction(cfs, ranges, sstables, repairedAt);
-        SSTableReader.releaseReferences(sstables);
-        cfs.getDataTracker().unmarkCompacting(sstables);
+        finally
+        {
+            SSTableReader.releaseReferences(sstables);
+            cfs.getDataTracker().unmarkCompacting(sstables);
+        }
+
         logger.info(String.format("Completed anticompaction successfully"));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 0a34b4a..0024f24 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1645,7 +1645,7 @@ public class SSTableReader extends SSTable
     }
 
     @VisibleForTesting
-    int referenceCount()
+    public int referenceCount()
     {
         return references.get();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d15c9187/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 5ed4f4a..090839e 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -75,22 +75,30 @@ public class AntiCompactionTest extends SchemaLoader
         int nonRepairedKeys = 0;
         for (SSTableReader sstable : store.getSSTables())
         {
-            SSTableScanner scanner = sstable.getScanner();
-            while (scanner.hasNext())
+            try (SSTableScanner scanner = sstable.getScanner())
             {
-                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
-                if (sstable.isRepaired())
+                while (scanner.hasNext())
                 {
-                    assertTrue(range.contains(row.getKey().getToken()));
-                    repairedKeys++;
-                }
-                else
-                {
-                    assertFalse(range.contains(row.getKey().getToken()));
-                    nonRepairedKeys++;
+                    SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+                    if (sstable.isRepaired())
+                    {
+                        assertTrue(range.contains(row.getKey().getToken()));
+                        repairedKeys++;
+                    }
+                    else
+                    {
+                        assertFalse(range.contains(row.getKey().getToken()));
+                        nonRepairedKeys++;
+                    }
                 }
             }
         }
+        for (SSTableReader sstable : store.getSSTables())
+        {
+            assertFalse(sstable.isMarkedCompacted());
+            assertEquals(1, sstable.referenceCount());
+        }
+        assertEquals(0, store.getDataTracker().getCompacting().size());
         assertEquals(repairedKeys, 4);
         assertEquals(nonRepairedKeys, 6);
     }
@@ -103,7 +111,6 @@ public class AntiCompactionTest extends SchemaLoader
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
         long origSize = s.bytesOnDisk();
-        System.out.println(cfs.metric.liveDiskSpaceUsed.count());
         Range<Token> range = new Range<Token>(new BytesToken(ByteBufferUtil.bytes(0)), new BytesToken(ByteBufferUtil.bytes(500)));
         Collection<SSTableReader> sstables = cfs.getSSTables();
         SSTableReader.acquireReferences(sstables);
@@ -146,16 +153,38 @@ public class AntiCompactionTest extends SchemaLoader
         List<Range<Token>> ranges = Arrays.asList(range);
 
         SSTableReader.acquireReferences(sstables);
-        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0);
+        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
 
         assertThat(store.getSSTables().size(), is(1));
         assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
+        assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+        assertThat(store.getDataTracker().getCompacting().size(), is(0));
     }
 
+    @Test
+    public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException
+    {
+        ColumnFamilyStore store = prepareColumnFamilyStore();
+        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+        assertEquals(store.getSSTables().size(), sstables.size());
+        Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes()));
+        List<Range<Token>> ranges = Arrays.asList(range);
+
+        SSTableReader.acquireReferences(sstables);
+        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
+
+        assertThat(store.getSSTables().size(), is(1));
+        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
+        assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+        assertThat(store.getDataTracker().getCompacting().size(), is(0));
+    }
+
+
     private ColumnFamilyStore prepareColumnFamilyStore()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+        store.truncateBlocking();
         store.disableAutoCompaction();
         long timestamp = System.currentTimeMillis();
         for (int i = 0; i < 10; i++)


[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java
	test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06f626ac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06f626ac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06f626ac

Branch: refs/heads/trunk
Commit: 06f626acd27b051222616c0c91f7dd8d556b8d45
Parents: 25314c2 d15c918
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Dec 2 10:49:59 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Dec 2 10:50:18 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 66 +++++++------
 .../db/compaction/AntiCompactionTest.java       | 99 ++++++++++++++------
 3 files changed, 108 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/06f626ac/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 22cc598,7df396d..141c3a8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,42 -1,7 +1,43 @@@
 +3.0
 + * Support UDTs, tuples, and collections in user-defined
 +   functions (CASSANDRA-7563)
 + * Fix aggregate fn results on empty selection, result column name,
 +   and cqlsh parsing (CASSANDRA-8229)
 + * Mark sstables as repaired after full repair (CASSANDRA-7586)
 + * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063, 7813)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455, 8208)
 +
 +
  2.1.3
+  * Release sstable references after anticompaction (CASSANDRA-8386)
   * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
 - * Fix high size calculations for prepared statements (CASSANDRA-8231)
   * Centralize shared executors (CASSANDRA-8055)
   * Fix filtering for CONTAINS (KEY) relations on frozen collection
     clustering columns when the query is restricted to a single

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06f626ac/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index a9a4773,d85ffd7..ed875b8
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -407,40 -409,48 +409,48 @@@ public class CompactionManager implemen
          Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
          Set<SSTableReader> nonAnticompacting = new HashSet<>();
          Iterator<SSTableReader> sstableIterator = sstables.iterator();
-         while (sstableIterator.hasNext())
+         try
          {
-             SSTableReader sstable = sstableIterator.next();
-             for (Range<Token> r : Range.normalize(ranges))
+             while (sstableIterator.hasNext())
              {
-                 Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken());
-                 if (r.contains(sstableRange))
-                 {
-                     logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
-                     sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
-                     sstable.reloadSSTableMetadata();
-                     mutatedRepairStatuses.add(sstable);
-                     sstableIterator.remove();
-                     break;
-                 }
-                 else if (!sstableRange.intersects(r))
+                 SSTableReader sstable = sstableIterator.next();
+                 for (Range<Token> r : Range.normalize(ranges))
                  {
-                     logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
-                     nonAnticompacting.add(sstable);
-                     sstableIterator.remove();
-                     break;
-                 }
-                 else
-                 {
-                     logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
 -                    Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken(), sstable.partitioner);
++                    Range<Token> sstableRange = new Range<>(sstable.first.getToken(), sstable.last.getToken());
+                     if (r.contains(sstableRange))
+                     {
+                         logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
+                         sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
+                         sstable.reloadSSTableMetadata();
+                         mutatedRepairStatuses.add(sstable);
+                         sstableIterator.remove();
+                         break;
+                     }
+                     else if (!sstableRange.intersects(r))
+                     {
+                         logger.info("SSTable {} ({}) does not intersect repaired range {}, not touching repairedAt.", sstable, sstableRange, r);
+                         nonAnticompacting.add(sstable);
+                         sstableIterator.remove();
+                         break;
+                     }
+                     else
+                     {
+                         logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableRange, r);
+                     }
                  }
              }
+             cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
+             cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+             SSTableReader.releaseReferences(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+             if (!sstables.isEmpty())
+                 doAntiCompaction(cfs, ranges, sstables, repairedAt);
          }
-         cfs.getDataTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
-         cfs.getDataTracker().unmarkCompacting(Sets.union(nonAnticompacting, mutatedRepairStatuses));
-         if (!sstables.isEmpty())
-             doAntiCompaction(cfs, ranges, sstables, repairedAt);
-         SSTableReader.releaseReferences(sstables);
-         cfs.getDataTracker().unmarkCompacting(sstables);
+         finally
+         {
+             SSTableReader.releaseReferences(sstables);
+             cfs.getDataTracker().unmarkCompacting(sstables);
+         }
+ 
          logger.info(String.format("Completed anticompaction successfully"));
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/06f626ac/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 87e4315,090839e..2396acb
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -56,49 -50,17 +56,34 @@@ import org.apache.cassandra.utils.ByteB
  
  import com.google.common.collect.Iterables;
  
 -public class AntiCompactionTest extends SchemaLoader
 +public class AntiCompactionTest
  {
 -    private static final String KEYSPACE1 = "Keyspace1";
 +    private static final String KEYSPACE1 = "AntiCompactionTest";
      private static final String CF = "Standard1";
  
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                SimpleStrategy.class,
 +                KSMetaData.optsWithRF(1),
 +                SchemaLoader.standardCFMD(KEYSPACE1, CF));
 +    }
 +
 +    @After
 +    public void truncateCF()
 +    {
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
 +        store.truncateBlocking();
 +    }
 +
      @Test
--    public void antiCompactOne() throws InterruptedException, ExecutionException, IOException
++    public void antiCompactOne() throws Exception
      {
-         Keyspace keyspace = Keyspace.open(KEYSPACE1);
-         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
-         store.disableAutoCompaction();
-         long timestamp = System.currentTimeMillis();
-         for (int i = 0; i < 10; i++)
-         {
-             DecoratedKey key = Util.dk(Integer.toString(i));
-             Mutation rm = new Mutation(KEYSPACE1, key.getKey());
-             for (int j = 0; j < 10; j++)
-                 rm.add(CF, Util.cellname(Integer.toString(j)),
-                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                        timestamp,
-                        0);
-             rm.applyUnsafe();
-         }
-         store.forceBlockingFlush();
+         ColumnFamilyStore store = prepareColumnFamilyStore();
          Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
          assertEquals(store.getSSTables().size(), sstables.size());
          Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("4".getBytes()));
@@@ -113,19 -75,21 +98,21 @@@
          int nonRepairedKeys = 0;
          for (SSTableReader sstable : store.getSSTables())
          {
-             ICompactionScanner scanner = sstable.getScanner();
-             while (scanner.hasNext())
 -            try (SSTableScanner scanner = sstable.getScanner())
++            try (ICompactionScanner scanner = sstable.getScanner())
              {
-                 SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
-                 if (sstable.isRepaired())
-                 {
-                     assertTrue(range.contains(row.getKey().getToken()));
-                     repairedKeys++;
-                 }
-                 else
+                 while (scanner.hasNext())
                  {
-                     assertFalse(range.contains(row.getKey().getToken()));
-                     nonRepairedKeys++;
+                     SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+                     if (sstable.isRepaired())
+                     {
+                         assertTrue(range.contains(row.getKey().getToken()));
+                         repairedKeys++;
+                     }
+                     else
+                     {
+                         assertFalse(range.contains(row.getKey().getToken()));
+                         nonRepairedKeys++;
+                     }
                  }
              }
          }
@@@ -163,7 -131,12 +155,7 @@@
          File dir = cfs.directories.getDirectoryForNewSSTables();
          String filename = cfs.getTempSSTablePath(dir);
  
-         SSTableWriter writer = SSTableWriter.create(filename,0,0);
 -        SSTableWriter writer = new SSTableWriter(filename,
 -                0,
 -                0,
 -                cfs.metadata,
 -                StorageService.getPartitioner(),
 -                new MetadataCollector(cfs.metadata.comparator));
++        SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
  
          for (int i = 0; i < count * 5; i++)
              writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
@@@ -215,60 -153,60 +207,107 @@@
          List<Range<Token>> ranges = Arrays.asList(range);
  
          SSTableReader.acquireReferences(sstables);
 -        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
 -
 -        assertThat(store.getSSTables().size(), is(1));
 -        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
 -        assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
 -        assertThat(store.getDataTracker().getCompacting().size(), is(0));
 +        long repairedAt = 1000;
 +        CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
 +        /*
 +        Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
 +        so there will be no net change in the number of sstables
 +         */
 +        assertEquals(10, store.getSSTables().size());
 +        int repairedKeys = 0;
 +        int nonRepairedKeys = 0;
 +        for (SSTableReader sstable : store.getSSTables())
 +        {
 +            ICompactionScanner scanner = sstable.getScanner();
 +            while (scanner.hasNext())
 +            {
 +                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
 +                if (sstable.isRepaired())
 +                {
 +                    assertTrue(range.contains(row.getKey().getToken()));
 +                    assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt);
 +                    repairedKeys++;
 +                }
 +                else
 +                {
 +                    assertFalse(range.contains(row.getKey().getToken()));
 +                    assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
 +                    nonRepairedKeys++;
 +                }
 +            }
 +        }
 +        assertEquals(repairedKeys, 40);
 +        assertEquals(nonRepairedKeys, 60);
      }
 -
      @Test
+     public void shouldMutateRepairedAt() throws InterruptedException, ExecutionException, IOException
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
+         Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+         assertEquals(store.getSSTables().size(), sstables.size());
+         Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()), new BytesToken("9999".getBytes()));
+         List<Range<Token>> ranges = Arrays.asList(range);
+ 
+         SSTableReader.acquireReferences(sstables);
+         CompactionManager.instance.performAnticompaction(store, ranges, sstables, 1);
+ 
+         assertThat(store.getSSTables().size(), is(1));
+         assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
+         assertThat(Iterables.get(store.getSSTables(), 0).referenceCount(), is(1));
+         assertThat(store.getDataTracker().getCompacting().size(), is(0));
+     }
+ 
+ 
++    @Test
 +    public void shouldSkipAntiCompactionForNonIntersectingRange() throws InterruptedException, ExecutionException, IOException
 +    {
 +        Keyspace keyspace = Keyspace.open(KEYSPACE1);
 +        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
 +        store.disableAutoCompaction();
 +
 +        for (int table = 0; table < 10; table++)
 +        {
 +            generateSStable(store,Integer.toString(table));
 +        }
 +        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
 +        assertEquals(store.getSSTables().size(), sstables.size());
 +        
 +        Range<Token> range = new Range<Token>(new BytesToken("-10".getBytes()), new BytesToken("-1".getBytes()));
 +        List<Range<Token>> ranges = Arrays.asList(range);
 +
 +        SSTableReader.acquireReferences(sstables);
 +        CompactionManager.instance.performAnticompaction(store, ranges, sstables, 0);
 +
 +        assertThat(store.getSSTables().size(), is(10));
 +        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(false));
 +    }
 +
+     private ColumnFamilyStore prepareColumnFamilyStore()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE1);
+         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
 -        store.truncateBlocking();
+         store.disableAutoCompaction();
+         long timestamp = System.currentTimeMillis();
+         for (int i = 0; i < 10; i++)
+         {
+             DecoratedKey key = Util.dk(Integer.toString(i));
+             Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+             for (int j = 0; j < 10; j++)
+                 rm.add("Standard1", Util.cellname(Integer.toString(j)),
+                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                        timestamp,
+                        0);
+             rm.apply();
+         }
+         store.forceBlockingFlush();
+         return store;
+     }
 -    
++
+     @After
 -    public void truncateCF()
++    public void truncateCfs()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE1);
+         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+         store.truncateBlocking();
+     }
  }