You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2018/06/29 07:57:49 UTC
[04/10] cassandra git commit: Stop SSTables being lost from
compaction strategy after full repairs
Stop SSTables being lost from compaction strategy after full repairs
patch by Kurt Greaves; reviewed by Stefan Podkowinski, Marcus Eriksson, for CASSANDRA-14423
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8912ce9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8912ce9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8912ce9
Branch: refs/heads/trunk
Commit: f8912ce9329a8bc360e93cf61e56814135fbab39
Parents: 1143bc1
Author: kurt <ku...@instaclustr.com>
Authored: Thu Jun 14 10:59:19 2018 +0000
Committer: Mick Semb Wever <mc...@apache.org>
Committed: Fri Jun 29 16:49:53 2018 +1000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 70 ++++++-----
.../db/compaction/AntiCompactionTest.java | 120 ++++++++++++++++++-
3 files changed, 156 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7b1089e..9d6a9ea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.13
+ * Fix bug that prevented compaction of SSTables after full repairs (CASSANDRA-14423)
* Incorrect counting of pending messages in OutboundTcpConnection (CASSANDRA-11551)
* Fix compaction failure caused by reading un-flushed data (CASSANDRA-12743)
* Use Bounds instead of Range for sstables in anticompaction (CASSANDRA-14411)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 419f66e..013fc04 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -460,6 +460,16 @@ public class CompactionManager implements CompactionManagerMBean
}, jobs, OperationType.CLEANUP);
}
+ /**
+ * Submits anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
+ * as repaired.
+ *
+ * @param cfs Column family for anti-compaction
+ * @param ranges Repaired ranges to be anti-compacted into separate SSTables.
+ * @param sstables {@link Refs} of SSTables within CF to anti-compact.
+ * @param repairedAt Unix timestamp of when repair was completed.
+ * @return Future for the submitted anti-compaction.
+ */
public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
final Collection<Range<Token>> ranges,
final Refs<SSTableReader> sstables,
@@ -475,6 +485,8 @@ public class CompactionManager implements CompactionManagerMBean
{
for (SSTableReader compactingSSTable : cfs.getTracker().getCompacting())
sstables.releaseIfHolds(compactingSSTable);
+ // We don't anti-compact any SSTable that has been compacted during repair as it may have been compacted
+ // with unrepaired data.
Set<SSTableReader> compactedSSTables = new HashSet<>();
for (SSTableReader sstable : sstables)
if (sstable.isMarkedCompacted())
@@ -504,9 +516,17 @@ public class CompactionManager implements CompactionManagerMBean
*
* Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)).
*
+ * NOTE: Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
+ * Although anti-compaction could work on repaired sstables as well and would result in having more accurate
+ * repairedAt values for these, we avoid anti-compacting already repaired sstables, as we currently don't
+ * make use of any actual repairedAt value and splitting up sstables just for that is not worth it. However, we will
+ * still update repairedAt if the SSTable is fully contained within the repaired ranges, as this does not require
+ * anticompaction.
+ *
* @param cfs
* @param ranges Ranges that the repair was carried out on
* @param validatedForRepair SSTables containing the repaired ranges. Should be referenced before passing them.
+ * @param txn Transaction across all SSTables that were repaired.
* @throws InterruptedException
* @throws IOException
*/
@@ -519,13 +539,7 @@ public class CompactionManager implements CompactionManagerMBean
logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
logger.trace("Starting anticompaction for ranges {}", ranges);
Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
- Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
- // we should only notify that repair status changed if it actually did:
- Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
- Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
- for (SSTableReader sstable : sstables)
- wasRepairedBefore.put(sstable, sstable.isRepaired());
-
+ Set<SSTableReader> mutatedRepairStatuses = new HashSet<>(); // SSTables that were completely repaired only
Set<SSTableReader> nonAnticompacting = new HashSet<>();
Iterator<SSTableReader> sstableIterator = sstables.iterator();
@@ -536,6 +550,11 @@ public class CompactionManager implements CompactionManagerMBean
while (sstableIterator.hasNext())
{
SSTableReader sstable = sstableIterator.next();
+ List<String> anticompactRanges = new ArrayList<>();
+ // We don't anti-compact SSTables already marked repaired. See CASSANDRA-13153
+ // and CASSANDRA-14423.
+ if (sstable.isRepaired()) // We never anti-compact already repaired SSTables
+ nonAnticompacting.add(sstable);
Bounds<Token> sstableBounds = new Bounds<>(sstable.first.getToken(), sstable.last.getToken());
@@ -548,28 +567,30 @@ public class CompactionManager implements CompactionManagerMBean
logger.info("SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", sstable, r);
sstable.descriptor.getMetadataSerializer().mutateRepairedAt(sstable.descriptor, repairedAt);
sstable.reloadSSTableMetadata();
- mutatedRepairStatuses.add(sstable);
- if (!wasRepairedBefore.get(sstable))
- mutatedRepairStatusToNotify.add(sstable);
+ if (!nonAnticompacting.contains(sstable)) // don't notify if the SSTable was already repaired
+ mutatedRepairStatuses.add(sstable);
sstableIterator.remove();
shouldAnticompact = true;
break;
}
- else if (r.intersects(sstableBounds))
+ else if (r.intersects(sstableBounds) && !nonAnticompacting.contains(sstable))
{
- logger.info("SSTable {} ({}) will be anticompacted on range {}", sstable, sstableBounds, r);
+ anticompactRanges.add(r.toString());
shouldAnticompact = true;
}
}
+ if (!anticompactRanges.isEmpty())
+ logger.info("SSTable {} ({}) will be anticompacted on ranges: {}", sstable, sstableBounds, String.join(", ", anticompactRanges));
+
if (!shouldAnticompact)
{
- logger.info("SSTable {} ({}) does not intersect repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
+ logger.info("SSTable {} ({}) not subject to anticompaction of repaired ranges {}, not touching repairedAt.", sstable, sstableBounds, normalizedRanges);
nonAnticompacting.add(sstable);
sstableIterator.remove();
}
}
- cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatusToNotify);
+ cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatuses);
txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses));
validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses));
assert txn.originals().equals(sstables);
@@ -1223,24 +1244,11 @@ public class CompactionManager implements CompactionManagerMBean
*/
private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, LifecycleTransaction repaired, long repairedAt)
{
- logger.info("Performing anticompaction on {} sstables", repaired.originals().size());
+ int numAnticompact = repaired.originals().size();
+ logger.info("Performing anticompaction on {} sstables", numAnticompact);
//Group SSTables
- Set<SSTableReader> sstables = repaired.originals();
-
- // Repairs can take place on both unrepaired (incremental + full) and repaired (full) data.
- // Although anti-compaction could work on repaired sstables as well and would result in having more accurate
- // repairedAt values for these, we still avoid anti-compacting already repaired sstables, as we currently don't
- // make use of any actual repairedAt value and splitting up sstables just for that is not worth it at this point.
- Set<SSTableReader> unrepairedSSTables = ImmutableSet.copyOf(Iterables.filter(sstables, new Predicate<SSTableReader>()
- {
- public boolean apply(SSTableReader input)
- {
- return !input.isRepaired();
- }
- }));
-
- Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(unrepairedSSTables);
+ Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repaired.originals());
// iterate over sstables to check if the repaired / unrepaired ranges intersect them.
int antiCompactedSSTableCount = 0;
for (Collection<SSTableReader> sstableGroup : groupedSSTables)
@@ -1253,7 +1261,7 @@ public class CompactionManager implements CompactionManagerMBean
}
String format = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
- logger.info(format, repaired.originals().size(), antiCompactedSSTableCount);
+ logger.info(format, numAnticompact, antiCompactedSSTableCount);
}
private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>> ranges,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8912ce9/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index c451516..abd9a4a 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -29,13 +29,20 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.locator.SimpleStrategy;
+
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
import org.junit.BeforeClass;
import org.junit.After;
import org.junit.Test;
@@ -55,8 +62,6 @@ import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
-import com.google.common.collect.Iterables;
-
public class AntiCompactionTest
{
private static final String KEYSPACE1 = "AntiCompactionTest";
@@ -271,9 +276,116 @@ public class AntiCompactionTest
CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
}
+ SSTableReader sstable = Iterables.get(store.getSSTables(), 0);
assertThat(store.getSSTables().size(), is(1));
- assertThat(Iterables.get(store.getSSTables(), 0).isRepaired(), is(true));
- assertThat(Iterables.get(store.getSSTables(), 0).selfRef().globalCount(), is(1));
+ assertThat(sstable.isRepaired(), is(true));
+ assertThat(sstable.selfRef().globalCount(), is(1));
+ assertThat(store.getTracker().getCompacting().size(), is(0));
+ }
+
+ @Test
+ public void shouldAntiCompactSSTable() throws IOException, InterruptedException, ExecutionException
+ {
+ ColumnFamilyStore store = prepareColumnFamilyStore();
+ Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+ assertEquals(store.getSSTables().size(), sstables.size());
+ // SSTable range is 0 - 10, repair just a subset of the ranges (0 - 4) of the SSTable. Should result in
+ // one repaired and one unrepaired SSTable
+ Range<Token> range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+ List<Range<Token>> ranges = Arrays.asList(range);
+
+ try (LifecycleTransaction txn = store.getTracker().tryModify(sstables, OperationType.ANTICOMPACTION);
+ Refs<SSTableReader> refs = Refs.ref(sstables))
+ {
+ CompactionManager.instance.performAnticompaction(store, ranges, refs, txn, 1);
+ }
+
+ Comparator<SSTableReader> generationReverseComparator = new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ return Integer.compare(o1.descriptor.generation, o2.descriptor.generation);
+ }
+ };
+
+ SortedSet<SSTableReader> sstablesSorted = new TreeSet<>(generationReverseComparator);
+ sstablesSorted.addAll(store.getSSTables());
+
+ SSTableReader sstable = sstablesSorted.first();
+ assertThat(store.getSSTables().size(), is(2));
+ assertThat(sstable.isRepaired(), is(true));
+ assertThat(sstable.selfRef().globalCount(), is(1));
+ assertThat(store.getTracker().getCompacting().size(), is(0));
+
+ // Test we don't anti-compact already repaired SSTables. repairedAt shouldn't change for the already repaired SSTable (first)
+ sstables = store.getSSTables();
+ // Range that's a subset of the repaired SSTable's ranges, so would cause an anti-compaction (if it wasn't repaired)
+ range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("2".getBytes()));
+ ranges = Arrays.asList(range);
+ try (Refs<SSTableReader> refs = Refs.ref(sstables))
+ {
+ // use different repairedAt to ensure it doesn't change
+ ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
+ fut.get();
+ }
+
+ sstablesSorted.clear();
+ sstablesSorted.addAll(store.getSSTables());
+ assertThat(sstablesSorted.size(), is(2));
+ assertThat(sstablesSorted.first().isRepaired(), is(true));
+ assertThat(sstablesSorted.last().isRepaired(), is(false));
+ assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(1L));
+ assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+ assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+ assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+ assertThat(store.getTracker().getCompacting().size(), is(0));
+
+ // Test repairing all the ranges of the repaired SSTable. Should mutate repairedAt without anticompacting,
+ // but leave the unrepaired SSTable as is.
+ range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("4".getBytes()));
+ ranges = Arrays.asList(range);
+
+ try (Refs<SSTableReader> refs = Refs.ref(sstables))
+ {
+ // Same repairedAt value (200) as the previous call, but this time it should be applied to the repaired SSTable
+ ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 200);
+ fut.get();
+ }
+
+ sstablesSorted.clear();
+ sstablesSorted.addAll(store.getSSTables());
+
+ assertThat(sstablesSorted.size(), is(2));
+ assertThat(sstablesSorted.first().isRepaired(), is(true));
+ assertThat(sstablesSorted.last().isRepaired(), is(false));
+ assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(200L));
+ assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(0L));
+ assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+ assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
+ assertThat(store.getTracker().getCompacting().size(), is(0));
+
+ // Repair whole range. Should mutate repairedAt on repaired SSTable (again) and
+ // mark unrepaired SSTable as repaired
+ range = new Range<Token>(new BytesToken("/".getBytes()), new BytesToken("999".getBytes()));
+ ranges = Arrays.asList(range);
+
+ try (Refs<SSTableReader> refs = Refs.ref(sstables))
+ {
+ // Both SSTables should have repairedAt of 400
+ ListenableFuture fut = CompactionManager.instance.submitAntiCompaction(store, ranges, refs, 400);
+ fut.get();
+ }
+
+ sstablesSorted.clear();
+ sstablesSorted.addAll(store.getSSTables());
+
+ assertThat(sstablesSorted.size(), is(2));
+ assertThat(sstablesSorted.first().isRepaired(), is(true));
+ assertThat(sstablesSorted.last().isRepaired(), is(true));
+ assertThat(sstablesSorted.first().getSSTableMetadata().repairedAt, is(400L));
+ assertThat(sstablesSorted.last().getSSTableMetadata().repairedAt, is(400L));
+ assertThat(sstablesSorted.first().selfRef().globalCount(), is(1));
+ assertThat(sstablesSorted.last().selfRef().globalCount(), is(1));
assertThat(store.getTracker().getCompacting().size(), is(0));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org