You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/03/02 14:11:10 UTC

[4/9] cassandra git commit: Avoid leaking references during parallel repairs

Avoid leaking references during parallel repairs

patch by Marcus Olsson; reviewed by Marcus Eriksson for CASSANDRA-11215


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/477014b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/477014b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/477014b8

Branch: refs/heads/trunk
Commit: 477014b8cddb8cf1a73a7e8b408f130b64a37c6b
Parents: a24bd6c
Author: Marcus Olsson <ma...@ericsson.com>
Authored: Tue Feb 23 14:30:12 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Mar 1 13:30:35 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 69 ++++++++++++--------
 2 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 103ac16..a50f256 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.2.6
  * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
+ * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
  * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
  * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
  * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 9ca4406..b015bcd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1075,35 +1075,9 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 // flush first so everyone is validating data that is as similar as possible
                 StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-                ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
-                ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES);
-                Set<SSTableReader> sstablesToValidate = new HashSet<>();
-
-                for (SSTableReader sstable : sstableCandidates.sstables)
-                {
-                    if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
-                    {
-                        sstablesToValidate.add(sstable);
-                    }
-                }
-
-                Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
-
-                if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
-                {
-                    logger.error("Cannot start multiple repair sessions over the same sstables");
-                    throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
-                }
-
-                sstables = Refs.tryRef(sstablesToValidate);
+                sstables = getSSTablesToValidate(cfs, validator);
                 if (sstables == null)
-                {
-                    logger.error("Could not reference sstables");
-                    throw new RuntimeException("Could not reference sstables");
-                }
-                sstableCandidates.release();
-                prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
-
+                    return; // this means the parent repair session was removed - the repair session failed on another node and we removed it
                 if (validator.gcBefore > 0)
                     gcBefore = validator.gcBefore;
                 else
@@ -1170,6 +1144,45 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+    private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator) // synchronized: callers on the same CompactionManager instance run this selection one at a time
+    {
+        Refs<SSTableReader> sstables;
+
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+        if (prs == null)
+            return null; // no parent repair session found for this id; null tells the caller there is nothing to validate
+        Set<SSTableReader> sstablesToValidate = new HashSet<>();
+        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES)) // try-with-resources closes the candidate view on every exit path, including the exceptions thrown below
+        {
+            for (SSTableReader sstable : sstableCandidates.sstables)
+            {
+                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range))) // keep only sstables whose first..last token bounds intersect validator.desc.range
+                {
+                    sstablesToValidate.add(sstable);
+                }
+            }
+
+            Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId); // NOTE(review): presumably the sstables already claimed by other parent sessions for this table - confirm in ActiveRepairService
+
+            if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty()) // any overlap with the selection is treated as an error
+            {
+                logger.error("Cannot start multiple repair sessions over the same sstables");
+                throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+            }
+
+            sstables = Refs.tryRef(sstablesToValidate);
+            if (sstables == null) // a null result from tryRef is handled as a failure to take the references
+            {
+                logger.error("Could not reference sstables");
+                throw new RuntimeException("Could not reference sstables");
+            }
+        }
+
+        prs.addSSTables(cfs.metadata.cfId, sstablesToValidate); // reached only when referencing succeeded; runs after the candidate view has been closed
+
+        return sstables;
+    }
+
     /**
      * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
      * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted