You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2017/08/10 19:02:43 UTC

cassandra git commit: Fix race / ref leak in PendingRepairManager

Repository: cassandra
Updated Branches:
  refs/heads/trunk ba87ab4e9 -> 9c3354e32


Fix race / ref leak in PendingRepairManager

Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13751


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c3354e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c3354e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c3354e3

Branch: refs/heads/trunk
Commit: 9c3354e3211c6a3f3982e87477e156c29cd9b7ea
Parents: ba87ab4
Author: Blake Eggleston <bd...@gmail.com>
Authored: Tue Aug 8 10:32:35 2017 -0700
Committer: Blake Eggleston <bd...@gmail.com>
Committed: Thu Aug 10 12:01:00 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../compaction/AbstractCompactionStrategy.java  | 29 ++---------------
 .../compaction/CompactionStrategyManager.java   | 25 +++++++++++---
 .../compaction/LeveledCompactionStrategy.java   | 10 +-----
 .../db/compaction/PendingRepairManager.java     | 34 +++++++++++++++++---
 .../cassandra/io/sstable/ISSTableScanner.java   | 34 ++++++++++++++++++++
 .../db/compaction/PendingRepairManagerTest.java | 24 ++++++++++++++
 7 files changed, 113 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 849848f..e997b50 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
  * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
  * Improve sstablemetadata output (CASSANDRA-11483)
  * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 5333683..f1f42a7 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -293,15 +293,7 @@ public abstract class AbstractCompactionStrategy
         }
         catch (Throwable t)
         {
-            try
-            {
-                new ScannerList(scanners).close();
-            }
-            catch (Throwable t2)
-            {
-                t.addSuppressed(t2);
-            }
-            throw t;
+            ISSTableScanner.closeAllAndPropagate(scanners, t);
         }
         return new ScannerList(scanners);
     }
@@ -385,24 +377,7 @@ public abstract class AbstractCompactionStrategy
 
         public void close()
         {
-            Throwable t = null;
-            for (ISSTableScanner scanner : scanners)
-            {
-                try
-                {
-                    scanner.close();
-                }
-                catch (Throwable t2)
-                {
-                    JVMStabilityInspector.inspectThrowable(t2);
-                    if (t == null)
-                        t = t2;
-                    else
-                        t.addSuppressed(t2);
-                }
-            }
-            if (t != null)
-                throw Throwables.propagate(t);
+            ISSTableScanner.closeAllAndPropagate(scanners, null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index e58ccc2..6342a1b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.function.Supplier;
@@ -735,7 +734,7 @@ public class CompactionStrategyManager implements INotificationConsumer
      * @return
      */
     @SuppressWarnings("resource")
-    public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
+    public AbstractCompactionStrategy.ScannerList maybeGetScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
     {
         assert repaired.size() == unrepaired.size();
         assert repaired.size() == pendingRepairs.size();
@@ -781,13 +780,31 @@ public class CompactionStrategyManager implements INotificationConsumer
                 if (!unrepairedSSTables.get(i).isEmpty())
                     scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), ranges).scanners);
             }
-
-            return new AbstractCompactionStrategy.ScannerList(scanners);
+        }
+        catch (PendingRepairManager.IllegalSSTableArgumentException e)
+        {
+            ISSTableScanner.closeAllAndPropagate(scanners, new ConcurrentModificationException(e));
         }
         finally
         {
             readLock.unlock();
         }
+        return new AbstractCompactionStrategy.ScannerList(scanners);
+    }
+
+    public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
+    {
+        while (true)
+        {
+            try
+            {
+                return maybeGetScanners(sstables, ranges);
+            }
+            catch (ConcurrentModificationException e)
+            {
+                logger.debug("SSTable repairedAt/pendingRepaired values changed while getting scanners");
+            }
+        }
     }
 
     public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 4f11a03..8086be9 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -307,15 +307,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
         }
         catch (Throwable t)
         {
-            try
-            {
-                new ScannerList(scanners).close();
-            }
-            catch (Throwable t2)
-            {
-                t.addSuppressed(t2);
-            }
-            throw t;
+            ISSTableScanner.closeAllAndPropagate(scanners, t);
         }
 
         return new ScannerList(scanners);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index afde263..183af7a 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -64,6 +64,17 @@ class PendingRepairManager
     private final CompactionParams params;
     private volatile ImmutableMap<UUID, AbstractCompactionStrategy> strategies = ImmutableMap.of();
 
+    /**
+     * Indicates we're being asked to do something with an sstable that isn't marked pending repair
+     */
+    public static class IllegalSSTableArgumentException extends IllegalArgumentException
+    {
+        public IllegalSSTableArgumentException(String s)
+        {
+            super(s);
+        }
+    }
+
     PendingRepairManager(ColumnFamilyStore cfs, CompactionParams params)
     {
         this.cfs = cfs;
@@ -88,6 +99,7 @@ class PendingRepairManager
 
     AbstractCompactionStrategy getOrCreate(UUID id)
     {
+        checkPendingID(id);
         assert id != null;
         AbstractCompactionStrategy strategy = get(id);
         if (strategy == null)
@@ -107,9 +119,16 @@ class PendingRepairManager
         return strategy;
     }
 
+    private static void checkPendingID(UUID pendingID)
+    {
+        if (pendingID == null)
+        {
+            throw new IllegalSSTableArgumentException("sstable is not pending repair");
+        }
+    }
+
     AbstractCompactionStrategy getOrCreate(SSTableReader sstable)
     {
-        assert sstable.isPendingRepair();
         return getOrCreate(sstable.getSSTableMetadata().pendingRepair);
     }
 
@@ -352,14 +371,21 @@ class PendingRepairManager
         for (SSTableReader sstable : sstables)
         {
             UUID sessionID = sstable.getSSTableMetadata().pendingRepair;
-            assert sessionID != null;
+            checkPendingID(sessionID);
             sessionSSTables.computeIfAbsent(sessionID, k -> new HashSet<>()).add(sstable);
         }
 
         Set<ISSTableScanner> scanners = new HashSet<>(sessionSSTables.size());
-        for (Map.Entry<UUID, Set<SSTableReader>> entry : sessionSSTables.entrySet())
+        try
+        {
+            for (Map.Entry<UUID, Set<SSTableReader>> entry : sessionSSTables.entrySet())
+            {
+                scanners.addAll(getOrCreate(entry.getKey()).getScanners(entry.getValue(), ranges).scanners);
+            }
+        }
+        catch (Throwable t)
         {
-            scanners.addAll(getOrCreate(entry.getKey()).getScanners(entry.getValue(), ranges).scanners);
+            ISSTableScanner.closeAllAndPropagate(scanners, t);
         }
         return scanners;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
index 2dff34e..1c1d74b 100644
--- a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
@@ -19,7 +19,12 @@
 
 package org.apache.cassandra.io.sstable;
 
+import java.util.Collection;
+
+import com.google.common.base.Throwables;
+
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
  * An ISSTableScanner is an abstraction allowing multiple SSTableScanners to be
@@ -32,4 +37,33 @@ public interface ISSTableScanner extends UnfilteredPartitionIterator
     public long getCurrentPosition();
     public long getBytesScanned();
     public String getBackingFiles();
+
+    public static void closeAllAndPropagate(Collection<ISSTableScanner> scanners, Throwable throwable)
+    {
+        for (ISSTableScanner scanner: scanners)
+        {
+            try
+            {
+                scanner.close();
+            }
+            catch (Throwable t2)
+            {
+                JVMStabilityInspector.inspectThrowable(t2);
+                if (throwable == null)
+                {
+                    throwable = t2;
+                }
+                else
+                {
+                    throwable.addSuppressed(t2);
+                }
+            }
+        }
+
+        if (throwable != null)
+        {
+            Throwables.propagate(throwable);
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c3354e3/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
index a173b4b..93b68b5 100644
--- a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java
@@ -240,4 +240,28 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest
             tasks.stream().forEach(t -> t.transaction.abort());
         }
     }
+
+    /**
+     * Tests that a IllegalSSTableArgumentException is thrown if we try to get
+     * scanners for an sstable that isn't pending repair
+     */
+    @Test(expected = PendingRepairManager.IllegalSSTableArgumentException.class)
+    public void getScannersInvalidSSTable() throws Exception
+    {
+        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+        SSTableReader sstable = makeSSTable(true);
+        prm.getScanners(Collections.singleton(sstable), Collections.singleton(RANGE1));
+    }
+
+    /**
+     * Tests that a IllegalSSTableArgumentException is thrown if we try to get
+     * scanners for an sstable that isn't pending repair
+     */
+    @Test(expected = PendingRepairManager.IllegalSSTableArgumentException.class)
+    public void getOrCreateInvalidSSTable() throws Exception
+    {
+        PendingRepairManager prm = csm.getPendingRepairManagers().get(0);
+        SSTableReader sstable = makeSSTable(true);
+        prm.getOrCreate(sstable);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org