You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2018/06/29 07:57:46 UTC

[01/10] cassandra git commit: Stop SSTables being lost from compaction strategy after full repairs

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 1143bc113 -> f8912ce93
  refs/heads/cassandra-3.0 4e23c9e4d -> 489c2f695
  refs/heads/cassandra-3.11 ea62d8862 -> bba0d03e9
  refs/heads/trunk 4cb83cb81 -> 5cc68a873


Stop SSTables being lost from compaction strategy after full repairs

patch by Kurt Greaves; reviewed by Stefan Podkowinski, Marcus Eriksson, for CASSANDRA-14423


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8912ce9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8912ce9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8912ce9

Branch: refs/heads/cassandra-2.2
Commit: f8912ce9329a8bc360e93cf61e56814135fbab39
Parents: 1143bc1
Author: kurt <ku...@instaclustr.com>
Authored: Thu Jun 14 10:59:19 2018 +0000
Committer: Mick Semb Wever <mc...@apache.org>
Committed: Fri Jun 29 16:49:53 2018 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  70 ++++++-----
 .../db/compaction/AntiCompactionTest.java       | 120 ++++++++++++++++++-
 3 files changed, 156 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7b1089e..9d6a9ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.13
+ * Fix bug that prevented compaction of SSTables after full repairs (CASSANDRA-14423)
  * Incorrect counting of pending messages in OutboundTcpConnection (CASSANDRA-11551)
  * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743)
  * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 419f66e..013fc04 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -460,6 +460,16 @@ public class CompactionManager implements CompactionManagerMBean
         }, jobs, OperationType.CLEANUP);
     }
 
+    /**
+     * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
+     * as repaired.
+     *
+     * @param cfs Column family for anti-compaction
+     * @param ranges Repaired ranges to be anti-compacted into separate SSTables.
+     * @param sstables {@link Refs} of SSTables within CF to anti-compact.
+     * @param repairedAt Unix timestamp of when repair was completed.
+     * @return Futures executing anti-compaction.
+     */
     public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                           final Collection<Range<Token>> ranges,
                                           final Refs<SSTableReader> sstables,
@@ -475,6 +485,8 @@ public class CompactionManager implements CompactionManagerMBean
                 {
                     for (SSTableReader compactingSSTable : cfs.getTracker().getCompacting())
                         sstables.releaseIfHolds(compactingSSTable);
+                    // We don't anti-compact any SSTable that has been compacted during repair as it may have been compacted
+                    // with unrepaired data.
                     Set<SSTableReader> compactedSSTables = new HashSet<>();
                     for (SSTableReader sstable : sstables)
                         if (sstable.isMarkedCompacted())
@@ -504,9 +516,17 @@ public class CompactionManager implements CompactionManagerMBean
      *
      * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)).
      *
+     * NOTE: Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
+     * Although anti-compaction could work on repaired sstables as well and would result in having more accurate
+     * repairedAt values for these, we avoid anti-compacting already repaired sstables, as we currently don't
+     * make use of any actual repairedAt value and splitting up sstables just for that is not worth it. However, we will
+     * still update repairedAt if the SSTable is fully contained within the repaired ranges, as this does not require
+     * anticompaction.
+     *
      * @param cfs
      * @param ranges Ranges that the repair was carried out on
      * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
+     * @param txn Transaction across all SSTables that were repaired.
      * @throws InterruptedException
      * @throws IOException
      */
@@ -519,13 +539,7 @@ public class CompactionManager implements CompactionManagerMBean
         logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
         logger.trace("Starting anticompaction for ranges {}", ranges);
         Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
-        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
-        // we should only notify that repair status changed if it actually did:
-        Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
-        Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
-        for (SSTableReader sstable : sstables)
-            wasRepairedBefore.put(sstable, sstable.isRepaired());
-
+        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // SSTables that were completely repaired only
         Set<SSTableReader> nonAnticompacting = new HashSet<>();
 
         Iterator<SSTableReader> sstableIterator = sstables.iterator();
@@ -536,6 +550,11 @@ public class CompactionManager implements CompactionManagerMBean
             while (sstableIterator.hasNext())
             {
                 SSTableReader sstable = sstableIterator.next();
+                List<String> anticompactRanges = new ArrayList<>();
+                // We don't anti-compact SSTables already marked repaired. See CASSANDRA-13153
+                // and CASSANDRA-14423.
+                if (sstable.isRepaired()) // We never anti-compact already repaired SSTables
+                    nonAnticompacting.add(sstable);
 
                 Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken());
 
@@ -548,28 +567,30 @@ public class CompactionManager implements CompactionManagerMBean
                         logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
                         sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
                         sstable.reloadSSTableMetadata();
-                        mutatedRepairStatuses.add(sstable);
-                        if (!wasRepairedBefore.get(sstable))
-                            mutatedRepairStatusToNotify.add(sstable);
+                        if (!nonAnticompacting.contains(sstable)) // don't notify if the SSTable was already repaired
+                            mutatedRepairStatuses.add(sstable);
                         sstableIterator.remove();
                         shouldAnticompact = true;
                         break;
                     }
-                    else if (r.intersects(sstableBounds))
+                    else if (r.intersects(sstableBounds) && !nonAnticompacting.contains(sstable))
                     {
-                        logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableBounds, r);
+                        anticompactRanges.add(r.toString());
                         shouldAnticompact = true;
                     }
                 }
 
+                if (!anticompactRanges.isEmpty())
+                    logger.info("SSTable {} ({}) will be anticompacted on ranges: {}", sstable, sstableBounds, String.join(", ", anticompactRanges));
+
                 if (!shouldAnticompact)
                 {
-                    logger.info("SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
+                    logger.info("SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
                     nonAnticompacting.add(sstable);
                     sstableIterator.remove();
                 }
             }
-            cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatusToNotify);
+            cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
             txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses));
             validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses));
             assert txn.originals().equals(sstables);
@@ -1223,24 +1244,11 @@ public class CompactionManager implements CompactionManagerMBean
      */
     private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
     {
-        logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
+        int numAnticompact = repaired.originals().size();
+        logger.info("Performing anticompaction on {} sstables", numAnticompact);
 
         //Group SSTables
-        Set<SSTableReader> sstables = repaired.originals();
-
-        // Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
-        // Although anti-compaction could work on repaired sstables as well and would result in having more accurate
-        // repairedAt values for these, we still avoid anti-compacting already repaired sstables, as we currently don't
-        // make use of any actual repairedAt value and splitting up sstables just for that is not worth it at this point.
-        Set<SSTableReader> unrepairedSSTables = ImmutableSet.copyOf(Iterables.filter(sstables, new Predicate<SSTableReader>()
-        {
-            public boolean apply(SSTableReader input)
-            {
-                return !input.isRepaired();
-            }
-        }));
-
-        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(unrepairedSSTables);
+        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals());
         // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
         int antiCompactedSSTableCount = 0;
         for (Collection<SSTableReader> sstableGroup : groupedSSTables)
@@ -1253,7 +1261,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
 
         String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
-        logger.info(format, repaired.originals().size(), antiCompactedSSTableCount);
+        logger.info(format, numAnticompact, antiCompactedSSTableCount);
     }
 
     private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index c451516..abd9a4a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -29,13 +29,20 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.locator.SimpleStrategy;
+
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.BeforeClass;
 import org.junit.After;
 import org.junit.Test;
@@ -55,8 +62,6 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-import com.google.common.collect.Iterables;
-
 public class AntiCompactionTest
 {
     private static final String KEYSPACE1 = "AntiCompactionTest";
@@ -271,9 +276,116 @@ public class AntiCompactionTest
             CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
         }
 
+        SSTableReader sstable = Iterables.get(store.getSSTables(), 0);
         assertThat(store.getSSTables().size(), is(1));
-        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
-        assertThat(Iterables.get(store.getSSTables(), 0).selfRef().globalCount(), is(1));
+        assertThat(sstable.isRepaired(), is(true));
+        assertThat(sstable.selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+    }
+
+    @Test
+    public void shouldAntiCompactSSTable() throws IOException, InterruptedException, ExecutionException
+    {
+        ColumnFamilyStore store = prepareColumnFamilyStore();
+        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+        assertEquals(store.getSSTables().size(), sstables.size());
+        // SSTable range is 0 - 10, repair just a subset of the ranges (0 - 4) of the SSTable. Should result in
+        // one repaired and one unrepaired SSTable
+        Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+        List<Range<Token>> ranges = Arrays.asList(range);
+
+        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+             Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
+        }
+
+        Comparator<SSTableReader> generationReverseComparator = new Comparator<SSTableReader>()
+        {
+            public int compare(SSTableReader o1, SSTableReader o2)
+            {
+                return Integer.compare(o1.descriptor.generation, o2.descriptor.generation);
+            }
+        };
+
+        SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(generationReverseComparator);
+        sstablesSorted.addAll(store.getSSTables());
+
+        SSTableReader sstable = sstablesSorted.first();
+        assertThat(store.getSSTables().size(), is(2));
+        assertThat(sstable.isRepaired(), is(true));
+        assertThat(sstable.selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Test we don't anti-compact already repaired SSTables. repairedAt shouldn't change for the already repaired SSTable (first)
+        sstables = store.getSSTables();
+        // Range that's a subset of the repaired SSTable's ranges, so would cause an anti-compaction (if it wasn't repaired)
+        range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("2".getBytes()));
+        ranges = Arrays.asList(range);
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // use different repairedAt to ensure it doesn't change
+            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(false));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(1L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Test repairing all the ranges of the repaired SSTable. Should mutate repairedAt without anticompacting,
+        // but leave the unrepaired SSTable as is.
+        range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+        ranges = Arrays.asList(range);
+
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // Same repaired at, but should be changed on the repaired SSTable now
+            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(false));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(200L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Repair whole range. Should mutate repairedAt on repaired SSTable (again) and
+        // mark unrepaired SSTable as repaired
+        range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("999".getBytes()));
+        ranges = Arrays.asList(range);
+
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // Both SSTables should have repairedAt of 400
+            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(true));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(400L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(400L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
         assertThat(store.getTracker().getCompacting().size(), is(0));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by mc...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bba0d03e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bba0d03e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bba0d03e

Branch: refs/heads/trunk
Commit: bba0d03e9c5e62c222734839a9adc83f1aec6f95
Parents: ea62d88 489c2f6
Author: Mick Semb Wever <mc...@apache.org>
Authored: Fri Jun 29 16:58:26 2018 +1000
Committer: Mick Semb Wever <mc...@apache.org>
Committed: Fri Jun 29 17:00:02 2018 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  67 +++++++-----
 .../db/compaction/AntiCompactionTest.java       | 109 ++++++++++++++++++-
 3 files changed, 147 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bba0d03e/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bba0d03e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f0a4de5,f033bf2..fa6b03e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -478,115 -474,17 +477,126 @@@ public class CompactionManager implemen
          }, jobs, OperationType.CLEANUP);
      }
  
 +    public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, ExecutionException
 +    {
 +        assert !cfStore.isIndex();
 +
 +        return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
 +            {
 +                Iterable<SSTableReader> originals = transaction.originals();
 +                if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
 +                    originals = Iterables.filter(originals, SSTableReader::isRepaired);
 +                List<SSTableReader> sortedSSTables = Lists.newArrayList(originals);
 +                Collections.sort(sortedSSTables, SSTableReader.maxTimestampComparator);
 +                return sortedSSTables;
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction txn) throws IOException
 +            {
 +                logger.debug("Garbage collecting {}", txn.originals());
 +                CompactionTask task = new CompactionTask(cfStore, txn, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()))
 +                {
 +                    @Override
 +                    protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
 +                    {
 +                        return new CompactionController(cfStore, toCompact, gcBefore, null, tombstoneOption);
 +                    }
 +                };
 +                task.setUserDefined(true);
 +                task.setCompactionType(OperationType.GARBAGE_COLLECT);
 +                task.execute(metrics);
 +            }
 +        }, jobs, OperationType.GARBAGE_COLLECT);
 +    }
 +
 +    public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException
 +    {
 +        if (!cfs.getPartitioner().splitter().isPresent())
 +        {
 +            logger.info("Partitioner does not support splitting");
 +            return AllSSTableOpStatus.ABORTED;
 +        }
 +        final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName());
 +
 +        if (r.isEmpty())
 +        {
 +            logger.info("Relocate cannot run before a node has joined the ring");
 +            return AllSSTableOpStatus.ABORTED;
 +        }
 +
 +        final DiskBoundaries diskBoundaries = cfs.getDiskBoundaries();
 +
 +        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
 +            {
 +                Set<SSTableReader> originals = Sets.newHashSet(transaction.originals());
 +                Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet());
 +                transaction.cancel(Sets.difference(originals, needsRelocation));
 +
 +                Map<Integer, List<SSTableReader>> groupedByDisk = groupByDiskIndex(needsRelocation);
 +
 +                int maxSize = 0;
 +                for (List<SSTableReader> diskSSTables : groupedByDisk.values())
 +                    maxSize = Math.max(maxSize, diskSSTables.size());
 +
 +                List<SSTableReader> mixedSSTables = new ArrayList<>();
 +
 +                for (int i = 0; i < maxSize; i++)
 +                    for (List<SSTableReader> diskSSTables : groupedByDisk.values())
 +                        if (i < diskSSTables.size())
 +                            mixedSSTables.add(diskSSTables.get(i));
 +
 +                return mixedSSTables;
 +            }
 +
 +            public Map<Integer, List<SSTableReader>> groupByDiskIndex(Set<SSTableReader> needsRelocation)
 +            {
 +                return needsRelocation.stream().collect(Collectors.groupingBy((s) -> diskBoundaries.getDiskIndex(s)));
 +            }
 +
 +            private boolean inCorrectLocation(SSTableReader sstable)
 +            {
 +                if (!cfs.getPartitioner().splitter().isPresent())
 +                    return true;
 +
 +                int diskIndex = diskBoundaries.getDiskIndex(sstable);
 +                File diskLocation = diskBoundaries.directories.get(diskIndex).location;
 +                PartitionPosition diskLast = diskBoundaries.positions.get(diskIndex);
 +
 +                // the location we get from directoryIndex is based on the first key in the sstable
 +                // now we need to make sure the last key is less than the boundary as well:
 +                return sstable.descriptor.directory.getAbsolutePath().startsWith(diskLocation.getAbsolutePath()) && sstable.last.compareTo(diskLast) <= 0;
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction txn)
 +            {
 +                logger.debug("Relocating {}", txn.originals());
 +                AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
 +                task.setUserDefined(true);
 +                task.setCompactionType(OperationType.RELOCATE);
 +                task.execute(metrics);
 +            }
 +        }, jobs, OperationType.RELOCATE);
 +    }
 +
+     /**
+      * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
+      * as repaired.
+      *
+      * @param cfs Column family for anti-compaction
+      * @param ranges Repaired ranges to be anti-compacted into separate SSTables.
+      * @param sstables {@link Refs} of SSTables within CF to anti-compact.
+      * @param repairedAt Unix timestamp of when repair was completed.
+      * @param parentRepairSession Corresponding repair session
+      * @return Futures executing anti-compaction.
+      */
      public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                            final Collection<Range<Token>> ranges,
                                            final Refs<SSTableReader> sstables,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[04/10] cassandra git commit: Stop SSTables being lost from compaction strategy after full repairs

Posted by mc...@apache.org.
Stop SSTables being lost from compaction strategy after full repairs

patch by Kurt Greaves; reviewed by Stefan Podkowinski, Marcus Eriksson, for CASSANDRA-14423


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8912ce9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8912ce9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8912ce9

Branch: refs/heads/trunk
Commit: f8912ce9329a8bc360e93cf61e56814135fbab39
Parents: 1143bc1
Author: kurt <ku...@instaclustr.com>
Authored: Thu Jun 14 10:59:19 2018 +0000
Committer: Mick Semb Wever <mc...@apache.org>
Committed: Fri Jun 29 16:49:53 2018 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  70 ++++++-----
 .../db/compaction/AntiCompactionTest.java       | 120 ++++++++++++++++++-
 3 files changed, 156 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7b1089e..9d6a9ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.13
+ * Fix bug that prevented compaction of SSTables after full repairs (CASSANDRA-14423)
  * Incorrect counting of pending messages in OutboundTcpConnection (CASSANDRA-11551)
  * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743)
  * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 419f66e..013fc04 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -460,6 +460,16 @@ public class CompactionManager implements CompactionManagerMBean
         }, jobs, OperationType.CLEANUP);
     }
 
+    /**
+     * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
+     * as repaired.
+     *
+     * @param cfs Column family for anti-compaction
+     * @param ranges Repaired ranges to be anti-compacted into separate SSTables.
+     * @param sstables {@link Refs} of SSTables within CF to anti-compact.
+     * @param repairedAt Unix timestamp of when repair was completed.
+     * @return Futures executing anti-compaction.
+     */
     public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                           final Collection<Range<Token>> ranges,
                                           final Refs<SSTableReader> sstables,
@@ -475,6 +485,8 @@ public class CompactionManager implements CompactionManagerMBean
                 {
                     for (SSTableReader compactingSSTable : cfs.getTracker().getCompacting())
                         sstables.releaseIfHolds(compactingSSTable);
+                    // We don't anti-compact any SSTable that has been compacted during repair as it may have been compacted
+                    // with unrepaired data.
                     Set<SSTableReader> compactedSSTables = new HashSet<>();
                     for (SSTableReader sstable : sstables)
                         if (sstable.isMarkedCompacted())
@@ -504,9 +516,17 @@ public class CompactionManager implements CompactionManagerMBean
      *
      * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)).
      *
+     * NOTE: Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
+     * Although anti-compaction could work on repaired sstables as well and would result in having more accurate
+     * repairedAt values for these, we avoid anti-compacting already repaired sstables, as we currently don't
+     * make use of any actual repairedAt value and splitting up sstables just for that is not worth it. However, we will
+     * still update repairedAt if the SSTable is fully contained within the repaired ranges, as this does not require
+     * anticompaction.
+     *
      * @param cfs
      * @param ranges Ranges that the repair was carried out on
      * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
+     * @param txn Transaction across all SSTables that were repaired.
      * @throws InterruptedException
      * @throws IOException
      */
@@ -519,13 +539,7 @@ public class CompactionManager implements CompactionManagerMBean
         logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
         logger.trace("Starting anticompaction for ranges {}", ranges);
         Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
-        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
-        // we should only notify that repair status changed if it actually did:
-        Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
-        Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
-        for (SSTableReader sstable : sstables)
-            wasRepairedBefore.put(sstable, sstable.isRepaired());
-
+        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // SSTables that were completely repaired only
         Set<SSTableReader> nonAnticompacting = new HashSet<>();
 
         Iterator<SSTableReader> sstableIterator = sstables.iterator();
@@ -536,6 +550,11 @@ public class CompactionManager implements CompactionManagerMBean
             while (sstableIterator.hasNext())
             {
                 SSTableReader sstable = sstableIterator.next();
+                List<String> anticompactRanges = new ArrayList<>();
+                // We don't anti-compact SSTables already marked repaired. See CASSANDRA-13153
+                // and CASSANDRA-14423.
+                if (sstable.isRepaired()) // We never anti-compact already repaired SSTables
+                    nonAnticompacting.add(sstable);
 
                 Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken());
 
@@ -548,28 +567,30 @@ public class CompactionManager implements CompactionManagerMBean
                         logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
                         sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
                         sstable.reloadSSTableMetadata();
-                        mutatedRepairStatuses.add(sstable);
-                        if (!wasRepairedBefore.get(sstable))
-                            mutatedRepairStatusToNotify.add(sstable);
+                        if (!nonAnticompacting.contains(sstable)) // don't notify if the SSTable was already repaired
+                            mutatedRepairStatuses.add(sstable);
                         sstableIterator.remove();
                         shouldAnticompact = true;
                         break;
                     }
-                    else if (r.intersects(sstableBounds))
+                    else if (r.intersects(sstableBounds) && !nonAnticompacting.contains(sstable))
                     {
-                        logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableBounds, r);
+                        anticompactRanges.add(r.toString());
                         shouldAnticompact = true;
                     }
                 }
 
+                if (!anticompactRanges.isEmpty())
+                    logger.info("SSTable {} ({}) will be anticompacted on ranges: {}", sstable, sstableBounds, String.join(", ", anticompactRanges));
+
                 if (!shouldAnticompact)
                 {
-                    logger.info("SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
+                    logger.info("SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
                     nonAnticompacting.add(sstable);
                     sstableIterator.remove();
                 }
             }
-            cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatusToNotify);
+            cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
             txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses));
             validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses));
             assert txn.originals().equals(sstables);
@@ -1223,24 +1244,11 @@ public class CompactionManager implements CompactionManagerMBean
      */
     private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
     {
-        logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
+        int numAnticompact = repaired.originals().size();
+        logger.info("Performing anticompaction on {} sstables", numAnticompact);
 
         //Group SSTables
-        Set<SSTableReader> sstables = repaired.originals();
-
-        // Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
-        // Although anti-compaction could work on repaired sstables as well and would result in having more accurate
-        // repairedAt values for these, we still avoid anti-compacting already repaired sstables, as we currently don't
-        // make use of any actual repairedAt value and splitting up sstables just for that is not worth it at this point.
-        Set<SSTableReader> unrepairedSSTables = ImmutableSet.copyOf(Iterables.filter(sstables, new Predicate<SSTableReader>()
-        {
-            public boolean apply(SSTableReader input)
-            {
-                return !input.isRepaired();
-            }
-        }));
-
-        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(unrepairedSSTables);
+        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals());
         // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
         int antiCompactedSSTableCount = 0;
         for (Collection<SSTableReader> sstableGroup : groupedSSTables)
@@ -1253,7 +1261,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
 
         String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
-        logger.info(format, repaired.originals().size(), antiCompactedSSTableCount);
+        logger.info(format, numAnticompact, antiCompactedSSTableCount);
     }
 
     private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index c451516..abd9a4a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -29,13 +29,20 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.locator.SimpleStrategy;
+
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.BeforeClass;
 import org.junit.After;
 import org.junit.Test;
@@ -55,8 +62,6 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-import com.google.common.collect.Iterables;
-
 public class AntiCompactionTest
 {
     private static final String KEYSPACE1 = "AntiCompactionTest";
@@ -271,9 +276,116 @@ public class AntiCompactionTest
             CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
         }
 
+        SSTableReader sstable = Iterables.get(store.getSSTables(), 0);
         assertThat(store.getSSTables().size(), is(1));
-        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
-        assertThat(Iterables.get(store.getSSTables(), 0).selfRef().globalCount(), is(1));
+        assertThat(sstable.isRepaired(), is(true));
+        assertThat(sstable.selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+    }
+
+    @Test
+    public void shouldAntiCompactSSTable() throws IOException, InterruptedException, ExecutionException
+    {
+        ColumnFamilyStore store = prepareColumnFamilyStore();
+        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+        assertEquals(store.getSSTables().size(), sstables.size());
+        // SSTable range is 0 - 10, repair just a subset of the ranges (0 - 4) of the SSTable. Should result in
+        // one repaired and one unrepaired SSTable
+        Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+        List<Range<Token>> ranges = Arrays.asList(range);
+
+        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+             Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
+        }
+
+        Comparator<SSTableReader> generationReverseComparator = new Comparator<SSTableReader>()
+        {
+            public int compare(SSTableReader o1, SSTableReader o2)
+            {
+                return Integer.compare(o1.descriptor.generation, o2.descriptor.generation);
+            }
+        };
+
+        SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(generationReverseComparator);
+        sstablesSorted.addAll(store.getSSTables());
+
+        SSTableReader sstable = sstablesSorted.first();
+        assertThat(store.getSSTables().size(), is(2));
+        assertThat(sstable.isRepaired(), is(true));
+        assertThat(sstable.selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Test we don't anti-compact already repaired SSTables. repairedAt shouldn't change for the already repaired SSTable (first)
+        sstables = store.getSSTables();
+        // Range that's a subset of the repaired SSTable's ranges, so would cause an anti-compaction (if it wasn't repaired)
+        range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("2".getBytes()));
+        ranges = Arrays.asList(range);
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // use different repairedAt to ensure it doesn't change
+            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(false));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(1L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Test repairing all the ranges of the repaired SSTable. Should mutate repairedAt without anticompacting,
+        // but leave the unrepaired SSTable as is.
+        range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+        ranges = Arrays.asList(range);
+
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // Same repaired at, but should be changed on the repaired SSTable now
+            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(false));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(200L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Repair whole range. Should mutate repairedAt on repaired SSTable (again) and
+        // mark unrepaired SSTable as repaired
+        range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("999".getBytes()));
+        ranges = Arrays.asList(range);
+
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // Both SSTables should have repairedAt of 400
+            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(true));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(400L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(400L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
         assertThat(store.getTracker().getCompacting().size(), is(0));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[02/10] cassandra git commit: Stop SSTables being lost from compaction strategy after full repairs

Posted by mc...@apache.org.
Stop SSTables being lost from compaction strategy after full repairs

patch by Kurt Greaves; reviewed by Stefan Podkowinski, Marcus Eriksson, for CASSANDRA-14423


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8912ce9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8912ce9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8912ce9

Branch: refs/heads/cassandra-3.0
Commit: f8912ce9329a8bc360e93cf61e56814135fbab39
Parents: 1143bc1
Author: kurt <ku...@instaclustr.com>
Authored: Thu Jun 14 10:59:19 2018 +0000
Committer: Mick Semb Wever <mc...@apache.org>
Committed: Fri Jun 29 16:49:53 2018 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  70 ++++++-----
 .../db/compaction/AntiCompactionTest.java       | 120 ++++++++++++++++++-
 3 files changed, 156 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7b1089e..9d6a9ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.13
+ * Fix bug that prevented compaction of SSTables after full repairs (CASSANDRA-14423)
  * Incorrect counting of pending messages in OutboundTcpConnection (CASSANDRA-11551)
  * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743)
  * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 419f66e..013fc04 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -460,6 +460,16 @@ public class CompactionManager implements CompactionManagerMBean
         }, jobs, OperationType.CLEANUP);
     }
 
+    /**
+     * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
+     * as repaired.
+     *
+     * @param cfs Column family for anti-compaction
+     * @param ranges Repaired ranges to be anti-compacted into separate SSTables.
+     * @param sstables {@link Refs} of SSTables within CF to anti-compact.
+     * @param repairedAt Unix timestamp of when repair was completed.
+     * @return Futures executing anti-compaction.
+     */
     public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                           final Collection<Range<Token>> ranges,
                                           final Refs<SSTableReader> sstables,
@@ -475,6 +485,8 @@ public class CompactionManager implements CompactionManagerMBean
                 {
                     for (SSTableReader compactingSSTable : cfs.getTracker().getCompacting())
                         sstables.releaseIfHolds(compactingSSTable);
+                    // We don't anti-compact any SSTable that has been compacted during repair as it may have been compacted
+                    // with unrepaired data.
                     Set<SSTableReader> compactedSSTables = new HashSet<>();
                     for (SSTableReader sstable : sstables)
                         if (sstable.isMarkedCompacted())
@@ -504,9 +516,17 @@ public class CompactionManager implements CompactionManagerMBean
      *
      * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)).
      *
+     * NOTE: Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
+     * Although anti-compaction could work on repaired sstables as well and would result in having more accurate
+     * repairedAt values for these, we avoid anti-compacting already repaired sstables, as we currently don't
+     * make use of any actual repairedAt value and splitting up sstables just for that is not worth it. However, we will
+     * still update repairedAt if the SSTable is fully contained within the repaired ranges, as this does not require
+     * anticompaction.
+     *
      * @param cfs
      * @param ranges Ranges that the repair was carried out on
      * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
+     * @param txn Transaction across all SSTables that were repaired.
      * @throws InterruptedException
      * @throws IOException
      */
@@ -519,13 +539,7 @@ public class CompactionManager implements CompactionManagerMBean
         logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
         logger.trace("Starting anticompaction for ranges {}", ranges);
         Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
-        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
-        // we should only notify that repair status changed if it actually did:
-        Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
-        Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
-        for (SSTableReader sstable : sstables)
-            wasRepairedBefore.put(sstable, sstable.isRepaired());
-
+        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // SSTables that were completely repaired only
         Set<SSTableReader> nonAnticompacting = new HashSet<>();
 
         Iterator<SSTableReader> sstableIterator = sstables.iterator();
@@ -536,6 +550,11 @@ public class CompactionManager implements CompactionManagerMBean
             while (sstableIterator.hasNext())
             {
                 SSTableReader sstable = sstableIterator.next();
+                List<String> anticompactRanges = new ArrayList<>();
+                // We don't anti-compact SSTables already marked repaired. See CASSANDRA-13153
+                // and CASSANDRA-14423.
+                if (sstable.isRepaired()) // We never anti-compact already repaired SSTables
+                    nonAnticompacting.add(sstable);
 
                 Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken());
 
@@ -548,28 +567,30 @@ public class CompactionManager implements CompactionManagerMBean
                         logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
                         sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
                         sstable.reloadSSTableMetadata();
-                        mutatedRepairStatuses.add(sstable);
-                        if (!wasRepairedBefore.get(sstable))
-                            mutatedRepairStatusToNotify.add(sstable);
+                        if (!nonAnticompacting.contains(sstable)) // don't notify if the SSTable was already repaired
+                            mutatedRepairStatuses.add(sstable);
                         sstableIterator.remove();
                         shouldAnticompact = true;
                         break;
                     }
-                    else if (r.intersects(sstableBounds))
+                    else if (r.intersects(sstableBounds) && !nonAnticompacting.contains(sstable))
                     {
-                        logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableBounds, r);
+                        anticompactRanges.add(r.toString());
                         shouldAnticompact = true;
                     }
                 }
 
+                if (!anticompactRanges.isEmpty())
+                    logger.info("SSTable {} ({}) will be anticompacted on ranges: {}", sstable, sstableBounds, String.join(", ", anticompactRanges));
+
                 if (!shouldAnticompact)
                 {
-                    logger.info("SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
+                    logger.info("SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
                     nonAnticompacting.add(sstable);
                     sstableIterator.remove();
                 }
             }
-            cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatusToNotify);
+            cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
             txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses));
             validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses));
             assert txn.originals().equals(sstables);
@@ -1223,24 +1244,11 @@ public class CompactionManager implements CompactionManagerMBean
      */
     private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
     {
-        logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
+        int numAnticompact = repaired.originals().size();
+        logger.info("Performing anticompaction on {} sstables", numAnticompact);
 
         //Group SSTables
-        Set<SSTableReader> sstables = repaired.originals();
-
-        // Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
-        // Although anti-compaction could work on repaired sstables as well and would result in having more accurate
-        // repairedAt values for these, we still avoid anti-compacting already repaired sstables, as we currently don't
-        // make use of any actual repairedAt value and splitting up sstables just for that is not worth it at this point.
-        Set<SSTableReader> unrepairedSSTables = ImmutableSet.copyOf(Iterables.filter(sstables, new Predicate<SSTableReader>()
-        {
-            public boolean apply(SSTableReader input)
-            {
-                return !input.isRepaired();
-            }
-        }));
-
-        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(unrepairedSSTables);
+        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals());
         // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
         int antiCompactedSSTableCount = 0;
         for (Collection<SSTableReader> sstableGroup : groupedSSTables)
@@ -1253,7 +1261,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
 
         String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
-        logger.info(format, repaired.originals().size(), antiCompactedSSTableCount);
+        logger.info(format, numAnticompact, antiCompactedSSTableCount);
     }
 
     private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index c451516..abd9a4a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -29,13 +29,20 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.locator.SimpleStrategy;
+
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.BeforeClass;
 import org.junit.After;
 import org.junit.Test;
@@ -55,8 +62,6 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-import com.google.common.collect.Iterables;
-
 public class AntiCompactionTest
 {
     private static final String KEYSPACE1 = "AntiCompactionTest";
@@ -271,9 +276,116 @@ public class AntiCompactionTest
             CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
         }
 
+        SSTableReader sstable = Iterables.get(store.getSSTables(), 0);
         assertThat(store.getSSTables().size(), is(1));
-        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
-        assertThat(Iterables.get(store.getSSTables(), 0).selfRef().globalCount(), is(1));
+        assertThat(sstable.isRepaired(), is(true));
+        assertThat(sstable.selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+    }
+
+    @Test
+    public void shouldAntiCompactSSTable() throws IOException, InterruptedException, ExecutionException
+    {
+        ColumnFamilyStore store = prepareColumnFamilyStore();
+        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+        assertEquals(store.getSSTables().size(), sstables.size());
+        // SSTable range is 0 - 10, repair just a subset of the ranges (0 - 4) of the SSTable. Should result in
+        // one repaired and one unrepaired SSTable
+        Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+        List<Range<Token>> ranges = Arrays.asList(range);
+
+        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+             Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
+        }
+
+        Comparator<SSTableReader> generationReverseComparator = new Comparator<SSTableReader>()
+        {
+            public int compare(SSTableReader o1, SSTableReader o2)
+            {
+                return Integer.compare(o1.descriptor.generation, o2.descriptor.generation);
+            }
+        };
+
+        SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(generationReverseComparator);
+        sstablesSorted.addAll(store.getSSTables());
+
+        SSTableReader sstable = sstablesSorted.first();
+        assertThat(store.getSSTables().size(), is(2));
+        assertThat(sstable.isRepaired(), is(true));
+        assertThat(sstable.selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Test we don't anti-compact already repaired SSTables. repairedAt shouldn't change for the already repaired SSTable (first)
+        sstables = store.getSSTables();
+        // Range that's a subset of the repaired SSTable's ranges, so would cause an anti-compaction (if it wasn't repaired)
+        range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("2".getBytes()));
+        ranges = Arrays.asList(range);
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // use different repairedAt to ensure it doesn't change
+            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(false));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(1L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Test repairing all the ranges of the repaired SSTable. Should mutate repairedAt without anticompacting,
+        // but leave the unrepaired SSTable as is.
+        range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+        ranges = Arrays.asList(range);
+
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // Same repaired at, but should be changed on the repaired SSTable now
+            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(false));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(200L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Repair whole range. Should mutate repairedAt on repaired SSTable (again) and
+        // mark unrepaired SSTable as repaired
+        range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("999".getBytes()));
+        ranges = Arrays.asList(range);
+
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // Both SSTables should have repairedAt of 400
+            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(true));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(400L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(400L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
         assertThat(store.getTracker().getCompacting().size(), is(0));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[03/10] cassandra git commit: Stop SSTables being lost from compaction strategy after full repairs

Posted by mc...@apache.org.
Stop SSTables being lost from compaction strategy after full repairs

patch by Kurt Greaves; reviewed by Stefan Podkowinski, Marcus Eriksson, for CASSANDRA-14423


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8912ce9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8912ce9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8912ce9

Branch: refs/heads/cassandra-3.11
Commit: f8912ce9329a8bc360e93cf61e56814135fbab39
Parents: 1143bc1
Author: kurt <ku...@instaclustr.com>
Authored: Thu Jun 14 10:59:19 2018 +0000
Committer: Mick Semb Wever <mc...@apache.org>
Committed: Fri Jun 29 16:49:53 2018 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  70 ++++++-----
 .../db/compaction/AntiCompactionTest.java       | 120 ++++++++++++++++++-
 3 files changed, 156 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7b1089e..9d6a9ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.13
+ * Fix bug that prevented compaction of SSTables after full repairs (CASSANDRA-14423)
  * Incorrect counting of pending messages in OutboundTcpConnection (CASSANDRA-11551)
  * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743)
  * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 419f66e..013fc04 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -460,6 +460,16 @@ public class CompactionManager implements CompactionManagerMBean
         }, jobs, OperationType.CLEANUP);
     }
 
+    /**
+     * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
+     * as repaired.
+     *
+     * @param cfs Column family for anti-compaction
+     * @param ranges Repaired ranges to be anti-compacted into separate SSTables.
+     * @param sstables {@link Refs} of SSTables within CF to anti-compact.
+     * @param repairedAt Unix timestamp of when repair was completed.
+     * @return Futures executing anti-compaction.
+     */
     public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                           final Collection<Range<Token>> ranges,
                                           final Refs<SSTableReader> sstables,
@@ -475,6 +485,8 @@ public class CompactionManager implements CompactionManagerMBean
                 {
                     for (SSTableReader compactingSSTable : cfs.getTracker().getCompacting())
                         sstables.releaseIfHolds(compactingSSTable);
+                    // We don't anti-compact any SSTable that has been compacted during repair as it may have been compacted
+                    // with unrepaired data.
                     Set<SSTableReader> compactedSSTables = new HashSet<>();
                     for (SSTableReader sstable : sstables)
                         if (sstable.isMarkedCompacted())
@@ -504,9 +516,17 @@ public class CompactionManager implements CompactionManagerMBean
      *
      * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)).
      *
+     * NOTE: Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
+     * Although anti-compaction could work on repaired sstables as well and would result in having more accurate
+     * repairedAt values for these, we avoid anti-compacting already repaired sstables, as we currently don't
+     * make use of any actual repairedAt value and splitting up sstables just for that is not worth it. However, we will
+     * still update repairedAt if the SSTable is fully contained within the repaired ranges, as this does not require
+     * anticompaction.
+     *
      * @param cfs
      * @param ranges Ranges that the repair was carried out on
      * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
+     * @param txn Transaction across all SSTables that were repaired.
      * @throws InterruptedException
      * @throws IOException
      */
@@ -519,13 +539,7 @@ public class CompactionManager implements CompactionManagerMBean
         logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
         logger.trace("Starting anticompaction for ranges {}", ranges);
         Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
-        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
-        // we should only notify that repair status changed if it actually did:
-        Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
-        Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
-        for (SSTableReader sstable : sstables)
-            wasRepairedBefore.put(sstable, sstable.isRepaired());
-
+        Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // SSTables that were completely repaired only
         Set<SSTableReader> nonAnticompacting = new HashSet<>();
 
         Iterator<SSTableReader> sstableIterator = sstables.iterator();
@@ -536,6 +550,11 @@ public class CompactionManager implements CompactionManagerMBean
             while (sstableIterator.hasNext())
             {
                 SSTableReader sstable = sstableIterator.next();
+                List<String> anticompactRanges = new ArrayList<>();
+                // We don't anti-compact SSTables already marked repaired. See CASSANDRA-13153
+                // and CASSANDRA-14423.
+                if (sstable.isRepaired()) // We never anti-compact already repaired SSTables
+                    nonAnticompacting.add(sstable);
 
                 Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken());
 
@@ -548,28 +567,30 @@ public class CompactionManager implements CompactionManagerMBean
                         logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
                         sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
                         sstable.reloadSSTableMetadata();
-                        mutatedRepairStatuses.add(sstable);
-                        if (!wasRepairedBefore.get(sstable))
-                            mutatedRepairStatusToNotify.add(sstable);
+                        if (!nonAnticompacting.contains(sstable)) // don't notify if the SSTable was already repaired
+                            mutatedRepairStatuses.add(sstable);
                         sstableIterator.remove();
                         shouldAnticompact = true;
                         break;
                     }
-                    else if (r.intersects(sstableBounds))
+                    else if (r.intersects(sstableBounds) && !nonAnticompacting.contains(sstable))
                     {
-                        logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableBounds, r);
+                        anticompactRanges.add(r.toString());
                         shouldAnticompact = true;
                     }
                 }
 
+                if (!anticompactRanges.isEmpty())
+                    logger.info("SSTable {} ({}) will be anticompacted on ranges: {}", sstable, sstableBounds, String.join(", ", anticompactRanges));
+
                 if (!shouldAnticompact)
                 {
-                    logger.info("SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
+                    logger.info("SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
                     nonAnticompacting.add(sstable);
                     sstableIterator.remove();
                 }
             }
-            cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatusToNotify);
+            cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
             txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses));
             validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses));
             assert txn.originals().equals(sstables);
@@ -1223,24 +1244,11 @@ public class CompactionManager implements CompactionManagerMBean
      */
     private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
     {
-        logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
+        int numAnticompact = repaired.originals().size();
+        logger.info("Performing anticompaction on {} sstables", numAnticompact);
 
         //Group SSTables
-        Set<SSTableReader> sstables = repaired.originals();
-
-        // Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
-        // Although anti-compaction could work on repaired sstables as well and would result in having more accurate
-        // repairedAt values for these, we still avoid anti-compacting already repaired sstables, as we currently don't
-        // make use of any actual repairedAt value and splitting up sstables just for that is not worth it at this point.
-        Set<SSTableReader> unrepairedSSTables = ImmutableSet.copyOf(Iterables.filter(sstables, new Predicate<SSTableReader>()
-        {
-            public boolean apply(SSTableReader input)
-            {
-                return !input.isRepaired();
-            }
-        }));
-
-        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(unrepairedSSTables);
+        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals());
         // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
         int antiCompactedSSTableCount = 0;
         for (Collection<SSTableReader> sstableGroup : groupedSSTables)
@@ -1253,7 +1261,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
 
         String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
-        logger.info(format, repaired.originals().size(), antiCompactedSSTableCount);
+        logger.info(format, numAnticompact, antiCompactedSSTableCount);
     }
 
     private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index c451516..abd9a4a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -29,13 +29,20 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.locator.SimpleStrategy;
+
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.BeforeClass;
 import org.junit.After;
 import org.junit.Test;
@@ -55,8 +62,6 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-import com.google.common.collect.Iterables;
-
 public class AntiCompactionTest
 {
     private static final String KEYSPACE1 = "AntiCompactionTest";
@@ -271,9 +276,116 @@ public class AntiCompactionTest
             CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
         }
 
+        SSTableReader sstable = Iterables.get(store.getSSTables(), 0);
         assertThat(store.getSSTables().size(), is(1));
-        assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
-        assertThat(Iterables.get(store.getSSTables(), 0).selfRef().globalCount(), is(1));
+        assertThat(sstable.isRepaired(), is(true));
+        assertThat(sstable.selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+    }
+
+    @Test
+    public void shouldAntiCompactSSTable() throws IOException, InterruptedException, ExecutionException
+    {
+        ColumnFamilyStore store = prepareColumnFamilyStore();
+        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+        assertEquals(store.getSSTables().size(), sstables.size());
+        // SSTable range is 0 - 10, repair just a subset of the ranges (0 - 4) of the SSTable. Should result in
+        // one repaired and one unrepaired SSTable
+        Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+        List<Range<Token>> ranges = Arrays.asList(range);
+
+        try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+             Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
+        }
+
+        Comparator<SSTableReader> generationReverseComparator = new Comparator<SSTableReader>()
+        {
+            public int compare(SSTableReader o1, SSTableReader o2)
+            {
+                return Integer.compare(o1.descriptor.generation, o2.descriptor.generation);
+            }
+        };
+
+        SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(generationReverseComparator);
+        sstablesSorted.addAll(store.getSSTables());
+
+        SSTableReader sstable = sstablesSorted.first();
+        assertThat(store.getSSTables().size(), is(2));
+        assertThat(sstable.isRepaired(), is(true));
+        assertThat(sstable.selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Test we don't anti-compact already repaired SSTables. repairedAt shouldn't change for the already repaired SSTable (first)
+        sstables = store.getSSTables();
+        // Range that's a subset of the repaired SSTable's ranges, so would cause an anti-compaction (if it wasn't repaired)
+        range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("2".getBytes()));
+        ranges = Arrays.asList(range);
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // use different repairedAt to ensure it doesn't change
+            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(false));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(1L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Test repairing all the ranges of the repaired SSTable. Should mutate repairedAt without anticompacting,
+        // but leave the unrepaired SSTable as is.
+        range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+        ranges = Arrays.asList(range);
+
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // Same repaired at, but should be changed on the repaired SSTable now
+            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(false));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(200L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+        assertThat(store.getTracker().getCompacting().size(), is(0));
+
+        // Repair whole range. Should mutate repairedAt on repaired SSTable (again) and
+        // mark unrepaired SSTable as repaired
+        range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("999".getBytes()));
+        ranges = Arrays.asList(range);
+
+        try (Refs<SSTableReader> refs = Refs.ref(sstables))
+        {
+            // Both SSTables should have repairedAt of 400
+            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400);
+            fut.get();
+        }
+
+        sstablesSorted.clear();
+        sstablesSorted.addAll(store.getSSTables());
+
+        assertThat(sstablesSorted.size(), is(2));
+        assertThat(sstablesSorted.first().isRepaired(), is(true));
+        assertThat(sstablesSorted.last().isRepaired(), is(true));
+        assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(400L));
+        assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(400L));
+        assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+        assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
         assertThat(store.getTracker().getCompacting().size(), is(0));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[06/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by mc...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/489c2f69
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/489c2f69
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/489c2f69

Branch: refs/heads/cassandra-3.0
Commit: 489c2f69510b001770d9a59e55ba5d5175019050
Parents: 4e23c9e f8912ce
Author: Mick Semb Wever <mc...@apache.org>
Authored: Fri Jun 29 16:53:36 2018 +1000
Committer: Mick Semb Wever <mc...@apache.org>
Committed: Fri Jun 29 16:57:34 2018 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  66 ++++++-----
 .../db/compaction/AntiCompactionTest.java       | 109 ++++++++++++++++++-
 3 files changed, 147 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/489c2f69/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index aeeb0ae,9d6a9ea..d694f3b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,34 -1,5 +1,35 @@@
 -2.2.13
 +3.0.17
 + * Always close RT markers returned by ReadCommand#executeLocally() (CASSANDRA-14515)
 + * Reverse order queries with range tombstones can cause data loss (CASSANDRA-14513)
 + * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
 + * Add Missing dependencies in pom-all (CASSANDRA-14422)
 + * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
 + * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
 + * Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
 + * Fix progress stats and units in compactionstats (CASSANDRA-12244)
 + * Better handle missing partition columns in system_schema.columns (CASSANDRA-14379)
 + * Delay hints store excise by write timeout to avoid race with decommission (CASSANDRA-13740)
 + * Deprecate background repair and probablistic read_repair_chance table options
 +   (CASSANDRA-13910)
 + * Add missed CQL keywords to documentation (CASSANDRA-14359)
 + * Fix unbounded validation compactions on repair / revert CASSANDRA-13797 (CASSANDRA-14332)
 + * Avoid deadlock when running nodetool refresh before node is fully up (CASSANDRA-14310)
 + * Handle all exceptions when opening sstables (CASSANDRA-14202)
 + * Handle incompletely written hint descriptors during startup (CASSANDRA-14080)
 + * Handle repeat open bound from SRP in read repair (CASSANDRA-14330)
 + * Use zero as default score in DynamicEndpointSnitch (CASSANDRA-14252)
 + * Respect max hint window when hinting for LWT (CASSANDRA-14215)
 + * Adding missing WriteType enum values to v3, v4, and v5 spec (CASSANDRA-13697)
 + * Don't regenerate bloomfilter and summaries on startup (CASSANDRA-11163)
 + * Fix NPE when performing comparison against a null frozen in LWT (CASSANDRA-14087)
 + * Log when SSTables are deleted (CASSANDRA-14302)
 + * Fix batch commitlog sync regression (CASSANDRA-14292)
 + * Write to pending endpoint when view replica is also base replica (CASSANDRA-14251)
 + * Chain commit log marker potential performance regression in batch commit mode (CASSANDRA-14194)
 + * Fully utilise specified compaction threads (CASSANDRA-14210)
 + * Pre-create deletion log records to finish compactions quicker (CASSANDRA-12763)
 +Merged from 2.2:
+  * Fix bug that prevented compaction of SSTables after full repairs (CASSANDRA-14423)
   * Incorrect counting of pending messages in OutboundTcpConnection (CASSANDRA-11551)
   * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743)
   * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/489c2f69/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index ab363e0,013fc04..f033bf2
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -474,6 -460,16 +474,17 @@@ public class CompactionManager implemen
          }, jobs, OperationType.CLEANUP);
      }
  
+     /**
+      * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
+      * as repaired.
+      *
+      * @param cfs Column family for anti-compaction
+      * @param ranges Repaired ranges to be anti-compacted into separate SSTables.
+      * @param sstables {@link Refs} of SSTables within CF to anti-compact.
+      * @param repairedAt Unix timestamp of when repair was completed.
++     * @param parentRepairSession Corresponding repair session
+      * @return Futures executing anti-compaction.
+      */
      public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                            final Collection<Range<Token>> ranges,
                                            final Refs<SSTableReader> sstables,
@@@ -522,7 -526,7 +542,8 @@@
       * @param cfs
       * @param ranges Ranges that the repair was carried out on
       * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
+      * @param txn Transaction across all SSTables that were repaired.
 +     * @param parentRepairSession parent repair session ID
       * @throws InterruptedException
       * @throws IOException
       */
@@@ -530,19 -534,12 +551,13 @@@
                                        Collection<Range<Token>> ranges,
                                        Refs<SSTableReader> validatedForRepair,
                                        LifecycleTransaction txn,
 -                                      long repairedAt) throws InterruptedException, IOException
 +                                      long repairedAt,
 +                                      UUID parentRepairSession) throws InterruptedException, IOException
      {
 -        logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
 -        logger.trace("Starting anticompaction for ranges {}", ranges);
 +        logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} sstables", parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables());
 +        logger.trace("[repair #{}] Starting anticompaction for ranges {}", parentRepairSession, ranges);
          Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
-         Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
-         // we should only notify that repair status changed if it actually did:
-         Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
-         Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
-         for (SSTableReader sstable : sstables)
-             wasRepairedBefore.put(sstable, sstable.isRepaired());
- 
+         Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // SSTables that were completely repaired only
          Set<SSTableReader> nonAnticompacting = new HashSet<>();
  
          Iterator<SSTableReader> sstableIterator = sstables.iterator();
@@@ -562,12 -564,11 +582,11 @@@
                  {
                      if (r.contains(sstableBounds.left) && r.contains(sstableBounds.right))
                      {
 -                        logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
 +                        logger.info("[repair #{}] SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", parentRepairSession, sstable, r);
                          sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
                          sstable.reloadSSTableMetadata();
-                         mutatedRepairStatuses.add(sstable);
-                         if (!wasRepairedBefore.get(sstable))
-                             mutatedRepairStatusToNotify.add(sstable);
+                         if (!nonAnticompacting.contains(sstable)) // don't notify if the SSTable was already repaired
+                             mutatedRepairStatuses.add(sstable);
                          sstableIterator.remove();
                          shouldAnticompact = true;
                          break;
@@@ -579,9 -580,12 +598,12 @@@
                      }
                  }
  
+                 if (!anticompactRanges.isEmpty())
 -                    logger.info("SSTable {} ({}) will be anticompacted on ranges: {}", sstable, sstableBounds, String.join(", ", anticompactRanges));
++                    logger.info("[repair #{}] SSTable {} ({}) will be anticompacted on range {}", parentRepairSession, sstable, sstableBounds, String.join(", ", anticompactRanges));
+ 
                  if (!shouldAnticompact)
                  {
-                     logger.info("[repair #{}] SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableBounds, normalizedRanges);
 -                    logger.info("SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
++                    logger.info("[repair #{}] SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableBounds, normalizedRanges);
                      nonAnticompacting.add(sstable);
                      sstableIterator.remove();
                  }
@@@ -1245,19 -1244,11 +1267,11 @@@
       */
      private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
      {
-         logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
+         int numAnticompact = repaired.originals().size();
+         logger.info("Performing anticompaction on {} sstables", numAnticompact);
  
          //Group SSTables
-         Set<SSTableReader> sstables = repaired.originals();
- 
-         // Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
-         // Although anti-compaction could work on repaired sstables as well and would result in having more accurate
-         // repairedAt values for these, we still avoid anti-compacting already repaired sstables, as we currently don't
-         // make use of any actual repairedAt value and splitting up sstables just for that is not worth it at this point.
-         Set<SSTableReader> unrepairedSSTables = sstables.stream().filter((s) -> !s.isRepaired()).collect(Collectors.toSet());
- 
-         Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(unrepairedSSTables);
- 
 -        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals());
++        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(repaired.originals());
          // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
          int antiCompactedSSTableCount = 0;
          for (Collection<SSTableReader> sstableGroup : groupedSSTables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/489c2f69/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index ead0349,abd9a4a..8991f88
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -21,13 -29,20 +21,17 @@@ import java.io.File
  import java.io.IOException;
  import java.util.Arrays;
  import java.util.Collection;
 -import java.util.Comparator;
  import java.util.List;
 +import java.util.Set;
+ import java.util.SortedSet;
+ import java.util.TreeSet;
 +import java.util.UUID;
+ import java.util.concurrent.ExecutionException;
  
 -import org.apache.cassandra.config.KSMetaData;
 -import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
 -import org.apache.cassandra.locator.SimpleStrategy;
 -
 +import com.google.common.collect.ImmutableSet;
  import com.google.common.collect.Iterables;
+ import com.google.common.util.concurrent.ListenableFuture;
 +import com.google.common.util.concurrent.RateLimiter;
  import org.junit.BeforeClass;
  import org.junit.After;
  import org.junit.Test;
@@@ -270,12 -273,119 +274,112 @@@ public class AntiCompactionTes
          try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
               Refs<SSTableReader> refs = Refs.ref(sstables))
          {
 -            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
 +            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession);
          }
  
 -        SSTableReader sstable = Iterables.get(store.getSSTables(), 0);
 -        assertThat(store.getSSTables().size(), is(1));
++        SSTableReader sstable = Iterables.get(store.getLiveSSTables(), 0);
 +        assertThat(store.getLiveSSTables().size(), is(1));
-         assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(true));
-         assertThat(Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount(), is(1));
+         assertThat(sstable.isRepaired(), is(true));
+         assertThat(sstable.selfRef().globalCount(), is(1));
+         assertThat(store.getTracker().getCompacting().size(), is(0));
+     }
+ 
+     @Test
+     public void shouldAntiCompactSSTable() throws IOException, InterruptedException, ExecutionException
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
 -        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
 -        assertEquals(store.getSSTables().size(), sstables.size());
++        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
++        assertEquals(store.getLiveSSTables().size(), sstables.size());
+         // SSTable range is 0 - 10, repair just a subset of the ranges (0 - 4) of the SSTable. Should result in
+         // one repaired and one unrepaired SSTable
+         Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+         List<Range<Token>> ranges = Arrays.asList(range);
++        UUID parentRepairSession = UUID.randomUUID();
+ 
+         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+              Refs<SSTableReader> refs = Refs.ref(sstables))
+         {
 -            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
++            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession);
+         }
+ 
 -        Comparator<SSTableReader> generationReverseComparator = new Comparator<SSTableReader>()
 -        {
 -            public int compare(SSTableReader o1, SSTableReader o2)
 -            {
 -                return Integer.compare(o1.descriptor.generation, o2.descriptor.generation);
 -            }
 -        };
 -
 -        SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(generationReverseComparator);
 -        sstablesSorted.addAll(store.getSSTables());
++        SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(SSTableReader.generationReverseComparator.reversed());
++        sstablesSorted.addAll(store.getLiveSSTables());
+ 
+         SSTableReader sstable = sstablesSorted.first();
 -        assertThat(store.getSSTables().size(), is(2));
++        assertThat(store.getLiveSSTables().size(), is(2));
+         assertThat(sstable.isRepaired(), is(true));
+         assertThat(sstable.selfRef().globalCount(), is(1));
+         assertThat(store.getTracker().getCompacting().size(), is(0));
+ 
+         // Test we don't anti-compact already repaired SSTables. repairedAt shouldn't change for the already repaired SSTable (first)
 -        sstables = store.getSSTables();
++        sstables = store.getLiveSSTables();
+         // Range that's a subset of the repaired SSTable's ranges, so would cause an anti-compaction (if it wasn't repaired)
+         range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("2".getBytes()));
+         ranges = Arrays.asList(range);
+         try (Refs<SSTableReader> refs = Refs.ref(sstables))
+         {
+             // use different repairedAt to ensure it doesn't change
 -            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
++            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200, parentRepairSession);
+             fut.get();
+         }
+ 
+         sstablesSorted.clear();
 -        sstablesSorted.addAll(store.getSSTables());
++        sstablesSorted.addAll(store.getLiveSSTables());
+         assertThat(sstablesSorted.size(), is(2));
+         assertThat(sstablesSorted.first().isRepaired(), is(true));
+         assertThat(sstablesSorted.last().isRepaired(), is(false));
+         assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(1L));
+         assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+         assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+         assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+         assertThat(store.getTracker().getCompacting().size(), is(0));
+ 
+         // Test repairing all the ranges of the repaired SSTable. Should mutate repairedAt without anticompacting,
+         // but leave the unrepaired SSTable as is.
+         range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+         ranges = Arrays.asList(range);
+ 
+         try (Refs<SSTableReader> refs = Refs.ref(sstables))
+         {
+             // Same repaired at, but should be changed on the repaired SSTable now
 -            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
++            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200, parentRepairSession);
+             fut.get();
+         }
+ 
+         sstablesSorted.clear();
 -        sstablesSorted.addAll(store.getSSTables());
++        sstablesSorted.addAll(store.getLiveSSTables());
+ 
+         assertThat(sstablesSorted.size(), is(2));
+         assertThat(sstablesSorted.first().isRepaired(), is(true));
+         assertThat(sstablesSorted.last().isRepaired(), is(false));
+         assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(200L));
+         assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+         assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+         assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+         assertThat(store.getTracker().getCompacting().size(), is(0));
+ 
+         // Repair whole range. Should mutate repairedAt on repaired SSTable (again) and
+         // mark unrepaired SSTable as repaired
+         range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("999".getBytes()));
+         ranges = Arrays.asList(range);
+ 
+         try (Refs<SSTableReader> refs = Refs.ref(sstables))
+         {
+             // Both SSTables should have repairedAt of 400
 -            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400);
++            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400, parentRepairSession);
+             fut.get();
+         }
+ 
+         sstablesSorted.clear();
 -        sstablesSorted.addAll(store.getSSTables());
++        sstablesSorted.addAll(store.getLiveSSTables());
+ 
+         assertThat(sstablesSorted.size(), is(2));
+         assertThat(sstablesSorted.first().isRepaired(), is(true));
+         assertThat(sstablesSorted.last().isRepaired(), is(true));
+         assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(400L));
+         assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(400L));
+         assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+         assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
          assertThat(store.getTracker().getCompacting().size(), is(0));
      }
  
@@@ -332,11 -446,4 +436,10 @@@
          ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
          store.truncateBlocking();
      }
 +
 +    private static Set<SSTableReader> getUnrepairedSSTables(ColumnFamilyStore cfs)
 +    {
 +        return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired()));
 +    }
 +
- 
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[08/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by mc...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bba0d03e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bba0d03e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bba0d03e

Branch: refs/heads/cassandra-3.11
Commit: bba0d03e9c5e62c222734839a9adc83f1aec6f95
Parents: ea62d88 489c2f6
Author: Mick Semb Wever <mc...@apache.org>
Authored: Fri Jun 29 16:58:26 2018 +1000
Committer: Mick Semb Wever <mc...@apache.org>
Committed: Fri Jun 29 17:00:02 2018 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  67 +++++++-----
 .../db/compaction/AntiCompactionTest.java       | 109 ++++++++++++++++++-
 3 files changed, 147 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bba0d03e/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bba0d03e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f0a4de5,f033bf2..fa6b03e
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -478,115 -474,17 +477,126 @@@ public class CompactionManager implemen
          }, jobs, OperationType.CLEANUP);
      }
  
 +    public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, ExecutionException
 +    {
 +        assert !cfStore.isIndex();
 +
 +        return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
 +            {
 +                Iterable<SSTableReader> originals = transaction.originals();
 +                if (cfStore.getCompactionStrategyManager().onlyPurgeRepairedTombstones())
 +                    originals = Iterables.filter(originals, SSTableReader::isRepaired);
 +                List<SSTableReader> sortedSSTables = Lists.newArrayList(originals);
 +                Collections.sort(sortedSSTables, SSTableReader.maxTimestampComparator);
 +                return sortedSSTables;
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction txn) throws IOException
 +            {
 +                logger.debug("Garbage collecting {}", txn.originals());
 +                CompactionTask task = new CompactionTask(cfStore, txn, getDefaultGcBefore(cfStore, FBUtilities.nowInSeconds()))
 +                {
 +                    @Override
 +                    protected CompactionController getCompactionController(Set<SSTableReader> toCompact)
 +                    {
 +                        return new CompactionController(cfStore, toCompact, gcBefore, null, tombstoneOption);
 +                    }
 +                };
 +                task.setUserDefined(true);
 +                task.setCompactionType(OperationType.GARBAGE_COLLECT);
 +                task.execute(metrics);
 +            }
 +        }, jobs, OperationType.GARBAGE_COLLECT);
 +    }
 +
 +    public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, int jobs) throws ExecutionException, InterruptedException
 +    {
 +        if (!cfs.getPartitioner().splitter().isPresent())
 +        {
 +            logger.info("Partitioner does not support splitting");
 +            return AllSSTableOpStatus.ABORTED;
 +        }
 +        final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName());
 +
 +        if (r.isEmpty())
 +        {
 +            logger.info("Relocate cannot run before a node has joined the ring");
 +            return AllSSTableOpStatus.ABORTED;
 +        }
 +
 +        final DiskBoundaries diskBoundaries = cfs.getDiskBoundaries();
 +
 +        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
 +            {
 +                Set<SSTableReader> originals = Sets.newHashSet(transaction.originals());
 +                Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet());
 +                transaction.cancel(Sets.difference(originals, needsRelocation));
 +
 +                Map<Integer, List<SSTableReader>> groupedByDisk = groupByDiskIndex(needsRelocation);
 +
 +                int maxSize = 0;
 +                for (List<SSTableReader> diskSSTables : groupedByDisk.values())
 +                    maxSize = Math.max(maxSize, diskSSTables.size());
 +
 +                List<SSTableReader> mixedSSTables = new ArrayList<>();
 +
 +                for (int i = 0; i < maxSize; i++)
 +                    for (List<SSTableReader> diskSSTables : groupedByDisk.values())
 +                        if (i < diskSSTables.size())
 +                            mixedSSTables.add(diskSSTables.get(i));
 +
 +                return mixedSSTables;
 +            }
 +
 +            public Map<Integer, List<SSTableReader>> groupByDiskIndex(Set<SSTableReader> needsRelocation)
 +            {
 +                return needsRelocation.stream().collect(Collectors.groupingBy((s) -> diskBoundaries.getDiskIndex(s)));
 +            }
 +
 +            private boolean inCorrectLocation(SSTableReader sstable)
 +            {
 +                if (!cfs.getPartitioner().splitter().isPresent())
 +                    return true;
 +
 +                int diskIndex = diskBoundaries.getDiskIndex(sstable);
 +                File diskLocation = diskBoundaries.directories.get(diskIndex).location;
 +                PartitionPosition diskLast = diskBoundaries.positions.get(diskIndex);
 +
 +                // the location we get from directoryIndex is based on the first key in the sstable
 +                // now we need to make sure the last key is less than the boundary as well:
 +                return sstable.descriptor.directory.getAbsolutePath().startsWith(diskLocation.getAbsolutePath()) && sstable.last.compareTo(diskLast) <= 0;
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction txn)
 +            {
 +                logger.debug("Relocating {}", txn.originals());
 +                AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
 +                task.setUserDefined(true);
 +                task.setCompactionType(OperationType.RELOCATE);
 +                task.execute(metrics);
 +            }
 +        }, jobs, OperationType.RELOCATE);
 +    }
 +
+     /**
+      * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
+      * as repaired.
+      *
+      * @param cfs Column family for anti-compaction
+      * @param ranges Repaired ranges to be anti-compacted into separate SSTables.
+      * @param sstables {@link Refs} of SSTables within CF to anti-compact.
+      * @param repairedAt Unix timestamp of when repair was completed.
+      * @param parentRepairSession Corresponding repair session
+      * @return Futures executing anti-compaction.
+      */
      public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                            final Collection<Range<Token>> ranges,
                                            final Refs<SSTableReader> sstables,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[10/10] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by mc...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5cc68a87
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5cc68a87
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5cc68a87

Branch: refs/heads/trunk
Commit: 5cc68a87359dd02412bdb70a52dfcd718d44a5ba
Parents: 4cb83cb bba0d03
Author: Mick Semb Wever <mc...@apache.org>
Authored: Fri Jun 29 17:00:24 2018 +1000
Committer: Mick Semb Wever <mc...@apache.org>
Committed: Fri Jun 29 17:00:24 2018 +1000

----------------------------------------------------------------------

----------------------------------------------------------------------



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[05/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by mc...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/489c2f69
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/489c2f69
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/489c2f69

Branch: refs/heads/cassandra-3.11
Commit: 489c2f69510b001770d9a59e55ba5d5175019050
Parents: 4e23c9e f8912ce
Author: Mick Semb Wever <mc...@apache.org>
Authored: Fri Jun 29 16:53:36 2018 +1000
Committer: Mick Semb Wever <mc...@apache.org>
Committed: Fri Jun 29 16:57:34 2018 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  66 ++++++-----
 .../db/compaction/AntiCompactionTest.java       | 109 ++++++++++++++++++-
 3 files changed, 147 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/489c2f69/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index aeeb0ae,9d6a9ea..d694f3b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,34 -1,5 +1,35 @@@
 -2.2.13
 +3.0.17
 + * Always close RT markers returned by ReadCommand#executeLocally() (CASSANDRA-14515)
 + * Reverse order queries with range tombstones can cause data loss (CASSANDRA-14513)
 + * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
 + * Add Missing dependencies in pom-all (CASSANDRA-14422)
 + * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
 + * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
 + * Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
 + * Fix progress stats and units in compactionstats (CASSANDRA-12244)
 + * Better handle missing partition columns in system_schema.columns (CASSANDRA-14379)
 + * Delay hints store excise by write timeout to avoid race with decommission (CASSANDRA-13740)
 + * Deprecate background repair and probablistic read_repair_chance table options
 +   (CASSANDRA-13910)
 + * Add missed CQL keywords to documentation (CASSANDRA-14359)
 + * Fix unbounded validation compactions on repair / revert CASSANDRA-13797 (CASSANDRA-14332)
 + * Avoid deadlock when running nodetool refresh before node is fully up (CASSANDRA-14310)
 + * Handle all exceptions when opening sstables (CASSANDRA-14202)
 + * Handle incompletely written hint descriptors during startup (CASSANDRA-14080)
 + * Handle repeat open bound from SRP in read repair (CASSANDRA-14330)
 + * Use zero as default score in DynamicEndpointSnitch (CASSANDRA-14252)
 + * Respect max hint window when hinting for LWT (CASSANDRA-14215)
 + * Adding missing WriteType enum values to v3, v4, and v5 spec (CASSANDRA-13697)
 + * Don't regenerate bloomfilter and summaries on startup (CASSANDRA-11163)
 + * Fix NPE when performing comparison against a null frozen in LWT (CASSANDRA-14087)
 + * Log when SSTables are deleted (CASSANDRA-14302)
 + * Fix batch commitlog sync regression (CASSANDRA-14292)
 + * Write to pending endpoint when view replica is also base replica (CASSANDRA-14251)
 + * Chain commit log marker potential performance regression in batch commit mode (CASSANDRA-14194)
 + * Fully utilise specified compaction threads (CASSANDRA-14210)
 + * Pre-create deletion log records to finish compactions quicker (CASSANDRA-12763)
 +Merged from 2.2:
+  * Fix bug that prevented compaction of SSTables after full repairs (CASSANDRA-14423)
   * Incorrect counting of pending messages in OutboundTcpConnection (CASSANDRA-11551)
   * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743)
   * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/489c2f69/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index ab363e0,013fc04..f033bf2
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -474,6 -460,16 +474,17 @@@ public class CompactionManager implemen
          }, jobs, OperationType.CLEANUP);
      }
  
+     /**
+      * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
+      * as repaired.
+      *
+      * @param cfs Column family for anti-compaction
+      * @param ranges Repaired ranges to be anti-compacted into separate SSTables.
+      * @param sstables {@link Refs} of SSTables within CF to anti-compact.
+      * @param repairedAt Unix timestamp of when repair was completed.
++     * @param parentRepairSession Corresponding repair session
+      * @return Futures executing anti-compaction.
+      */
      public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                            final Collection<Range<Token>> ranges,
                                            final Refs<SSTableReader> sstables,
@@@ -522,7 -526,7 +542,8 @@@
       * @param cfs
       * @param ranges Ranges that the repair was carried out on
       * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
+      * @param txn Transaction across all SSTables that were repaired.
 +     * @param parentRepairSession parent repair session ID
       * @throws InterruptedException
       * @throws IOException
       */
@@@ -530,19 -534,12 +551,13 @@@
                                        Collection<Range<Token>> ranges,
                                        Refs<SSTableReader> validatedForRepair,
                                        LifecycleTransaction txn,
 -                                      long repairedAt) throws InterruptedException, IOException
 +                                      long repairedAt,
 +                                      UUID parentRepairSession) throws InterruptedException, IOException
      {
 -        logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
 -        logger.trace("Starting anticompaction for ranges {}", ranges);
 +        logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} sstables", parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables());
 +        logger.trace("[repair #{}] Starting anticompaction for ranges {}", parentRepairSession, ranges);
          Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
-         Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
-         // we should only notify that repair status changed if it actually did:
-         Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
-         Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
-         for (SSTableReader sstable : sstables)
-             wasRepairedBefore.put(sstable, sstable.isRepaired());
- 
+         Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // SSTables that were completely repaired only
          Set<SSTableReader> nonAnticompacting = new HashSet<>();
  
          Iterator<SSTableReader> sstableIterator = sstables.iterator();
@@@ -562,12 -564,11 +582,11 @@@
                  {
                      if (r.contains(sstableBounds.left) && r.contains(sstableBounds.right))
                      {
 -                        logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
 +                        logger.info("[repair #{}] SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", parentRepairSession, sstable, r);
                          sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
                          sstable.reloadSSTableMetadata();
-                         mutatedRepairStatuses.add(sstable);
-                         if (!wasRepairedBefore.get(sstable))
-                             mutatedRepairStatusToNotify.add(sstable);
+                         if (!nonAnticompacting.contains(sstable)) // don't notify if the SSTable was already repaired
+                             mutatedRepairStatuses.add(sstable);
                          sstableIterator.remove();
                          shouldAnticompact = true;
                          break;
@@@ -579,9 -580,12 +598,12 @@@
                      }
                  }
  
+                 if (!anticompactRanges.isEmpty())
 -                    logger.info("SSTable {} ({}) will be anticompacted on ranges: {}", sstable, sstableBounds, String.join(", ", anticompactRanges));
++                    logger.info("[repair #{}] SSTable {} ({}) will be anticompacted on range {}", parentRepairSession, sstable, sstableBounds, String.join(", ", anticompactRanges));
+ 
                  if (!shouldAnticompact)
                  {
-                     logger.info("[repair #{}] SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableBounds, normalizedRanges);
 -                    logger.info("SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
++                    logger.info("[repair #{}] SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableBounds, normalizedRanges);
                      nonAnticompacting.add(sstable);
                      sstableIterator.remove();
                  }
@@@ -1245,19 -1244,11 +1267,11 @@@
       */
      private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
      {
-         logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
+         int numAnticompact = repaired.originals().size();
+         logger.info("Performing anticompaction on {} sstables", numAnticompact);
  
          //Group SSTables
-         Set<SSTableReader> sstables = repaired.originals();
- 
-         // Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
-         // Although anti-compaction could work on repaired sstables as well and would result in having more accurate
-         // repairedAt values for these, we still avoid anti-compacting already repaired sstables, as we currently don't
-         // make use of any actual repairedAt value and splitting up sstables just for that is not worth it at this point.
-         Set<SSTableReader> unrepairedSSTables = sstables.stream().filter((s) -> !s.isRepaired()).collect(Collectors.toSet());
- 
-         Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(unrepairedSSTables);
- 
 -        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals());
++        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(repaired.originals());
          // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
          int antiCompactedSSTableCount = 0;
          for (Collection<SSTableReader> sstableGroup : groupedSSTables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/489c2f69/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index ead0349,abd9a4a..8991f88
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -21,13 -29,20 +21,17 @@@ import java.io.File
  import java.io.IOException;
  import java.util.Arrays;
  import java.util.Collection;
 -import java.util.Comparator;
  import java.util.List;
 +import java.util.Set;
+ import java.util.SortedSet;
+ import java.util.TreeSet;
 +import java.util.UUID;
+ import java.util.concurrent.ExecutionException;
  
 -import org.apache.cassandra.config.KSMetaData;
 -import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
 -import org.apache.cassandra.locator.SimpleStrategy;
 -
 +import com.google.common.collect.ImmutableSet;
  import com.google.common.collect.Iterables;
+ import com.google.common.util.concurrent.ListenableFuture;
 +import com.google.common.util.concurrent.RateLimiter;
  import org.junit.BeforeClass;
  import org.junit.After;
  import org.junit.Test;
@@@ -270,12 -273,119 +274,112 @@@ public class AntiCompactionTes
          try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
               Refs<SSTableReader> refs = Refs.ref(sstables))
          {
 -            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
 +            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession);
          }
  
 -        SSTableReader sstable = Iterables.get(store.getSSTables(), 0);
 -        assertThat(store.getSSTables().size(), is(1));
++        SSTableReader sstable = Iterables.get(store.getLiveSSTables(), 0);
 +        assertThat(store.getLiveSSTables().size(), is(1));
-         assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(true));
-         assertThat(Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount(), is(1));
+         assertThat(sstable.isRepaired(), is(true));
+         assertThat(sstable.selfRef().globalCount(), is(1));
+         assertThat(store.getTracker().getCompacting().size(), is(0));
+     }
+ 
+     @Test
+     public void shouldAntiCompactSSTable() throws IOException, InterruptedException, ExecutionException
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
 -        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
 -        assertEquals(store.getSSTables().size(), sstables.size());
++        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
++        assertEquals(store.getLiveSSTables().size(), sstables.size());
+         // SSTable range is 0 - 10, repair just a subset of the ranges (0 - 4) of the SSTable. Should result in
+         // one repaired and one unrepaired SSTable
+         Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+         List<Range<Token>> ranges = Arrays.asList(range);
++        UUID parentRepairSession = UUID.randomUUID();
+ 
+         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+              Refs<SSTableReader> refs = Refs.ref(sstables))
+         {
 -            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
++            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession);
+         }
+ 
 -        Comparator<SSTableReader> generationReverseComparator = new Comparator<SSTableReader>()
 -        {
 -            public int compare(SSTableReader o1, SSTableReader o2)
 -            {
 -                return Integer.compare(o1.descriptor.generation, o2.descriptor.generation);
 -            }
 -        };
 -
 -        SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(generationReverseComparator);
 -        sstablesSorted.addAll(store.getSSTables());
++        SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(SSTableReader.generationReverseComparator.reversed());
++        sstablesSorted.addAll(store.getLiveSSTables());
+ 
+         SSTableReader sstable = sstablesSorted.first();
 -        assertThat(store.getSSTables().size(), is(2));
++        assertThat(store.getLiveSSTables().size(), is(2));
+         assertThat(sstable.isRepaired(), is(true));
+         assertThat(sstable.selfRef().globalCount(), is(1));
+         assertThat(store.getTracker().getCompacting().size(), is(0));
+ 
+         // Test we don't anti-compact already repaired SSTables. repairedAt shouldn't change for the already repaired SSTable (first)
 -        sstables = store.getSSTables();
++        sstables = store.getLiveSSTables();
+         // Range that's a subset of the repaired SSTable's ranges, so would cause an anti-compaction (if it wasn't repaired)
+         range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("2".getBytes()));
+         ranges = Arrays.asList(range);
+         try (Refs<SSTableReader> refs = Refs.ref(sstables))
+         {
+             // use different repairedAt to ensure it doesn't change
 -            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
++            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200, parentRepairSession);
+             fut.get();
+         }
+ 
+         sstablesSorted.clear();
 -        sstablesSorted.addAll(store.getSSTables());
++        sstablesSorted.addAll(store.getLiveSSTables());
+         assertThat(sstablesSorted.size(), is(2));
+         assertThat(sstablesSorted.first().isRepaired(), is(true));
+         assertThat(sstablesSorted.last().isRepaired(), is(false));
+         assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(1L));
+         assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+         assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+         assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+         assertThat(store.getTracker().getCompacting().size(), is(0));
+ 
+         // Test repairing all the ranges of the repaired SSTable. Should mutate repairedAt without anticompacting,
+         // but leave the unrepaired SSTable as is.
+         range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+         ranges = Arrays.asList(range);
+ 
+         try (Refs<SSTableReader> refs = Refs.ref(sstables))
+         {
+             // Same repaired at, but should be changed on the repaired SSTable now
 -            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
++            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200, parentRepairSession);
+             fut.get();
+         }
+ 
+         sstablesSorted.clear();
 -        sstablesSorted.addAll(store.getSSTables());
++        sstablesSorted.addAll(store.getLiveSSTables());
+ 
+         assertThat(sstablesSorted.size(), is(2));
+         assertThat(sstablesSorted.first().isRepaired(), is(true));
+         assertThat(sstablesSorted.last().isRepaired(), is(false));
+         assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(200L));
+         assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+         assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+         assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+         assertThat(store.getTracker().getCompacting().size(), is(0));
+ 
+         // Repair whole range. Should mutate repairedAt on repaired SSTable (again) and
+         // mark unrepaired SSTable as repaired
+         range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("999".getBytes()));
+         ranges = Arrays.asList(range);
+ 
+         try (Refs<SSTableReader> refs = Refs.ref(sstables))
+         {
+             // Both SSTables should have repairedAt of 400
 -            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400);
++            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400, parentRepairSession);
+             fut.get();
+         }
+ 
+         sstablesSorted.clear();
 -        sstablesSorted.addAll(store.getSSTables());
++        sstablesSorted.addAll(store.getLiveSSTables());
+ 
+         assertThat(sstablesSorted.size(), is(2));
+         assertThat(sstablesSorted.first().isRepaired(), is(true));
+         assertThat(sstablesSorted.last().isRepaired(), is(true));
+         assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(400L));
+         assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(400L));
+         assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+         assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
          assertThat(store.getTracker().getCompacting().size(), is(0));
      }
  
@@@ -332,11 -446,4 +436,10 @@@
          ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
          store.truncateBlocking();
      }
 +
 +    private static Set<SSTableReader> getUnrepairedSSTables(ColumnFamilyStore cfs)
 +    {
 +        return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired()));
 +    }
 +
- 
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[07/10] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by mc...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/489c2f69
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/489c2f69
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/489c2f69

Branch: refs/heads/trunk
Commit: 489c2f69510b001770d9a59e55ba5d5175019050
Parents: 4e23c9e f8912ce
Author: Mick Semb Wever <mc...@apache.org>
Authored: Fri Jun 29 16:53:36 2018 +1000
Committer: Mick Semb Wever <mc...@apache.org>
Committed: Fri Jun 29 16:57:34 2018 +1000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  66 ++++++-----
 .../db/compaction/AntiCompactionTest.java       | 109 ++++++++++++++++++-
 3 files changed, 147 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/489c2f69/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index aeeb0ae,9d6a9ea..d694f3b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,34 -1,5 +1,35 @@@
 -2.2.13
 +3.0.17
 + * Always close RT markers returned by ReadCommand#executeLocally() (CASSANDRA-14515)
 + * Reverse order queries with range tombstones can cause data loss (CASSANDRA-14513)
 + * Fix regression of lagging commitlog flush log message (CASSANDRA-14451)
 + * Add Missing dependencies in pom-all (CASSANDRA-14422)
 + * Cleanup StartupClusterConnectivityChecker and PING Verb (CASSANDRA-14447)
 + * Fix deprecated repair error notifications from 3.x clusters to legacy JMX clients (CASSANDRA-13121)
 + * Cassandra not starting when using enhanced startup scripts in windows (CASSANDRA-14418)
 + * Fix progress stats and units in compactionstats (CASSANDRA-12244)
 + * Better handle missing partition columns in system_schema.columns (CASSANDRA-14379)
 + * Delay hints store excise by write timeout to avoid race with decommission (CASSANDRA-13740)
 + * Deprecate background repair and probablistic read_repair_chance table options
 +   (CASSANDRA-13910)
 + * Add missed CQL keywords to documentation (CASSANDRA-14359)
 + * Fix unbounded validation compactions on repair / revert CASSANDRA-13797 (CASSANDRA-14332)
 + * Avoid deadlock when running nodetool refresh before node is fully up (CASSANDRA-14310)
 + * Handle all exceptions when opening sstables (CASSANDRA-14202)
 + * Handle incompletely written hint descriptors during startup (CASSANDRA-14080)
 + * Handle repeat open bound from SRP in read repair (CASSANDRA-14330)
 + * Use zero as default score in DynamicEndpointSnitch (CASSANDRA-14252)
 + * Respect max hint window when hinting for LWT (CASSANDRA-14215)
 + * Adding missing WriteType enum values to v3, v4, and v5 spec (CASSANDRA-13697)
 + * Don't regenerate bloomfilter and summaries on startup (CASSANDRA-11163)
 + * Fix NPE when performing comparison against a null frozen in LWT (CASSANDRA-14087)
 + * Log when SSTables are deleted (CASSANDRA-14302)
 + * Fix batch commitlog sync regression (CASSANDRA-14292)
 + * Write to pending endpoint when view replica is also base replica (CASSANDRA-14251)
 + * Chain commit log marker potential performance regression in batch commit mode (CASSANDRA-14194)
 + * Fully utilise specified compaction threads (CASSANDRA-14210)
 + * Pre-create deletion log records to finish compactions quicker (CASSANDRA-12763)
 +Merged from 2.2:
+  * Fix bug that prevented compaction of SSTables after full repairs (CASSANDRA-14423)
   * Incorrect counting of pending messages in OutboundTcpConnection (CASSANDRA-11551)
   * Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743)
   * Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/489c2f69/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index ab363e0,013fc04..f033bf2
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -474,6 -460,16 +474,17 @@@ public class CompactionManager implemen
          }, jobs, OperationType.CLEANUP);
      }
  
+     /**
+      * Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
+      * as repaired.
+      *
+      * @param cfs Column family for anti-compaction
+      * @param ranges Repaired ranges to be anti-compacted into separate SSTables.
+      * @param sstables {@link Refs} of SSTables within CF to anti-compact.
+      * @param repairedAt Unix timestamp of when repair was completed.
++     * @param parentRepairSession Corresponding repair session
+      * @return Futures executing anti-compaction.
+      */
      public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                            final Collection<Range<Token>> ranges,
                                            final Refs<SSTableReader> sstables,
@@@ -522,7 -526,7 +542,8 @@@
       * @param cfs
       * @param ranges Ranges that the repair was carried out on
       * @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
+      * @param txn Transaction across all SSTables that were repaired.
 +     * @param parentRepairSession parent repair session ID
       * @throws InterruptedException
       * @throws IOException
       */
@@@ -530,19 -534,12 +551,13 @@@
                                        Collection<Range<Token>> ranges,
                                        Refs<SSTableReader> validatedForRepair,
                                        LifecycleTransaction txn,
 -                                      long repairedAt) throws InterruptedException, IOException
 +                                      long repairedAt,
 +                                      UUID parentRepairSession) throws InterruptedException, IOException
      {
 -        logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
 -        logger.trace("Starting anticompaction for ranges {}", ranges);
 +        logger.info("[repair #{}] Starting anticompaction for {}.{} on {}/{} sstables", parentRepairSession, cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables());
 +        logger.trace("[repair #{}] Starting anticompaction for ranges {}", parentRepairSession, ranges);
          Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
-         Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
-         // we should only notify that repair status changed if it actually did:
-         Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
-         Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
-         for (SSTableReader sstable : sstables)
-             wasRepairedBefore.put(sstable, sstable.isRepaired());
- 
+         Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // SSTables that were completely repaired only
          Set<SSTableReader> nonAnticompacting = new HashSet<>();
  
          Iterator<SSTableReader> sstableIterator = sstables.iterator();
@@@ -562,12 -564,11 +582,11 @@@
                  {
                      if (r.contains(sstableBounds.left) && r.contains(sstableBounds.right))
                      {
 -                        logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
 +                        logger.info("[repair #{}] SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", parentRepairSession, sstable, r);
                          sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
                          sstable.reloadSSTableMetadata();
-                         mutatedRepairStatuses.add(sstable);
-                         if (!wasRepairedBefore.get(sstable))
-                             mutatedRepairStatusToNotify.add(sstable);
+                         if (!nonAnticompacting.contains(sstable)) // don't notify if the SSTable was already repaired
+                             mutatedRepairStatuses.add(sstable);
                          sstableIterator.remove();
                          shouldAnticompact = true;
                          break;
@@@ -579,9 -580,12 +598,12 @@@
                      }
                  }
  
+                 if (!anticompactRanges.isEmpty())
 -                    logger.info("SSTable {} ({}) will be anticompacted on ranges: {}", sstable, sstableBounds, String.join(", ", anticompactRanges));
++                    logger.info("[repair #{}] SSTable {} ({}) will be anticompacted on range {}", parentRepairSession, sstable, sstableBounds, String.join(", ", anticompactRanges));
+ 
                  if (!shouldAnticompact)
                  {
-                     logger.info("[repair #{}] SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableBounds, normalizedRanges);
 -                    logger.info("SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
++                    logger.info("[repair #{}] SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", parentRepairSession, sstable, sstableBounds, normalizedRanges);
                      nonAnticompacting.add(sstable);
                      sstableIterator.remove();
                  }
@@@ -1245,19 -1244,11 +1267,11 @@@
       */
      private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
      {
-         logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
+         int numAnticompact = repaired.originals().size();
+         logger.info("Performing anticompaction on {} sstables", numAnticompact);
  
          //Group SSTables
-         Set<SSTableReader> sstables = repaired.originals();
- 
-         // Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
-         // Although anti-compaction could work on repaired sstables as well and would result in having more accurate
-         // repairedAt values for these, we still avoid anti-compacting already repaired sstables, as we currently don't
-         // make use of any actual repairedAt value and splitting up sstables just for that is not worth it at this point.
-         Set<SSTableReader> unrepairedSSTables = sstables.stream().filter((s) -> !s.isRepaired()).collect(Collectors.toSet());
- 
-         Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(unrepairedSSTables);
- 
 -        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals());
++        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategyManager().groupSSTablesForAntiCompaction(repaired.originals());
          // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
          int antiCompactedSSTableCount = 0;
          for (Collection<SSTableReader> sstableGroup : groupedSSTables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/489c2f69/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index ead0349,abd9a4a..8991f88
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@@ -21,13 -29,20 +21,17 @@@ import java.io.File
  import java.io.IOException;
  import java.util.Arrays;
  import java.util.Collection;
 -import java.util.Comparator;
  import java.util.List;
 +import java.util.Set;
+ import java.util.SortedSet;
+ import java.util.TreeSet;
 +import java.util.UUID;
+ import java.util.concurrent.ExecutionException;
  
 -import org.apache.cassandra.config.KSMetaData;
 -import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
 -import org.apache.cassandra.locator.SimpleStrategy;
 -
 +import com.google.common.collect.ImmutableSet;
  import com.google.common.collect.Iterables;
+ import com.google.common.util.concurrent.ListenableFuture;
 +import com.google.common.util.concurrent.RateLimiter;
  import org.junit.BeforeClass;
  import org.junit.After;
  import org.junit.Test;
@@@ -270,12 -273,119 +274,112 @@@ public class AntiCompactionTes
          try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
               Refs<SSTableReader> refs = Refs.ref(sstables))
          {
 -            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
 +            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession);
          }
  
 -        SSTableReader sstable = Iterables.get(store.getSSTables(), 0);
 -        assertThat(store.getSSTables().size(), is(1));
++        SSTableReader sstable = Iterables.get(store.getLiveSSTables(), 0);
 +        assertThat(store.getLiveSSTables().size(), is(1));
-         assertThat(Iterables.get(store.getLiveSSTables(), 0).isRepaired(), is(true));
-         assertThat(Iterables.get(store.getLiveSSTables(), 0).selfRef().globalCount(), is(1));
+         assertThat(sstable.isRepaired(), is(true));
+         assertThat(sstable.selfRef().globalCount(), is(1));
+         assertThat(store.getTracker().getCompacting().size(), is(0));
+     }
+ 
+     @Test
+     public void shouldAntiCompactSSTable() throws IOException, InterruptedException, ExecutionException
+     {
+         ColumnFamilyStore store = prepareColumnFamilyStore();
 -        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
 -        assertEquals(store.getSSTables().size(), sstables.size());
++        Collection<SSTableReader> sstables = getUnrepairedSSTables(store);
++        assertEquals(store.getLiveSSTables().size(), sstables.size());
+         // SSTable range is 0 - 10, repair just a subset of the ranges (0 - 4) of the SSTable. Should result in
+         // one repaired and one unrepaired SSTable
+         Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+         List<Range<Token>> ranges = Arrays.asList(range);
++        UUID parentRepairSession = UUID.randomUUID();
+ 
+         try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+              Refs<SSTableReader> refs = Refs.ref(sstables))
+         {
 -            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
++            CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1, parentRepairSession);
+         }
+ 
 -        Comparator<SSTableReader> generationReverseComparator = new Comparator<SSTableReader>()
 -        {
 -            public int compare(SSTableReader o1, SSTableReader o2)
 -            {
 -                return Integer.compare(o1.descriptor.generation, o2.descriptor.generation);
 -            }
 -        };
 -
 -        SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(generationReverseComparator);
 -        sstablesSorted.addAll(store.getSSTables());
++        SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(SSTableReader.generationReverseComparator.reversed());
++        sstablesSorted.addAll(store.getLiveSSTables());
+ 
+         SSTableReader sstable = sstablesSorted.first();
 -        assertThat(store.getSSTables().size(), is(2));
++        assertThat(store.getLiveSSTables().size(), is(2));
+         assertThat(sstable.isRepaired(), is(true));
+         assertThat(sstable.selfRef().globalCount(), is(1));
+         assertThat(store.getTracker().getCompacting().size(), is(0));
+ 
+         // Test we don't anti-compact already repaired SSTables. repairedAt shouldn't change for the already repaired SSTable (first)
 -        sstables = store.getSSTables();
++        sstables = store.getLiveSSTables();
+         // Range that's a subset of the repaired SSTable's ranges, so would cause an anti-compaction (if it wasn't repaired)
+         range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("2".getBytes()));
+         ranges = Arrays.asList(range);
+         try (Refs<SSTableReader> refs = Refs.ref(sstables))
+         {
+             // use different repairedAt to ensure it doesn't change
 -            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
++            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200, parentRepairSession);
+             fut.get();
+         }
+ 
+         sstablesSorted.clear();
 -        sstablesSorted.addAll(store.getSSTables());
++        sstablesSorted.addAll(store.getLiveSSTables());
+         assertThat(sstablesSorted.size(), is(2));
+         assertThat(sstablesSorted.first().isRepaired(), is(true));
+         assertThat(sstablesSorted.last().isRepaired(), is(false));
+         assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(1L));
+         assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+         assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+         assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+         assertThat(store.getTracker().getCompacting().size(), is(0));
+ 
+         // Test repairing all the ranges of the repaired SSTable. Should mutate repairedAt without anticompacting,
+         // but leave the unrepaired SSTable as is.
+         range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+         ranges = Arrays.asList(range);
+ 
+         try (Refs<SSTableReader> refs = Refs.ref(sstables))
+         {
+             // Same repaired at, but should be changed on the repaired SSTable now
 -            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
++            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200, parentRepairSession);
+             fut.get();
+         }
+ 
+         sstablesSorted.clear();
 -        sstablesSorted.addAll(store.getSSTables());
++        sstablesSorted.addAll(store.getLiveSSTables());
+ 
+         assertThat(sstablesSorted.size(), is(2));
+         assertThat(sstablesSorted.first().isRepaired(), is(true));
+         assertThat(sstablesSorted.last().isRepaired(), is(false));
+         assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(200L));
+         assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+         assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+         assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+         assertThat(store.getTracker().getCompacting().size(), is(0));
+ 
+         // Repair whole range. Should mutate repairedAt on repaired SSTable (again) and
+         // mark unrepaired SSTable as repaired
+         range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("999".getBytes()));
+         ranges = Arrays.asList(range);
+ 
+         try (Refs<SSTableReader> refs = Refs.ref(sstables))
+         {
+             // Both SSTables should have repairedAt of 400
 -            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400);
++            ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400, parentRepairSession);
+             fut.get();
+         }
+ 
+         sstablesSorted.clear();
 -        sstablesSorted.addAll(store.getSSTables());
++        sstablesSorted.addAll(store.getLiveSSTables());
+ 
+         assertThat(sstablesSorted.size(), is(2));
+         assertThat(sstablesSorted.first().isRepaired(), is(true));
+         assertThat(sstablesSorted.last().isRepaired(), is(true));
+         assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(400L));
+         assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(400L));
+         assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+         assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
          assertThat(store.getTracker().getCompacting().size(), is(0));
      }
  
@@@ -332,11 -446,4 +436,10 @@@
          ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
          store.truncateBlocking();
      }
 +
 +    private static Set<SSTableReader> getUnrepairedSSTables(ColumnFamilyStore cfs)
 +    {
 +        return ImmutableSet.copyOf(cfs.getTracker().getView().sstables(SSTableSet.LIVE, (s) -> !s.isRepaired()));
 +    }
 +
- 
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org