You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2017/08/10 19:02:43 UTC
cassandra git commit: Fix race / ref leak in PendingRepairManager
Repository: cassandra
Updated Branches:
refs/heads/trunk ba87ab4e9 -> 9c3354e32
Fix race / ref leak in PendingRepairManager
Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13751
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c3354e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c3354e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c3354e3
Branch: refs/heads/trunk
Commit: 9c3354e3211c6a3f3982e87477e156c29cd9b7ea
Parents: ba87ab4
Author: Blake Eggleston <bd...@gmail.com>
Authored: Tue Aug 8 10:32:35 2017 -0700
Committer: Blake Eggleston <bd...@gmail.com>
Committed: Thu Aug 10 12:01:00 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../compaction/AbstractCompactionStrategy.java | 29 ++---------------
.../compaction/CompactionStrategyManager.java | 25 +++++++++++---
.../compaction/LeveledCompactionStrategy.java | 10 +-----
.../db/compaction/PendingRepairManager.java | 34 +++++++++++++++++---
.../cassandra/io/sstable/ISSTableScanner.java | 34 ++++++++++++++++++++
.../db/compaction/PendingRepairManagerTest.java | 24 ++++++++++++++
7 files changed, 113 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 849848f..e997b50 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
* Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
* Improve sstablemetadata output (CASSANDRA-11483)
* Support for migrating legacy users to roles has been dropped (CASSANDRA-13371)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 5333683..f1f42a7 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -293,15 +293,7 @@ public abstract class AbstractCompactionStrategy
}
catch (Throwable t)
{
- try
- {
- new ScannerList(scanners).close();
- }
- catch (Throwable t2)
- {
- t.addSuppressed(t2);
- }
- throw t;
+ ISSTableScanner.closeAllAndPropagate(scanners, t);
}
return new ScannerList(scanners);
}
@@ -385,24 +377,7 @@ public abstract class AbstractCompactionStrategy
public void close()
{
- Throwable t = null;
- for (ISSTableScanner scanner : scanners)
- {
- try
- {
- scanner.close();
- }
- catch (Throwable t2)
- {
- JVMStabilityInspector.inspectThrowable(t2);
- if (t == null)
- t = t2;
- else
- t.addSuppressed(t2);
- }
- }
- if (t != null)
- throw Throwables.propagate(t);
+ ISSTableScanner.closeAllAndPropagate(scanners, null);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index e58ccc2..6342a1b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.function.Supplier;
@@ -735,7 +734,7 @@ public class CompactionStrategyManager implements INotificationConsumer
* @return
*/
@SuppressWarnings("resource")
- public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
+ public AbstractCompactionStrategy.ScannerList maybeGetScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
{
assert repaired.size() == unrepaired.size();
assert repaired.size() == pendingRepairs.size();
@@ -781,13 +780,31 @@ public class CompactionStrategyManager implements INotificationConsumer
if (!unrepairedSSTables.get(i).isEmpty())
scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), ranges).scanners);
}
-
- return new AbstractCompactionStrategy.ScannerList(scanners);
+ }
+ catch (PendingRepairManager.IllegalSSTableArgumentException e)
+ {
+ ISSTableScanner.closeAllAndPropagate(scanners, new ConcurrentModificationException(e));
}
finally
{
readLock.unlock();
}
+ return new AbstractCompactionStrategy.ScannerList(scanners);
+ }
+
+ public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges)
+ {
+ while (true)
+ {
+ try
+ {
+ return maybeGetScanners(sstables, ranges);
+ }
+ catch (ConcurrentModificationException e)
+ {
+ logger.debug("SSTable repairedAt/pendingRepaired values changed while getting scanners");
+ }
+ }
}
public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 4f11a03..8086be9 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -307,15 +307,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
}
catch (Throwable t)
{
- try
- {
- new ScannerList(scanners).close();
- }
- catch (Throwable t2)
- {
- t.addSuppressed(t2);
- }
- throw t;
+ ISSTableScanner.closeAllAndPropagate(scanners, t);
}
return new ScannerList(scanners);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index afde263..183af7a 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -64,6 +64,17 @@ class PendingRepairManager
private final CompactionParams params;
private volatile ImmutableMap<UUID, AbstractCompactionStrategy> strategies = ImmutableMap.of();
+ /**
+ * Indicates we're being asked to do something with an sstable that isn't marked pending repair
+ */
+ public static class IllegalSSTableArgumentException extends IllegalArgumentException
+ {
+ public IllegalSSTableArgumentException(String s)
+ {
+ super(s);
+ }
+ }
+
PendingRepairManager(ColumnFamilyStore cfs, CompactionParams params)
{
this.cfs = cfs;
@@ -88,6 +99,7 @@ class PendingRepairManager
AbstractCompactionStrategy getOrCreate(UUID id)
{
+ checkPendingID(id);
assert id != null;
AbstractCompactionStrategy strategy = get(id);
if (strategy == null)
@@ -107,9 +119,16 @@ class PendingRepairManager
return strategy;
}
+ private static void checkPendingID(UUID pendingID)
+ {
+ if (pendingID == null)
+ {
+ throw new IllegalSSTableArgumentException("sstable is not pending repair");
+ }
+ }
+
AbstractCompactionStrategy getOrCreate(SSTableReader sstable)
{
- assert sstable.isPendingRepair();
return getOrCreate(sstable.getSSTableMetadata().pendingRepair);
}
@@ -352,14 +371,21 @@ class PendingRepairManager
for (SSTableReader sstable : sstables)
{
UUID sessionID = sstable.getSSTableMetadata().pendingRepair;
- assert sessionID != null;
+ checkPendingID(sessionID);
sessionSSTables.computeIfAbsent(sessionID, k -> new HashSet<>()).add(sstable);
}
Set<ISSTableScanner> scanners = new HashSet<>(sessionSSTables.size());
- for (Map.Entry<UUID, Set<SSTableReader>> entry : sessionSSTables.entrySet())
+ try
+ {
+ for (Map.Entry<UUID, Set<SSTableReader>> entry : sessionSSTables.entrySet())
+ {
+ scanners.addAll(getOrCreate(entry.getKey()).getScanners(entry.getValue(), ranges).scanners);
+ }
+ }
+ catch (Throwable t)
{
- scanners.addAll(getOrCreate(entry.getKey()).getScanners(entry.getValue(), ranges).scanners);
+ ISSTableScanner.closeAllAndPropagate(scanners, t);
}
return scanners;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
index 2dff34e..1c1d74b 100644
--- a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
@@ -19,7 +19,12 @@
package org.apache.cassandra.io.sstable;
+import java.util.Collection;
+
+import com.google.common.base.Throwables;
+
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.utils.JVMStabilityInspector;
/**
* An ISSTableScanner is an abstraction allowing multiple SSTableScanners to be
@@ -32,4 +37,33 @@ public interface ISSTableScanner extends UnfilteredPartitionIterator
public long getCurrentPosition();
public long getBytesScanned();
public String getBackingFiles();
+
+ public static void closeAllAndPropagate(Collection<ISSTableScanner> scanners, Throwable throwable)
+ {
+ for (ISSTableScanner scanner: scanners)
+ {
+ try
+ {
+ scanner.close();
+ }
+ catch (Throwable t2)
+ {
+ JVMStabilityInspector.inspectThrowable(t2);
+ if (throwable == null)
+ {
+ throwable = t2;
+ }
+ else
+ {
+ throwable.addSuppressed(t2);
+ }
+ }
+ }
+
+ if (throwable != null)
+ {
+ Throwables.propagate(throwable);
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
index a173b4b..93b68b5 100644
--- a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
@@ -240,4 +240,28 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
tasks.stream().forEach(t -> t.transaction.abort());
}
}
+
+ /**
+ * Tests that a IllegalSSTableArgumentException is thrown if we try to get
+ * scanners for an sstable that isn't pending repair
+ */
+ @Test(expected = PendingRepairManager.IllegalSSTableArgumentException.class)
+ public void getScannersInvalidSSTable() throws Exception
+ {
+ PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+ SSTableReader sstable = makeSSTable(true);
+ prm.getScanners(Collections.singleton(sstable), Collections.singleton(RANGE1));
+ }
+
+ /**
+ * Tests that a IllegalSSTableArgumentException is thrown if we try to get
+ * scanners for an sstable that isn't pending repair
+ */
+ @Test(expected = PendingRepairManager.IllegalSSTableArgumentException.class)
+ public void getOrCreateInvalidSSTable() throws Exception
+ {
+ PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+ SSTableReader sstable = makeSSTable(true);
+ prm.getOrCreate(sstable);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org