You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/06/23 09:19:16 UTC
cassandra git commit: Avoid marking too many sstables as repaired
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 452d626a7 -> 3c8421a33
Avoid marking too many sstables as repaired
Patch by marcuse; reviewed by Joel Knighton for CASSANDRA-11696
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3c8421a3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3c8421a3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3c8421a3
Branch: refs/heads/cassandra-2.1
Commit: 3c8421a3304d44c064c230c329a999373feb0607
Parents: 452d626
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri May 27 09:25:28 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Jun 23 11:08:24 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 16 ++----
.../cassandra/service/ActiveRepairService.java | 59 ++++++++++++++++++--
.../service/ActiveRepairServiceTest.java | 57 +++++++++++++++----
4 files changed, 105 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8421a3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9a3779c..03246ae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
2.1.15
* Prevent select statements with clustering key > 64k (CASSANDRA-11882)
+ * Avoid marking too many sstables as repaired (CASSANDRA-11696)
* Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991)
* Remove distinction between non-existing static columns and existing but null in LWTs (CASSANDRA-9842)
* Support mlockall on IBM POWER arch (CASSANDRA-11576)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8421a3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5af63fe..87819ba 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -474,7 +474,7 @@ public class CompactionManager implements CompactionManagerMBean
/**
* Make sure the {validatedForRepair} are marked for compaction before calling this.
*
- * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)).
+ * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefsForAntiCompaction(..)).
*
* @param cfs
* @param ranges Ranges that the repair was carried out on
@@ -1030,17 +1030,9 @@ public class CompactionManager implements CompactionManagerMBean
sstables = cfs.selectAndReference(ColumnFamilyStore.CANONICAL_SSTABLES).refs;
else
{
- ColumnFamilyStore.RefViewFragment refView = cfs.selectAndReference(ColumnFamilyStore.UNREPAIRED_SSTABLES);
- sstables = refView.refs;
- Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
-
- if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(refView.sstables)).isEmpty())
- {
- logger.error("Cannot start multiple repair sessions over the same sstables");
- throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
- }
-
- ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).addSSTables(cfs.metadata.cfId, refView.sstables);
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+ prs.markSSTablesRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
+ sstables = cfs.selectAndReference(ColumnFamilyStore.UNREPAIRED_SSTABLES).refs;
}
if (validator.gcBefore > 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8421a3/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 4c83c48..bab244d 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -425,7 +425,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
List<ListenableFuture<?>> futures = new ArrayList<>();
for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
{
- Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefs(columnFamilyStoreEntry.getKey());
+ Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey());
ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
}
@@ -465,6 +465,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
}
}
+ /**
+ * We keep a ParentRepairSession around for the duration of the entire repair. For example, on a 256-token vnode rf=3 cluster
+ * we would have 768 RepairSessions but only one ParentRepairSession. We use the PRS to avoid anticompacting the sstables
+ * 768 times; instead, we take all repaired ranges at the end of the repair and anticompact once.
+ *
+ * We do an optimistic marking of sstables - when we start an incremental repair we mark all unrepaired sstables as
+ * repairing (see {@link #markSSTablesRepairing}). While the repair is ongoing, compactions might remove those sstables,
+ * and when it is time for anticompaction we will only anticompact the sstables that are still on disk.
+ *
+ * Note that validation and streaming do not care about which sstables we have marked as repairing - they operate on
+ * all unrepaired sstables (if it is incremental), otherwise we would not get a correct repair.
+ */
public static class ParentRepairSession
{
public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
@@ -480,13 +492,16 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
* request, we need to fail the coordinator as well.
*/
public final boolean failed;
+ /**
+ * Ids of the tables whose sstables we have already marked as repairing. Marking can only be done once per table per ParentRepairSession
+ */
+ private final Set<UUID> marked = new HashSet<>();
public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt, boolean failed)
{
this.coordinator = coordinator;
for (ColumnFamilyStore cfs : columnFamilyStores)
{
-
this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
sstableMap.put(cfs.metadata.cfId, new HashSet<String>());
}
@@ -500,9 +515,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
this(coordinator, columnFamilyStores, ranges, repairedAt, false);
}
+ /**
+ * Gets the repairing sstables for anticompaction.
+ *
+ * Note that validation and streaming use the real unrepaired sstables.
+ *
+ * @param cfId
+ * @return
+ */
@SuppressWarnings("resource")
- public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID cfId)
+ public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId)
{
+ assert marked.contains(cfId);
ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder();
for (SSTableReader sstable : getActiveSSTables(cfId))
{
@@ -515,6 +539,30 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return new Refs<>(references.build());
}
+ /**
+ * Marks all the unrepaired sstables as repairing unless we have already done so.
+ *
+ * Any of these sstables that are still on disk are then anticompacted once the streaming and validation phases are done.
+ *
+ * @param cfId
+ * @param parentSessionId used to check that we don't start multiple inc repair sessions over the same sstables
+ */
+ public synchronized void markSSTablesRepairing(UUID cfId, UUID parentSessionId)
+ {
+ if (!marked.contains(cfId))
+ {
+ List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(ColumnFamilyStore.UNREPAIRED_SSTABLES).sstables;
+ Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
+ if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty())
+ {
+ logger.error("Cannot start multiple repair sessions over the same sstables");
+ throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+ }
+ addSSTables(cfId, sstables);
+ marked.add(cfId);
+ }
+ }
+
private Set<SSTableReader> getActiveSSTables(UUID cfId)
{
if (failed)
@@ -534,12 +582,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return activeSSTables;
}
- public void addSSTables(UUID cfId, Collection<SSTableReader> sstables)
+ private void addSSTables(UUID cfId, Collection<SSTableReader> sstables)
{
for (SSTableReader sstable : sstables)
- {
sstableMap.get(cfId).add(sstable.getFilename());
- }
}
public ParentRepairSession asFailed()
@@ -556,6 +602,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
", repairedAt=" + repairedAt +
'}';
}
+
}
/*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8421a3/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 26e5126..cf64322 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.utils.concurrent.Refs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class ActiveRepairServiceTest extends SchemaLoader
{
@@ -58,12 +59,9 @@ public class ActiveRepairServiceTest extends SchemaLoader
UUID prsId = UUID.randomUUID();
ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null);
ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
-
- //add all sstables to parent repair session
- prs.addSSTables(store.metadata.cfId, original);
-
+ prs.markSSTablesRepairing(store.metadata.cfId, prsId);
//retrieve all sstable references from parent repair sessions
- Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+ Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId);
Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
assertEquals(original, retrieved);
refs.release();
@@ -76,22 +74,62 @@ public class ActiveRepairServiceTest extends SchemaLoader
store.getDataTracker().replaceWithNewInstances(Collections.singleton(removed), Collections.EMPTY_SET);
//retrieve sstable references from parent repair session again - removed sstable must not be present
- refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+ refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId);
retrieved = Sets.newHashSet(refs.iterator());
assertEquals(newLiveSet, retrieved);
assertFalse(retrieved.contains(removed));
refs.release();
}
+ @Test
+ public void testAddingMoreSSTables()
+ {
+ ColumnFamilyStore store = prepareColumnFamilyStore();
+ Set<SSTableReader> original = store.getUnrepairedSSTables();
+ UUID prsId = UUID.randomUUID();
+ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null);
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
+ prs.markSSTablesRepairing(store.metadata.cfId, prsId);
+ try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId))
+ {
+ Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+ assertEquals(original, retrieved);
+ }
+ createSSTables(store, 2);
+ boolean exception = false;
+ try
+ {
+ UUID newPrsId = UUID.randomUUID();
+ ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null);
+ ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.cfId, newPrsId);
+ }
+ catch (Throwable t)
+ {
+ exception = true;
+ }
+ assertTrue(exception);
+
+ try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId))
+ {
+ Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+ assertEquals(original, retrieved);
+ }
+ }
+
private ColumnFamilyStore prepareColumnFamilyStore()
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
store.truncateBlocking();
store.disableAutoCompaction();
+ createSSTables(store, 10);
+ return store;
+ }
+
+ private void createSSTables(ColumnFamilyStore cfs, int count)
+ {
long timestamp = System.currentTimeMillis();
- //create 10 sstables
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < count; i++)
{
DecoratedKey key = Util.dk(Integer.toString(i));
Mutation rm = new Mutation(KEYSPACE1, key.getKey());
@@ -101,8 +139,7 @@ public class ActiveRepairServiceTest extends SchemaLoader
timestamp,
0);
rm.apply();
- store.forceBlockingFlush();
+ cfs.forceBlockingFlush();
}
- return store;
}
}