You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/03/02 14:11:07 UTC

[1/9] cassandra git commit: Avoid leaking references during parallel repairs

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 a24bd6c6a -> 477014b8c
  refs/heads/cassandra-3.0 632859080 -> 841a80311
  refs/heads/cassandra-3.5 70649a8d6 -> 57cbda0a4
  refs/heads/trunk 70649a8d6 -> 57cbda0a4


Avoid leaking references during parallel repairs

patch by Marcus Olsson; reviewed by Marcus Eriksson for CASSANDRA-11215


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/477014b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/477014b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/477014b8

Branch: refs/heads/cassandra-2.2
Commit: 477014b8cddb8cf1a73a7e8b408f130b64a37c6b
Parents: a24bd6c
Author: Marcus Olsson <ma...@ericsson.com>
Authored: Tue Feb 23 14:30:12 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Mar 1 13:30:35 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 69 ++++++++++++--------
 2 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 103ac16..a50f256 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.2.6
  * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
+ * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
  * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
  * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
  * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 9ca4406..b015bcd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1075,35 +1075,9 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 // flush first so everyone is validating data that is as similar as possible
                 StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-                ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
-                ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES);
-                Set<SSTableReader> sstablesToValidate = new HashSet<>();
-
-                for (SSTableReader sstable : sstableCandidates.sstables)
-                {
-                    if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
-                    {
-                        sstablesToValidate.add(sstable);
-                    }
-                }
-
-                Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
-
-                if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
-                {
-                    logger.error("Cannot start multiple repair sessions over the same sstables");
-                    throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
-                }
-
-                sstables = Refs.tryRef(sstablesToValidate);
+                sstables = getSSTablesToValidate(cfs, validator);
                 if (sstables == null)
-                {
-                    logger.error("Could not reference sstables");
-                    throw new RuntimeException("Could not reference sstables");
-                }
-                sstableCandidates.release();
-                prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
-
+                    return; // this means the parent repair session was removed - the repair session failed on another node and we removed it
                 if (validator.gcBefore > 0)
                     gcBefore = validator.gcBefore;
                 else
@@ -1170,6 +1144,45 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+    private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
+    {
+        Refs<SSTableReader> sstables;
+
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+        if (prs == null)
+            return null;
+        Set<SSTableReader> sstablesToValidate = new HashSet<>();
+        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
+        {
+            for (SSTableReader sstable : sstableCandidates.sstables)
+            {
+                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
+                {
+                    sstablesToValidate.add(sstable);
+                }
+            }
+
+            Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
+
+            if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
+            {
+                logger.error("Cannot start multiple repair sessions over the same sstables");
+                throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+            }
+
+            sstables = Refs.tryRef(sstablesToValidate);
+            if (sstables == null)
+            {
+                logger.error("Could not reference sstables");
+                throw new RuntimeException("Could not reference sstables");
+            }
+        }
+
+        prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
+
+        return sstables;
+    }
+
     /**
      * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
      * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted


[7/9] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/841a8031
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/841a8031
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/841a8031

Branch: refs/heads/cassandra-3.0
Commit: 841a80311c95b05b69e57971deff9aba03c5f5d2
Parents: 6328590 477014b
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 2 14:07:36 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 2 14:07:57 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 69 ++++++++++++--------
 2 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/841a8031/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a7b5c8a,a50f256..425cc58
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,6 +1,27 @@@
 -2.2.6
 +3.0.4
   * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
 + * MV should only query complex columns included in the view (CASSANDRA-11069)
 + * Failed aggregate creation breaks server permanently (CASSANDRA-11064)
 + * Add sstabledump tool (CASSANDRA-7464)
 + * Introduce backpressure for hints (CASSANDRA-10972)
 + * Fix ClusteringPrefix not being able to read tombstone range boundaries (CASSANDRA-11158)
 + * Prevent logging in sandboxed state (CASSANDRA-11033)
 + * Disallow drop/alter operations of UDTs used by UDAs (CASSANDRA-10721)
 + * Add query time validation method on Index (CASSANDRA-11043)
 + * Avoid potential AssertionError in mixed version cluster (CASSANDRA-11128)
 + * Properly handle hinted handoff after topology changes (CASSANDRA-5902)
 + * AssertionError when listing sstable files on inconsistent disk state (CASSANDRA-11156)
 + * Fix wrong rack counting and invalid conditions check for TokenAllocation
 +   (CASSANDRA-11139)
 + * Avoid creating empty hint files (CASSANDRA-11090)
 + * Fix leak detection strong reference loop using weak reference (CASSANDRA-11120)
 + * Configurie BatchlogManager to stop delayed tasks on shutdown (CASSANDRA-11062)
 + * Hadoop integration is incompatible with Cassandra Driver 3.0.0 (CASSANDRA-11001)
 + * Add dropped_columns to the list of schema table so it gets handled
 +   properly (CASSANDRA-11050)
 + * Fix NPE when using forceRepairRangeAsync without DC (CASSANDRA-11239)
 +Merged from 2.2:
+  * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
   * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
   * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
   * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/841a8031/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index c269f7f,b015bcd..891f976
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1159,6 -1144,45 +1133,45 @@@ public class CompactionManager implemen
          }
      }
  
+     private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
+     {
+         Refs<SSTableReader> sstables;
+ 
+         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+         if (prs == null)
+             return null;
+         Set<SSTableReader> sstablesToValidate = new HashSet<>();
 -        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
++        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> !prs.isIncremental || !s.isRepaired())))
+         {
+             for (SSTableReader sstable : sstableCandidates.sstables)
+             {
 -                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
++                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(validator.desc.ranges))
+                 {
+                     sstablesToValidate.add(sstable);
+                 }
+             }
+ 
+             Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
+ 
+             if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
+             {
+                 logger.error("Cannot start multiple repair sessions over the same sstables");
+                 throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+             }
+ 
+             sstables = Refs.tryRef(sstablesToValidate);
+             if (sstables == null)
+             {
+                 logger.error("Could not reference sstables");
+                 throw new RuntimeException("Could not reference sstables");
+             }
+         }
+ 
+         prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
+ 
+         return sstables;
+     }
+ 
      /**
       * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
       * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted


[8/9] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.5

Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.5


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/57cbda0a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/57cbda0a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/57cbda0a

Branch: refs/heads/trunk
Commit: 57cbda0a411ec801c1532b6711506dec79917c8c
Parents: 70649a8 841a803
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 2 14:08:57 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 2 14:08:57 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 69 ++++++++++++--------
 2 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/57cbda0a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 26c3166,425cc58..5d010c4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -52,7 -21,7 +52,8 @@@ Merged from 3.0
     properly (CASSANDRA-11050)
   * Fix NPE when using forceRepairRangeAsync without DC (CASSANDRA-11239)
  Merged from 2.2:
 + * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
+  * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
   * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
   * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
   * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57cbda0a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------


[9/9] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.5

Posted by ma...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.5


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/57cbda0a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/57cbda0a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/57cbda0a

Branch: refs/heads/cassandra-3.5
Commit: 57cbda0a411ec801c1532b6711506dec79917c8c
Parents: 70649a8 841a803
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 2 14:08:57 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 2 14:08:57 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 69 ++++++++++++--------
 2 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/57cbda0a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 26c3166,425cc58..5d010c4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -52,7 -21,7 +52,8 @@@ Merged from 3.0
     properly (CASSANDRA-11050)
   * Fix NPE when using forceRepairRangeAsync without DC (CASSANDRA-11239)
  Merged from 2.2:
 + * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
+  * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
   * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
   * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
   * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57cbda0a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------


[6/9] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/841a8031
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/841a8031
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/841a8031

Branch: refs/heads/cassandra-3.5
Commit: 841a80311c95b05b69e57971deff9aba03c5f5d2
Parents: 6328590 477014b
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 2 14:07:36 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 2 14:07:57 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 69 ++++++++++++--------
 2 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/841a8031/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a7b5c8a,a50f256..425cc58
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,6 +1,27 @@@
 -2.2.6
 +3.0.4
   * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
 + * MV should only query complex columns included in the view (CASSANDRA-11069)
 + * Failed aggregate creation breaks server permanently (CASSANDRA-11064)
 + * Add sstabledump tool (CASSANDRA-7464)
 + * Introduce backpressure for hints (CASSANDRA-10972)
 + * Fix ClusteringPrefix not being able to read tombstone range boundaries (CASSANDRA-11158)
 + * Prevent logging in sandboxed state (CASSANDRA-11033)
 + * Disallow drop/alter operations of UDTs used by UDAs (CASSANDRA-10721)
 + * Add query time validation method on Index (CASSANDRA-11043)
 + * Avoid potential AssertionError in mixed version cluster (CASSANDRA-11128)
 + * Properly handle hinted handoff after topology changes (CASSANDRA-5902)
 + * AssertionError when listing sstable files on inconsistent disk state (CASSANDRA-11156)
 + * Fix wrong rack counting and invalid conditions check for TokenAllocation
 +   (CASSANDRA-11139)
 + * Avoid creating empty hint files (CASSANDRA-11090)
 + * Fix leak detection strong reference loop using weak reference (CASSANDRA-11120)
 + * Configurie BatchlogManager to stop delayed tasks on shutdown (CASSANDRA-11062)
 + * Hadoop integration is incompatible with Cassandra Driver 3.0.0 (CASSANDRA-11001)
 + * Add dropped_columns to the list of schema table so it gets handled
 +   properly (CASSANDRA-11050)
 + * Fix NPE when using forceRepairRangeAsync without DC (CASSANDRA-11239)
 +Merged from 2.2:
+  * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
   * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
   * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
   * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/841a8031/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index c269f7f,b015bcd..891f976
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1159,6 -1144,45 +1133,45 @@@ public class CompactionManager implemen
          }
      }
  
+     private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
+     {
+         Refs<SSTableReader> sstables;
+ 
+         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+         if (prs == null)
+             return null;
+         Set<SSTableReader> sstablesToValidate = new HashSet<>();
 -        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
++        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> !prs.isIncremental || !s.isRepaired())))
+         {
+             for (SSTableReader sstable : sstableCandidates.sstables)
+             {
 -                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
++                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(validator.desc.ranges))
+                 {
+                     sstablesToValidate.add(sstable);
+                 }
+             }
+ 
+             Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
+ 
+             if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
+             {
+                 logger.error("Cannot start multiple repair sessions over the same sstables");
+                 throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+             }
+ 
+             sstables = Refs.tryRef(sstablesToValidate);
+             if (sstables == null)
+             {
+                 logger.error("Could not reference sstables");
+                 throw new RuntimeException("Could not reference sstables");
+             }
+         }
+ 
+         prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
+ 
+         return sstables;
+     }
+ 
      /**
       * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
       * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted


[3/9] cassandra git commit: Avoid leaking references during parallel repairs

Posted by ma...@apache.org.
Avoid leaking references during parallel repairs

patch by Marcus Olsson; reviewed by Marcus Eriksson for CASSANDRA-11215


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/477014b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/477014b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/477014b8

Branch: refs/heads/cassandra-3.5
Commit: 477014b8cddb8cf1a73a7e8b408f130b64a37c6b
Parents: a24bd6c
Author: Marcus Olsson <ma...@ericsson.com>
Authored: Tue Feb 23 14:30:12 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Mar 1 13:30:35 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 69 ++++++++++++--------
 2 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 103ac16..a50f256 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.2.6
  * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
+ * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
  * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
  * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
  * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 9ca4406..b015bcd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1075,35 +1075,9 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 // flush first so everyone is validating data that is as similar as possible
                 StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-                ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
-                ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES);
-                Set<SSTableReader> sstablesToValidate = new HashSet<>();
-
-                for (SSTableReader sstable : sstableCandidates.sstables)
-                {
-                    if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
-                    {
-                        sstablesToValidate.add(sstable);
-                    }
-                }
-
-                Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
-
-                if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
-                {
-                    logger.error("Cannot start multiple repair sessions over the same sstables");
-                    throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
-                }
-
-                sstables = Refs.tryRef(sstablesToValidate);
+                sstables = getSSTablesToValidate(cfs, validator);
                 if (sstables == null)
-                {
-                    logger.error("Could not reference sstables");
-                    throw new RuntimeException("Could not reference sstables");
-                }
-                sstableCandidates.release();
-                prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
-
+                    return; // this means the parent repair session was removed - the repair session failed on another node and we removed it
                 if (validator.gcBefore > 0)
                     gcBefore = validator.gcBefore;
                 else
@@ -1170,6 +1144,45 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+    private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
+    {
+        Refs<SSTableReader> sstables;
+
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+        if (prs == null)
+            return null;
+        Set<SSTableReader> sstablesToValidate = new HashSet<>();
+        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
+        {
+            for (SSTableReader sstable : sstableCandidates.sstables)
+            {
+                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
+                {
+                    sstablesToValidate.add(sstable);
+                }
+            }
+
+            Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
+
+            if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
+            {
+                logger.error("Cannot start multiple repair sessions over the same sstables");
+                throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+            }
+
+            sstables = Refs.tryRef(sstablesToValidate);
+            if (sstables == null)
+            {
+                logger.error("Could not reference sstables");
+                throw new RuntimeException("Could not reference sstables");
+            }
+        }
+
+        prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
+
+        return sstables;
+    }
+
     /**
      * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
      * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted


[5/9] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by ma...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/841a8031
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/841a8031
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/841a8031

Branch: refs/heads/trunk
Commit: 841a80311c95b05b69e57971deff9aba03c5f5d2
Parents: 6328590 477014b
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Mar 2 14:07:36 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 2 14:07:57 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 69 ++++++++++++--------
 2 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/841a8031/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a7b5c8a,a50f256..425cc58
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,6 +1,27 @@@
 -2.2.6
 +3.0.4
   * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
 + * MV should only query complex columns included in the view (CASSANDRA-11069)
 + * Failed aggregate creation breaks server permanently (CASSANDRA-11064)
 + * Add sstabledump tool (CASSANDRA-7464)
 + * Introduce backpressure for hints (CASSANDRA-10972)
 + * Fix ClusteringPrefix not being able to read tombstone range boundaries (CASSANDRA-11158)
 + * Prevent logging in sandboxed state (CASSANDRA-11033)
 + * Disallow drop/alter operations of UDTs used by UDAs (CASSANDRA-10721)
 + * Add query time validation method on Index (CASSANDRA-11043)
 + * Avoid potential AssertionError in mixed version cluster (CASSANDRA-11128)
 + * Properly handle hinted handoff after topology changes (CASSANDRA-5902)
 + * AssertionError when listing sstable files on inconsistent disk state (CASSANDRA-11156)
 + * Fix wrong rack counting and invalid conditions check for TokenAllocation
 +   (CASSANDRA-11139)
 + * Avoid creating empty hint files (CASSANDRA-11090)
 + * Fix leak detection strong reference loop using weak reference (CASSANDRA-11120)
 + * Configurie BatchlogManager to stop delayed tasks on shutdown (CASSANDRA-11062)
 + * Hadoop integration is incompatible with Cassandra Driver 3.0.0 (CASSANDRA-11001)
 + * Add dropped_columns to the list of schema table so it gets handled
 +   properly (CASSANDRA-11050)
 + * Fix NPE when using forceRepairRangeAsync without DC (CASSANDRA-11239)
 +Merged from 2.2:
+  * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
   * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
   * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
   * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/841a8031/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index c269f7f,b015bcd..891f976
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1159,6 -1144,45 +1133,45 @@@ public class CompactionManager implemen
          }
      }
  
+     private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
+     {
+         Refs<SSTableReader> sstables;
+ 
+         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+         if (prs == null)
+             return null;
+         Set<SSTableReader> sstablesToValidate = new HashSet<>();
 -        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
++        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (s) -> !prs.isIncremental || !s.isRepaired())))
+         {
+             for (SSTableReader sstable : sstableCandidates.sstables)
+             {
 -                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
++                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(validator.desc.ranges))
+                 {
+                     sstablesToValidate.add(sstable);
+                 }
+             }
+ 
+             Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
+ 
+             if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
+             {
+                 logger.error("Cannot start multiple repair sessions over the same sstables");
+                 throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+             }
+ 
+             sstables = Refs.tryRef(sstablesToValidate);
+             if (sstables == null)
+             {
+                 logger.error("Could not reference sstables");
+                 throw new RuntimeException("Could not reference sstables");
+             }
+         }
+ 
+         prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
+ 
+         return sstables;
+     }
+ 
      /**
       * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
       * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted


[4/9] cassandra git commit: Avoid leaking references during parallel repairs

Posted by ma...@apache.org.
Avoid leaking references during parallel repairs

patch by Marcus Olsson; reviewed by Marcus Eriksson for CASSANDRA-11215


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/477014b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/477014b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/477014b8

Branch: refs/heads/trunk
Commit: 477014b8cddb8cf1a73a7e8b408f130b64a37c6b
Parents: a24bd6c
Author: Marcus Olsson <ma...@ericsson.com>
Authored: Tue Feb 23 14:30:12 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Mar 1 13:30:35 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 69 ++++++++++++--------
 2 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 103ac16..a50f256 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.2.6
  * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
+ * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
  * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
  * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
  * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 9ca4406..b015bcd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1075,35 +1075,9 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 // flush first so everyone is validating data that is as similar as possible
                 StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-                ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
-                ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES);
-                Set<SSTableReader> sstablesToValidate = new HashSet<>();
-
-                for (SSTableReader sstable : sstableCandidates.sstables)
-                {
-                    if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
-                    {
-                        sstablesToValidate.add(sstable);
-                    }
-                }
-
-                Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
-
-                if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
-                {
-                    logger.error("Cannot start multiple repair sessions over the same sstables");
-                    throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
-                }
-
-                sstables = Refs.tryRef(sstablesToValidate);
+                sstables = getSSTablesToValidate(cfs, validator);
                 if (sstables == null)
-                {
-                    logger.error("Could not reference sstables");
-                    throw new RuntimeException("Could not reference sstables");
-                }
-                sstableCandidates.release();
-                prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
-
+                    return; // this means the parent repair session was removed - the repair session failed on another node and we removed it
                 if (validator.gcBefore > 0)
                     gcBefore = validator.gcBefore;
                 else
@@ -1170,6 +1144,45 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+    private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
+    {
+        Refs<SSTableReader> sstables;
+
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+        if (prs == null)
+            return null;
+        Set<SSTableReader> sstablesToValidate = new HashSet<>();
+        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
+        {
+            for (SSTableReader sstable : sstableCandidates.sstables)
+            {
+                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
+                {
+                    sstablesToValidate.add(sstable);
+                }
+            }
+
+            Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
+
+            if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
+            {
+                logger.error("Cannot start multiple repair sessions over the same sstables");
+                throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+            }
+
+            sstables = Refs.tryRef(sstablesToValidate);
+            if (sstables == null)
+            {
+                logger.error("Could not reference sstables");
+                throw new RuntimeException("Could not reference sstables");
+            }
+        }
+
+        prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
+
+        return sstables;
+    }
+
     /**
      * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
      * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted


[2/9] cassandra git commit: Avoid leaking references during parallel repairs

Posted by ma...@apache.org.
Avoid leaking references during parallel repairs

patch by Marcus Olsson; reviewed by Marcus Eriksson for CASSANDRA-11215


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/477014b8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/477014b8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/477014b8

Branch: refs/heads/cassandra-3.0
Commit: 477014b8cddb8cf1a73a7e8b408f130b64a37c6b
Parents: a24bd6c
Author: Marcus Olsson <ma...@ericsson.com>
Authored: Tue Feb 23 14:30:12 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Mar 1 13:30:35 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 69 ++++++++++++--------
 2 files changed, 42 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 103ac16..a50f256 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.2.6
  * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
+ * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
  * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
  * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
  * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/477014b8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 9ca4406..b015bcd 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1075,35 +1075,9 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 // flush first so everyone is validating data that is as similar as possible
                 StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
-                ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
-                ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES);
-                Set<SSTableReader> sstablesToValidate = new HashSet<>();
-
-                for (SSTableReader sstable : sstableCandidates.sstables)
-                {
-                    if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
-                    {
-                        sstablesToValidate.add(sstable);
-                    }
-                }
-
-                Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
-
-                if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
-                {
-                    logger.error("Cannot start multiple repair sessions over the same sstables");
-                    throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
-                }
-
-                sstables = Refs.tryRef(sstablesToValidate);
+                sstables = getSSTablesToValidate(cfs, validator);
                 if (sstables == null)
-                {
-                    logger.error("Could not reference sstables");
-                    throw new RuntimeException("Could not reference sstables");
-                }
-                sstableCandidates.release();
-                prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
-
+                    return; // this means the parent repair session was removed - the repair session failed on another node and we removed it
                 if (validator.gcBefore > 0)
                     gcBefore = validator.gcBefore;
                 else
@@ -1170,6 +1144,45 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+    private synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Validator validator)
+    {
+        Refs<SSTableReader> sstables;
+
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+        if (prs == null)
+            return null;
+        Set<SSTableReader> sstablesToValidate = new HashSet<>();
+        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(prs.isIncremental ? ColumnFamilyStore.UNREPAIRED_SSTABLES : ColumnFamilyStore.CANONICAL_SSTABLES))
+        {
+            for (SSTableReader sstable : sstableCandidates.sstables)
+            {
+                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
+                {
+                    sstablesToValidate.add(sstable);
+                }
+            }
+
+            Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
+
+            if (!Sets.intersection(currentlyRepairing, sstablesToValidate).isEmpty())
+            {
+                logger.error("Cannot start multiple repair sessions over the same sstables");
+                throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+            }
+
+            sstables = Refs.tryRef(sstablesToValidate);
+            if (sstables == null)
+            {
+                logger.error("Could not reference sstables");
+                throw new RuntimeException("Could not reference sstables");
+            }
+        }
+
+        prs.addSSTables(cfs.metadata.cfId, sstablesToValidate);
+
+        return sstables;
+    }
+
     /**
      * Splits up an sstable into two new sstables. The first of the new tables will store repaired ranges, the second
      * will store the non-repaired ranges. Once anticompation is completed, the original sstable is marked as compacted