You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/03/02 14:11:10 UTC
[4/9] cassandra git commit: Avoid leaking references during parallel repairs
Avoid leaking references during parallel repairs
patch by Marcus Olsson; reviewed by Marcus Eriksson for CASSANDRA-11215
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/477014b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/477014b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/477014b8
Branch: refs/heads/trunk
Commit: 477014b8cddb8cf1a73a7e8b408f130b64a37c6b
Parents: a24bd6c
Author: Marcus Olsson <ma...@ericsson.com>
Authored: Tue Feb 23 14:30:12 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Mar 1 13:30:35 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/CompactionManager.java | 69 ++++++++++++--------
2 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 103ac16..a50f256 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
2.2.6
* Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
+ * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
* Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
* Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
* Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 9ca4406..b015bcd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1075,35 +1075,9 @@ public class CompactionManager implements CompactionManagerMBean
{
// flush first so everyone is validating data that is as similar as possible
StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
- ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
- ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES);
- Set<SSTableReader> sstablesToValidate = new HashSet<>();
-
- for (SSTableReader sstable : sstableCandidates.sstables)
- {
- if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
- {
- sstablesToValidate.add(sstable);
- }
- }
-
- Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
-
- if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
- {
- logger.error("Cannot start multiple repair sessions over the same sstables");
- throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
- }
-
- sstables = Refs.tryRef(sstablesToValidate);
+ sstables = getSSTablesToValidate(cfs, validator);
if (sstables == null)
- {
- logger.error("Could not reference sstables");
- throw new RuntimeException("Could not reference sstables");
- }
- sstableCandidates.release();
- prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
-
+ return; // this means the parent repair session was removed - the repair session failed on another node and we removed it
if (validator.gcBefore > 0)
gcBefore = validator.gcBefore;
else
@@ -1170,6 +1144,45 @@ public class CompactionManager implements CompactionManagerMBean
}
}
+ private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
+ {
+ Refs<SSTableReader> sstables;
+
+ ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+ if (prs == null)
+ return null;
+ Set<SSTableReader> sstablesToValidate = new HashSet<>();
+ try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
+ {
+ for (SSTableReader sstable : sstableCandidates.sstables)
+ {
+ if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
+ {
+ sstablesToValidate.add(sstable);
+ }
+ }
+
+ Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
+
+ if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
+ {
+ logger.error("Cannot start multiple repair sessions over the same sstables");
+ throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+ }
+
+ sstables = Refs.tryRef(sstablesToValidate);
+ if (sstables == null)
+ {
+ logger.error("Could not reference sstables");
+ throw new RuntimeException("Could not reference sstables");
+ }
+ }
+
+ prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
+
+ return sstables;
+ }
+
/**
* Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
* will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted