You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/03/15 15:35:07 UTC

cassandra git commit: Improve CompactionManager concurrency

Repository: cassandra
Updated Branches:
  refs/heads/trunk f8b3a1588 -> ed0a07c38


Improve CompactionManager concurrency

Patch by marcuse; reviewed by yukim for CASSANDRA-10099


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ed0a07c3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ed0a07c3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ed0a07c3

Branch: refs/heads/trunk
Commit: ed0a07c386658395803886ac5f1cf243cd413cbe
Parents: f8b3a15
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Feb 8 15:56:12 2016 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Mar 15 15:30:38 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../compaction/AbstractCompactionStrategy.java  |   6 +
 .../compaction/CompactionStrategyManager.java   | 552 +++++++++++++------
 .../DateTieredCompactionStrategy.java           |   4 +-
 .../compaction/LeveledCompactionStrategy.java   |   2 +-
 .../SizeTieredCompactionStrategy.java           |   4 +-
 .../SSTableRepairStatusChanged.java             |   4 +-
 .../cassandra/tools/StandaloneScrubber.java     |   2 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |   2 +-
 9 files changed, 396 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6f1c4a3..4fc03b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Improve concurrency in CompactionManager (CASSANDRA-10099)
  * (cqlsh) interpret CQL type for formatting blobs (CASSANDRA-11274)
  * Refuse to start and print txn log information in case of disk
    corruption (CASSANDRA-10112)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index cab56bb..b6d623b 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -315,6 +315,12 @@ public abstract class AbstractCompactionStrategy
 
     public abstract void addSSTable(SSTableReader added);
 
+    public synchronized void addSSTables(Iterable<SSTableReader> added)
+    {
+        for (SSTableReader sstable : added)
+            addSSTable(sstable);
+    }
+
     public abstract void removeSSTable(SSTableReader sstable);
 
     public static class ScannerList implements AutoCloseable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index a9d42eb..1d387dc 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 import java.util.concurrent.Callable;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import org.apache.cassandra.index.Index;
 import com.google.common.primitives.Ints;
@@ -60,11 +62,15 @@ public class CompactionStrategyManager implements INotificationConsumer
 {
     private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
     private final ColumnFamilyStore cfs;
-    private volatile List<AbstractCompactionStrategy> repaired = new ArrayList<>();
-    private volatile List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
+    private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
+    private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
     private volatile boolean enabled = true;
-    public boolean isActive = true;
+    public volatile boolean isActive = true;
     private volatile CompactionParams params;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
     /*
         We keep a copy of the schema compaction parameters here to be able to decide if we
         should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
@@ -72,7 +78,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
         we will use the new compaction parameters.
      */
-    private CompactionParams schemaCompactionParams;
+    private volatile CompactionParams schemaCompactionParams;
     private Directories.DataDirectory[] locations;
 
     public CompactionStrategyManager(ColumnFamilyStore cfs)
@@ -92,22 +98,29 @@ public class CompactionStrategyManager implements INotificationConsumer
      * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks)
      *
      */
-    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
         if (!isEnabled())
             return null;
 
         maybeReload(cfs.metadata);
-
-        List<AbstractCompactionStrategy> strategies = new ArrayList<>(repaired.size() + unrepaired.size());
-        strategies.addAll(repaired);
-        strategies.addAll(unrepaired);
-        Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
-        for (AbstractCompactionStrategy strategy : strategies)
+        List<AbstractCompactionStrategy> strategies = new ArrayList<>();
+        readLock.lock();
+        try
+        {
+            strategies.addAll(repaired);
+            strategies.addAll(unrepaired);
+            Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
+            for (AbstractCompactionStrategy strategy : strategies)
+            {
+                AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
+                if (task != null)
+                    return task;
+            }
+        }
+        finally
         {
-            AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
-            if (task != null)
-                return task;
+            readLock.unlock();
         }
         return null;
     }
@@ -117,7 +130,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         return enabled && isActive;
     }
 
-    public synchronized void resume()
+    public void resume()
     {
         isActive = true;
     }
@@ -127,21 +140,28 @@ public class CompactionStrategyManager implements INotificationConsumer
      *
      * Separate call from enable/disable to not have to save the enabled-state externally
       */
-    public synchronized void pause()
+    public void pause()
     {
         isActive = false;
     }
 
-
     private void startup()
     {
-        for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
+        writeLock.lock();
+        try
         {
-            if (sstable.openReason != SSTableReader.OpenReason.EARLY)
-                getCompactionStrategyFor(sstable).addSSTable(sstable);
+            for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
+            {
+                if (sstable.openReason != SSTableReader.OpenReason.EARLY)
+                    getCompactionStrategyFor(sstable).addSSTable(sstable);
+            }
+            repaired.forEach(AbstractCompactionStrategy::startup);
+            unrepaired.forEach(AbstractCompactionStrategy::startup);
+        }
+        finally
+        {
+            writeLock.unlock();
         }
-        repaired.forEach(AbstractCompactionStrategy::startup);
-        unrepaired.forEach(AbstractCompactionStrategy::startup);
     }
 
     /**
@@ -154,10 +174,18 @@ public class CompactionStrategyManager implements INotificationConsumer
     private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
     {
         int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
-        if (sstable.isRepaired())
-            return repaired.get(index);
-        else
-            return unrepaired.get(index);
+        readLock.lock();
+        try
+        {
+            if (sstable.isRepaired())
+                return repaired.get(index);
+            else
+                return unrepaired.get(index);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
     /**
@@ -200,18 +228,35 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public void shutdown()
     {
-        isActive = false;
-        repaired.forEach(AbstractCompactionStrategy::shutdown);
-        unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+        writeLock.lock();
+        try
+        {
+            isActive = false;
+            repaired.forEach(AbstractCompactionStrategy::shutdown);
+            unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
     }
 
-    public synchronized void maybeReload(CFMetaData metadata)
+    public void maybeReload(CFMetaData metadata)
     {
         // compare the old schema configuration to the new one, ignore any locally set changes.
         if (metadata.params.compaction.equals(schemaCompactionParams) &&
             Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
             return;
-        reload(metadata);
+
+        writeLock.lock();
+        try
+        {
+            reload(metadata);
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
     }
 
     /**
@@ -220,7 +265,7 @@ public class CompactionStrategyManager implements INotificationConsumer
      * Called after changing configuration and at startup.
      * @param metadata
      */
-    public synchronized void reload(CFMetaData metadata)
+    private void reload(CFMetaData metadata)
     {
         boolean disabledWithJMX = !enabled && shouldBeEnabled();
         if (!metadata.params.compaction.equals(schemaCompactionParams))
@@ -247,34 +292,50 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public int getUnleveledSSTables()
     {
-        if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
+        readLock.lock();
+        try
         {
-            int count = 0;
-            for (AbstractCompactionStrategy strategy : repaired)
-                count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
-            for (AbstractCompactionStrategy strategy : unrepaired)
-                count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
-            return count;
+            if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
+            {
+                int count = 0;
+                for (AbstractCompactionStrategy strategy : repaired)
+                    count += ((LeveledCompactionStrategy) strategy).getLevelSize(0);
+                for (AbstractCompactionStrategy strategy : unrepaired)
+                    count += ((LeveledCompactionStrategy) strategy).getLevelSize(0);
+                return count;
+            }
+        }
+        finally
+        {
+            readLock.unlock();
         }
         return 0;
     }
 
-    public synchronized int[] getSSTableCountPerLevel()
+    public int[] getSSTableCountPerLevel()
     {
-        if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
+        readLock.lock();
+        try
         {
-            int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
-            for (AbstractCompactionStrategy strategy : repaired)
+            if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
             {
-                int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
-                res = sumArrays(res, repairedCountPerLevel);
-            }
-            for (AbstractCompactionStrategy strategy : unrepaired)
-            {
-                int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
-                res = sumArrays(res, unrepairedCountPerLevel);
+                int[] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
+                for (AbstractCompactionStrategy strategy : repaired)
+                {
+                    int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+                    res = sumArrays(res, repairedCountPerLevel);
+                }
+                for (AbstractCompactionStrategy strategy : unrepaired)
+                {
+                    int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+                    res = sumArrays(res, unrepairedCountPerLevel);
+                }
+                return res;
             }
-            return res;
+        }
+        finally
+        {
+            readLock.unlock();
         }
         return null;
     }
@@ -296,85 +357,112 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public boolean shouldDefragment()
     {
-        assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
-        return repaired.get(0).shouldDefragment();
+        readLock.lock();
+        try
+        {
+            assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+            return repaired.get(0).shouldDefragment();
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
     public Directories getDirectories()
     {
-        assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
-        return repaired.get(0).getDirectories();
+        readLock.lock();
+        try
+        {
+            assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+            return repaired.get(0).getDirectories();
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
-    public synchronized void handleNotification(INotification notification, Object sender)
+    private void handleFlushNotification(Iterable<SSTableReader> added)
     {
-        maybeReload(cfs.metadata);
-        if (notification instanceof SSTableAddedNotification)
+        readLock.lock();
+        try
         {
-            SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
-            for (SSTableReader sstable : flushedNotification.added)
+            for (SSTableReader sstable : added)
                 getCompactionStrategyFor(sstable).addSSTable(sstable);
         }
-        else if (notification instanceof SSTableListChangedNotification)
+        finally
         {
-            // a bit of gymnastics to be able to replace sstables in compaction strategies
-            // we use this to know that a compaction finished and where to start the next compaction in LCS
-            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
+            readLock.unlock();
+        }
+    }
 
-            Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
-            int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
+    private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
+    {
+        // a bit of gymnastics to be able to replace sstables in compaction strategies
+        // we use this to know that a compaction finished and where to start the next compaction in LCS
+        Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
+        int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
 
-            List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
-            List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
-            List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
-            List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+        List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+        List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+        List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+        List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
 
-            for (int i = 0; i < locationSize; i++)
-            {
-                repairedRemoved.add(new HashSet<>());
-                repairedAdded.add(new HashSet<>());
-                unrepairedRemoved.add(new HashSet<>());
-                unrepairedAdded.add(new HashSet<>());
-            }
-
-            for (SSTableReader sstable : listChangedNotification.removed)
-            {
-                int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
-                if (sstable.isRepaired())
-                    repairedRemoved.get(i).add(sstable);
-                else
-                    unrepairedRemoved.get(i).add(sstable);
-            }
-            for (SSTableReader sstable : listChangedNotification.added)
-            {
-                int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
-                if (sstable.isRepaired())
-                    repairedAdded.get(i).add(sstable);
-                else
-                    unrepairedAdded.get(i).add(sstable);
-            }
+        for (int i = 0; i < locationSize; i++)
+        {
+            repairedRemoved.add(new HashSet<>());
+            repairedAdded.add(new HashSet<>());
+            unrepairedRemoved.add(new HashSet<>());
+            unrepairedAdded.add(new HashSet<>());
+        }
 
+        for (SSTableReader sstable : removed)
+        {
+            int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+            if (sstable.isRepaired())
+                repairedRemoved.get(i).add(sstable);
+            else
+                unrepairedRemoved.get(i).add(sstable);
+        }
+        for (SSTableReader sstable : added)
+        {
+            int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+            if (sstable.isRepaired())
+                repairedAdded.get(i).add(sstable);
+            else
+                unrepairedAdded.get(i).add(sstable);
+        }
+        // we need a write lock here since we might be moving sstables between strategies
+        writeLock.lock();
+        try
+        {
             for (int i = 0; i < locationSize; i++)
             {
                 if (!repairedRemoved.get(i).isEmpty())
                     repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
                 else
-                {
-                    for (SSTableReader sstable : repairedAdded.get(i))
-                        repaired.get(i).addSSTable(sstable);
-                }
+                    repaired.get(i).addSSTables(repairedAdded.get(i));
+
                 if (!unrepairedRemoved.get(i).isEmpty())
                     unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i));
                 else
-                {
-                    for (SSTableReader sstable : unrepairedAdded.get(i))
-                        unrepaired.get(i).addSSTable(sstable);
-                }
+                    unrepaired.get(i).addSSTables(unrepairedAdded.get(i));
             }
         }
-        else if (notification instanceof SSTableRepairStatusChanged)
+        finally
+        {
+            writeLock.unlock();
+        }
+    }
+
+    private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
+    {
+        // we need a write lock here since we move sstables from one strategy instance to another
+        writeLock.lock();
+        try
         {
-            for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
+            for (SSTableReader sstable : sstables)
             {
                 int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
                 if (sstable.isRepaired())
@@ -389,31 +477,81 @@ public class CompactionStrategyManager implements INotificationConsumer
                 }
             }
         }
+        finally
+        {
+            writeLock.unlock();
+        }
+    }
+
+    private void handleDeletingNotification(SSTableReader deleted)
+    {
+        readLock.lock();
+        try
+        {
+            getCompactionStrategyFor(deleted).removeSSTable(deleted);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+    }
+
+    public void handleNotification(INotification notification, Object sender)
+    {
+        maybeReload(cfs.metadata);
+        if (notification instanceof SSTableAddedNotification)
+        {
+            handleFlushNotification(((SSTableAddedNotification) notification).added);
+        }
+        else if (notification instanceof SSTableListChangedNotification)
+        {
+            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
+            handleListChangedNotification(listChangedNotification.added, listChangedNotification.removed);
+        }
+        else if (notification instanceof SSTableRepairStatusChanged)
+        {
+            handleRepairStatusChangedNotification(((SSTableRepairStatusChanged) notification).sstables);
+        }
         else if (notification instanceof SSTableDeletingNotification)
         {
-            SSTableReader sstable = ((SSTableDeletingNotification) notification).deleting;
-            getCompactionStrategyFor(sstable).removeSSTable(sstable);
+            handleDeletingNotification(((SSTableDeletingNotification) notification).deleting);
         }
     }
 
     public void enable()
     {
-        if (repaired != null)
-            repaired.forEach(AbstractCompactionStrategy::enable);
-        if (unrepaired != null)
-            unrepaired.forEach(AbstractCompactionStrategy::enable);
-        // enable this last to make sure the strategies are ready to get calls.
-        enabled = true;
+        writeLock.lock();
+        try
+        {
+            if (repaired != null)
+                repaired.forEach(AbstractCompactionStrategy::enable);
+            if (unrepaired != null)
+                unrepaired.forEach(AbstractCompactionStrategy::enable);
+            // enable this last to make sure the strategies are ready to get calls.
+            enabled = true;
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
     }
 
     public void disable()
     {
-        // disable this first avoid asking disabled strategies for compaction tasks
-        enabled = false;
-        if (repaired != null)
-            repaired.forEach(AbstractCompactionStrategy::disable);
-        if (unrepaired != null)
-            unrepaired.forEach(AbstractCompactionStrategy::disable);
+        writeLock.lock();
+        try
+        {
+            // disable this first to avoid asking disabled strategies for compaction tasks
+            enabled = false;
+            if (repaired != null)
+                repaired.forEach(AbstractCompactionStrategy::disable);
+            if (unrepaired != null)
+                unrepaired.forEach(AbstractCompactionStrategy::disable);
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
     }
 
     /**
@@ -425,7 +563,7 @@ public class CompactionStrategyManager implements INotificationConsumer
      * @return
      */
     @SuppressWarnings("resource")
-    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
+    public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
     {
         assert repaired.size() == unrepaired.size();
         List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
@@ -447,68 +585,92 @@ public class CompactionStrategyManager implements INotificationConsumer
 
         List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
 
-        for (Range<Token> range : ranges)
+        readLock.lock();
+        try
         {
-            List<ISSTableScanner> repairedScanners = new ArrayList<>();
-            List<ISSTableScanner> unrepairedScanners = new ArrayList<>();
-
-            for (int i = 0; i < repairedSSTables.size(); i++)
-            {
-                if (!repairedSSTables.get(i).isEmpty())
-                    repairedScanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), range).scanners);
-            }
-            for (int i = 0; i < unrepairedSSTables.size(); i++)
+            for (Range<Token> range : ranges)
             {
-                if (!unrepairedSSTables.get(i).isEmpty())
-                    scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), range).scanners);
-            }
-            for (ISSTableScanner scanner : Iterables.concat(repairedScanners, unrepairedScanners))
-            {
-                if (!scanners.add(scanner))
-                    scanner.close();
+                List<ISSTableScanner> repairedScanners = new ArrayList<>();
+                List<ISSTableScanner> unrepairedScanners = new ArrayList<>();
+
+                for (int i = 0; i < repairedSSTables.size(); i++)
+                {
+                    if (!repairedSSTables.get(i).isEmpty())
+                        repairedScanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), range).scanners);
+                }
+                for (int i = 0; i < unrepairedSSTables.size(); i++)
+                {
+                    if (!unrepairedSSTables.get(i).isEmpty())
+                        scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), range).scanners);
+                }
+                for (ISSTableScanner scanner : Iterables.concat(repairedScanners, unrepairedScanners))
+                {
+                    if (!scanners.add(scanner))
+                        scanner.close();
+                }
             }
+            return new AbstractCompactionStrategy.ScannerList(scanners);
+        }
+        finally
+        {
+            readLock.unlock();
         }
-        return new AbstractCompactionStrategy.ScannerList(scanners);
     }
 
-    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
+    public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
     {
         return getScanners(sstables, Collections.singleton(null));
     }
 
     public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
     {
-        Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
-        Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
+        readLock.lock();
+        try
+        {
+            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+            Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
 
-        for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
-            anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue()));
-        return anticompactionGroups;
+            for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
+                anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue()));
+            return anticompactionGroups;
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
     public long getMaxSSTableBytes()
     {
-        return unrepaired.get(0).getMaxSSTableBytes();
+        readLock.lock();
+        try
+        {
+            return unrepaired.get(0).getMaxSSTableBytes();
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
     public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
     {
         maybeReload(cfs.metadata);
-        validateForCompaction(txn.originals());
+        validateForCompaction(txn.originals(), cfs, getDirectories());
         return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
     }
 
-    private void validateForCompaction(Iterable<SSTableReader> input)
+    private static void validateForCompaction(Iterable<SSTableReader> input, ColumnFamilyStore cfs, Directories directories)
     {
         SSTableReader firstSSTable = Iterables.getFirst(input, null);
         assert firstSSTable != null;
         boolean repaired = firstSSTable.isRepaired();
-        int firstIndex = getCompactionStrategyIndex(cfs, getDirectories(), firstSSTable);
+        int firstIndex = getCompactionStrategyIndex(cfs, directories, firstSSTable);
         for (SSTableReader sstable : input)
         {
             if (sstable.isRepaired() != repaired)
                 throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
-            if (firstIndex != getCompactionStrategyIndex(cfs, getDirectories(), sstable))
+            if (firstIndex != getCompactionStrategyIndex(cfs, directories, sstable))
                 throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
         }
     }
@@ -524,9 +686,10 @@ public class CompactionStrategyManager implements INotificationConsumer
             @Override
             public Collection<AbstractCompactionTask> call() throws Exception
             {
-                synchronized (CompactionStrategyManager.this)
+                List<AbstractCompactionTask> tasks = new ArrayList<>();
+                readLock.lock();
+                try
                 {
-                    List<AbstractCompactionTask> tasks = new ArrayList<>();
                     for (AbstractCompactionStrategy strategy : repaired)
                     {
                         Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
@@ -539,10 +702,14 @@ public class CompactionStrategyManager implements INotificationConsumer
                         if (task != null)
                             tasks.addAll(task);
                     }
-                    if (tasks.isEmpty())
-                        return null;
-                    return tasks;
                 }
+                finally
+                {
+                    readLock.unlock();
+                }
+                if (tasks.isEmpty())
+                    return null;
+                return tasks;
             }
         }, false, false);
     }
@@ -550,18 +717,34 @@ public class CompactionStrategyManager implements INotificationConsumer
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
     {
         maybeReload(cfs.metadata);
-        validateForCompaction(sstables);
-        return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
+        validateForCompaction(sstables, cfs, getDirectories());
+        readLock.lock();
+        try
+        {
+            return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
     public int getEstimatedRemainingTasks()
     {
         int tasks = 0;
-        for (AbstractCompactionStrategy strategy : repaired)
-            tasks += strategy.getEstimatedRemainingTasks();
-        for (AbstractCompactionStrategy strategy : unrepaired)
-            tasks += strategy.getEstimatedRemainingTasks();
+        readLock.lock();
+        try
+        {
 
+            for (AbstractCompactionStrategy strategy : repaired)
+                tasks += strategy.getEstimatedRemainingTasks();
+            for (AbstractCompactionStrategy strategy : unrepaired)
+                tasks += strategy.getEstimatedRemainingTasks();
+        }
+        finally
+        {
+            readLock.unlock();
+        }
         return tasks;
     }
 
@@ -572,23 +755,40 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public String getName()
     {
-        return unrepaired.get(0).getName();
+        readLock.lock();
+        try
+        {
+            return unrepaired.get(0).getName();
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
-    public List<List<AbstractCompactionStrategy>> getStrategies()
+    @VisibleForTesting
+    List<List<AbstractCompactionStrategy>> getStrategies()
     {
         return Arrays.asList(repaired, unrepaired);
     }
 
-    public synchronized void setNewLocalCompactionStrategy(CompactionParams params)
+    public void setNewLocalCompactionStrategy(CompactionParams params)
     {
         logger.info("Switching local compaction strategy from {} to {}}", this.params, params);
-        setStrategy(params);
-        if (shouldBeEnabled())
-            enable();
-        else
-            disable();
-        startup();
+        writeLock.lock();
+        try
+        {
+            setStrategy(params);
+            if (shouldBeEnabled())
+                enable();
+            else
+                disable();
+            startup();
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
     }
 
     private void setStrategy(CompactionParams params)
@@ -633,13 +833,21 @@ public class CompactionStrategyManager implements INotificationConsumer
                                                        Collection<Index> indexes,
                                                        LifecycleTransaction txn)
     {
-        if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+        readLock.lock();
+        try
         {
-            return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+            if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+            {
+                return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+            }
+            else
+            {
+                return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+            }
         }
-        else
+        finally
         {
-            return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+            readLock.unlock();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 248473c..9a17e06 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -63,7 +63,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
 
     @Override
     @SuppressWarnings("resource")
-    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
         while (true)
         {
@@ -83,7 +83,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
      * @param gcBefore
      * @return
      */
-    private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+    private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
     {
         if (sstables.isEmpty())
             return Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 953971a..84ddbbc 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -90,7 +90,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
      * (by explicit user request) even when compaction is disabled.
      */
     @SuppressWarnings("resource")
-    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
         while (true)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index e36adf2..28bdf5c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -75,7 +75,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         this.sizeTieredOptions = new SizeTieredCompactionStrategyOptions(options);
     }
 
-    private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
+    private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
     {
         // make local copies so they can't be changed out from under us mid-method
         int minThreshold = cfs.getMinimumCompactionThreshold();
@@ -175,7 +175,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
     }
 
     @SuppressWarnings("resource")
-    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
         while (true)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
index d1398bc..8c48fa8 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableRepairStatusChanged.java
@@ -24,10 +24,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 
 public class SSTableRepairStatusChanged implements INotification
 {
-    public final Collection<SSTableReader> sstable;
+    public final Collection<SSTableReader> sstables;
 
     public SSTableRepairStatusChanged(Collection<SSTableReader> repairStatusChanged)
     {
-        this.sstable = repairStatusChanged;
+        this.sstables = repairStatusChanged;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index 4249430..42772ef 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -161,7 +161,7 @@ public class StandaloneScrubber
     private static void checkManifest(CompactionStrategyManager strategyManager, ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
     {
         int maxSizeInMB = (int)((cfs.getCompactionStrategyManager().getMaxSSTableBytes()) / (1024L * 1024L));
-        if (strategyManager.getStrategies().size() == 2 && strategyManager.getStrategies().get(0) instanceof LeveledCompactionStrategy)
+        if (strategyManager.getCompactionParams().klass().equals(LeveledCompactionStrategy.class))
         {
             System.out.println("Checking leveled manifest");
             Predicate<SSTableReader> repairedPredicate = new Predicate<SSTableReader>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed0a07c3/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index b3dc3d9..0294115 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -349,7 +349,7 @@ public class TrackerTest
         Assert.assertEquals(singleton(r2), ((SSTableListChangedNotification) listener.received.get(0)).added);
         listener.received.clear();
         tracker.notifySSTableRepairedStatusChanged(singleton(r1));
-        Assert.assertEquals(singleton(r1), ((SSTableRepairStatusChanged) listener.received.get(0)).sstable);
+        Assert.assertEquals(singleton(r1), ((SSTableRepairStatusChanged) listener.received.get(0)).sstables);
         listener.received.clear();
         Memtable memtable = MockSchema.memtable(cfs);
         tracker.notifyRenewed(memtable);