You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2017/12/08 18:34:44 UTC

[1/3] cassandra git commit: Reload compaction strategies when disk boundaries are invalidated

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 b637eb11c -> 25e46f052
  refs/heads/trunk 9110c08f4 -> 29a0d1f82


Reload compaction strategies when disk boundaries are invalidated

Patch by Paulo Motta; Reviewed by Marcus Eriksson for CASSANDRA-13948


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/25e46f05
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/25e46f05
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/25e46f05

Branch: refs/heads/cassandra-3.11
Commit: 25e46f05294fd42c111f2f1d5881082d97c572ea
Parents: b637eb1
Author: Paulo Motta <pa...@gmail.com>
Authored: Thu Nov 30 23:14:34 2017 +1100
Committer: Paulo Motta <pa...@apache.org>
Committed: Sat Dec 9 05:30:01 2017 +1100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/Directories.java    |   7 +
 .../org/apache/cassandra/db/DiskBoundaries.java |  62 ++-
 .../cassandra/db/DiskBoundaryManager.java       |  32 +-
 .../db/compaction/CompactionManager.java        |   4 +-
 .../compaction/CompactionStrategyManager.java   | 406 ++++++++++++-------
 .../cassandra/db/compaction/Scrubber.java       |   4 +-
 .../SizeTieredCompactionStrategy.java           |   3 +-
 .../cassandra/service/CassandraDaemon.java      |  32 +-
 .../cassandra/service/StorageService.java       |  17 +
 .../CompactionStrategyManagerTest.java          | 290 +++++++++++++
 .../db/compaction/CompactionsCQLTest.java       |  13 +-
 12 files changed, 676 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 18a22bd..b7a6e14 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.2
+ * Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948)
  * Remove OpenJDK log warning (CASSANDRA-13916)
  * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
  * Cache disk boundaries (CASSANDRA-13215)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 5e52b0f..532bf98 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -579,6 +579,13 @@ public class Directories
         {
             return location.hashCode();
         }
+
+        public String toString()
+        {
+            return "DataDirectory{" +
+                   "location=" + location +
+                   '}';
+        }
     }
 
     static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index ba5a093..7bfed28 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -18,18 +18,30 @@
 
 package org.apache.cassandra.db;
 
+import java.util.Collections;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.StorageService;
+
 public class DiskBoundaries
 {
     public final List<Directories.DataDirectory> directories;
     public final ImmutableList<PartitionPosition> positions;
     final long ringVersion;
     final int directoriesVersion;
+    private volatile boolean isInvalid = false;
 
-    DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
+    public DiskBoundaries(Directories.DataDirectory[] directories, int diskVersion)
+    {
+        this(directories, null, -1, diskVersion);
+    }
+
+    @VisibleForTesting
+    public DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
     {
         this.directories = directories == null ? null : ImmutableList.copyOf(directories);
         this.positions = positions == null ? null : ImmutableList.copyOf(positions);
@@ -68,4 +80,52 @@ public class DiskBoundaries
                ", directoriesVersion=" + directoriesVersion +
                '}';
     }
+
+    /**
+     * Checks if these disk boundaries are out of date, due to either having been invalidated or having a stale diskVersion/ringVersion
+     */
+    public boolean isOutOfDate()
+    {
+        if (isInvalid)
+            return true;
+        int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
+        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
+        return currentDiskVersion != directoriesVersion || (ringVersion != -1 && currentRingVersion != ringVersion);
+    }
+
+    public void invalidate()
+    {
+        this.isInvalid = true;
+    }
+
+    public int getDiskIndex(SSTableReader sstable)
+    {
+        if (positions == null)
+        {
+            return getBoundariesFromSSTableDirectory(sstable);
+        }
+
+        int pos = Collections.binarySearch(positions, sstable.first);
+        assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
+        return -pos - 1;
+    }
+
+    /**
+     * Try to figure out location based on sstable directory
+     */
+    private int getBoundariesFromSSTableDirectory(SSTableReader sstable)
+    {
+        for (int i = 0; i < directories.size(); i++)
+        {
+            Directories.DataDirectory directory = directories.get(i);
+            if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+                return i;
+        }
+        return 0;
+    }
+
+    public Directories.DataDirectory getCorrectDiskForSSTable(SSTableReader sstable)
+    {
+        return directories.get(getDiskIndex(sstable));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 7872554..14d3983 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -42,38 +42,27 @@ public class DiskBoundaryManager
     public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
     {
         if (!cfs.getPartitioner().splitter().isPresent())
-            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), null, -1, -1);
-        // copy the reference to avoid getting nulled out by invalidate() below
-        // - it is ok to race, compaction will move any incorrect tokens to their correct places, but
-        // returning null would be bad
-        DiskBoundaries db = diskBoundaries;
-        if (isOutOfDate(diskBoundaries))
+            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), BlacklistedDirectories.getDirectoriesVersion());
+        if (diskBoundaries == null || diskBoundaries.isOutOfDate())
         {
             synchronized (this)
             {
-                db = diskBoundaries;
-                if (isOutOfDate(diskBoundaries))
+                if (diskBoundaries == null || diskBoundaries.isOutOfDate())
                 {
                     logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
                     DiskBoundaries oldBoundaries = diskBoundaries;
-                    db = diskBoundaries = getDiskBoundaryValue(cfs);
+                    diskBoundaries = getDiskBoundaryValue(cfs);
                     logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), cfs.getTableName());
                 }
             }
         }
-        return db;
+        return diskBoundaries;
     }
 
-    /**
-     * check if the given disk boundaries are out of date due not being set or to having too old diskVersion/ringVersion
-     */
-    private boolean isOutOfDate(DiskBoundaries db)
+    public void invalidate()
     {
-        if (db == null)
-            return true;
-        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
-        int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
-        return currentRingVersion != db.ringVersion || currentDiskVersion != db.directoriesVersion;
+       if (diskBoundaries != null)
+           diskBoundaries.invalidate();
     }
 
     private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
@@ -145,9 +134,4 @@ public class DiskBoundaryManager
         diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
         return diskBoundaries;
     }
-
-    public void invalidate()
-    {
-        diskBoundaries = null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3351736..4030384 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -525,7 +525,7 @@ public class CompactionManager implements CompactionManagerMBean
                 transaction.cancel(Sets.difference(originals, needsRelocation));
 
                 Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
-                        CompactionStrategyManager.getCompactionStrategyIndex(cfs, s)));
+                                                                                                                         cfs.getCompactionStrategyManager().getCompactionStrategyIndex(s)));
 
                 int maxSize = 0;
                 for (List<SSTableReader> diskSSTables : groupedByDisk.values())
@@ -545,7 +545,7 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 if (!cfs.getPartitioner().splitter().isPresent())
                     return true;
-                int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
+                int directoryIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
                 Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
 
                 Directories.DataDirectory location = locations[directoryIndex];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 6305096..4103433 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,12 +21,15 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.index.Index;
 import com.google.common.primitives.Ints;
 
@@ -36,7 +39,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
@@ -56,45 +58,73 @@ import org.apache.cassandra.service.ActiveRepairService;
  *
  * Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
  * unrepaired data. This is done to be able to totally separate the different sets of sstables.
+ *
+ * Operations on this class are guarded by a {@link ReentrantReadWriteLock}. This lock performs mutual exclusion on
+ * reads and writes to the following variables: {@link this#repaired}, {@link this#unrepaired}, {@link this#isActive},
+ * {@link this#params}, {@link this#currentBoundaries}. Whenever performing reads on these variables,
+ * the {@link this#readLock} should be acquired. Likewise, updates to these variables should be guarded by
+ * {@link this#writeLock}.
+ *
+ * Whenever the {@link DiskBoundaries} change, the compaction strategies must be reloaded, so in order to ensure
+ * the compaction strategy placement reflects the most up-to-date disk boundaries, call {@link this#maybeReloadDiskBoundaries()}
+ * before acquiring the read lock to access the strategies.
+ *
  */
-
 public class CompactionStrategyManager implements INotificationConsumer
 {
     private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
     public final CompactionLogger compactionLogger;
     private final ColumnFamilyStore cfs;
+    private final boolean partitionSSTablesByTokenRange;
+    private final Supplier<DiskBoundaries> boundariesSupplier;
+
+    /**
+     * Performs mutual exclusion on the variables below
+     */
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
+    /**
+     * Variables guarded by read and write lock above
+     */
+    //TODO check possibility of getting rid of these locks by encapsulating these in an immutable atomic object
     private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
     private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
+    private volatile CompactionParams params;
+    private DiskBoundaries currentBoundaries;
     private volatile boolean enabled = true;
     private volatile boolean isActive = true;
-    private volatile CompactionParams params;
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
-    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
 
-    /*
+    /**
         We keep a copy of the schema compaction parameters here to be able to decide if we
-        should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
+        should update the compaction strategy in {@link this#maybeReload(CFMetaData)} due to an ALTER.
 
         If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
         we will use the new compaction parameters.
-     */
+     **/
     private volatile CompactionParams schemaCompactionParams;
-    private Directories.DataDirectory[] locations;
     private boolean shouldDefragment;
     private int fanout;
 
     public CompactionStrategyManager(ColumnFamilyStore cfs)
     {
+        this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent());
+    }
+
+    @VisibleForTesting
+    public CompactionStrategyManager(ColumnFamilyStore cfs, Supplier<DiskBoundaries> boundariesSupplier,
+                                     boolean partitionSSTablesByTokenRange)
+    {
         cfs.getTracker().subscribe(this);
         logger.trace("{} subscribed to the data tracker.", this);
         this.cfs = cfs;
         this.compactionLogger = new CompactionLogger(cfs, this);
-        reload(cfs.metadata);
+        this.boundariesSupplier = boundariesSupplier;
+        this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange;
         params = cfs.metadata.params.compaction;
-        locations = getDirectories().getWriteableLocations();
         enabled = params.isEnabled();
-
+        reload(cfs.metadata.params.compaction);
     }
 
     /**
@@ -105,13 +135,13 @@ public class CompactionStrategyManager implements INotificationConsumer
      */
     public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
             if (!isEnabled())
                 return null;
 
-            maybeReload(cfs.metadata);
             List<AbstractCompactionStrategy> strategies = new ArrayList<>();
 
             strategies.addAll(repaired);
@@ -181,7 +211,7 @@ public class CompactionStrategyManager implements INotificationConsumer
             for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
             {
                 if (sstable.openReason != SSTableReader.OpenReason.EARLY)
-                    getCompactionStrategyFor(sstable).addSSTable(sstable);
+                    compactionStrategyFor(sstable).addSSTable(sstable);
             }
             repaired.forEach(AbstractCompactionStrategy::startup);
             unrepaired.forEach(AbstractCompactionStrategy::startup);
@@ -205,12 +235,20 @@ public class CompactionStrategyManager implements INotificationConsumer
      * @param sstable
      * @return
      */
-    public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+    protected AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+    {
+        maybeReloadDiskBoundaries();
+        return compactionStrategyFor(sstable);
+    }
+
+    @VisibleForTesting
+    protected AbstractCompactionStrategy compactionStrategyFor(SSTableReader sstable)
     {
-        int index = getCompactionStrategyIndex(cfs, sstable);
+        // should not call maybeReloadDiskBoundaries because it may be called from within lock
         readLock.lock();
         try
         {
+            int index = compactionStrategyIndexFor(sstable);
             if (sstable.isRepaired())
                 return repaired.get(index);
             else
@@ -230,33 +268,33 @@ public class CompactionStrategyManager implements INotificationConsumer
      * the sstable is on currently (unless we don't know the local tokens yet). Once we start compacting we will write out
      * sstables in the correct locations and give them to the correct compaction strategy instance.
      *
-     * @param cfs
      * @param sstable
      * @return
      */
-    public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, SSTableReader sstable)
+    public int getCompactionStrategyIndex(SSTableReader sstable)
     {
-        if (!cfs.getPartitioner().splitter().isPresent())
-            return 0;
+        maybeReloadDiskBoundaries();
+        return compactionStrategyIndexFor(sstable);
+    }
 
-        DiskBoundaries boundaries = cfs.getDiskBoundaries();
-        List<Directories.DataDirectory> directories = boundaries.directories;
+    @VisibleForTesting
+    protected int compactionStrategyIndexFor(SSTableReader sstable)
+    {
+        // should not call maybeReloadDiskBoundaries because this may be called while the lock is already held
+        readLock.lock();
+        try
+        {
+            //We only have a single compaction strategy when sstables are not
+            //partitioned by token range
+            if (!partitionSSTablesByTokenRange)
+                return 0;
 
-        if (boundaries.positions == null)
+            return currentBoundaries.getDiskIndex(sstable);
+        }
+        finally
         {
-            // try to figure out location based on sstable directory:
-            for (int i = 0; i < directories.size(); i++)
-            {
-                Directories.DataDirectory directory = directories.get(i);
-                if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
-                    return i;
-            }
-            return 0;
+            readLock.unlock();
         }
-
-        int pos = Collections.binarySearch(boundaries.positions, sstable.first);
-        assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
-        return -pos - 1;
     }
 
     public void shutdown()
@@ -278,14 +316,48 @@ public class CompactionStrategyManager implements INotificationConsumer
     public void maybeReload(CFMetaData metadata)
     {
         // compare the old schema configuration to the new one, ignore any locally set changes.
-        if (metadata.params.compaction.equals(schemaCompactionParams) &&
-            Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
+        if (metadata.params.compaction.equals(schemaCompactionParams))
             return;
 
         writeLock.lock();
         try
         {
-            reload(metadata);
+            // compare the old schema configuration to the new one, ignore any locally set changes.
+            if (metadata.params.compaction.equals(schemaCompactionParams))
+                return;
+            reload(metadata.params.compaction);
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Checks if the disk boundaries changed and reloads the compaction strategies
+     * to reflect the most up-to-date disk boundaries.
+     *
+     * This is typically called before acquiring the {@link this#readLock} to ensure the most up-to-date
+     * disk locations and boundaries are used.
+     *
+     * This should *never* be called by a thread holding the {@link this#readLock}, since it
+     * will potentially acquire the {@link this#writeLock} to update the compaction strategies,
+     * which can cause a deadlock.
+     */
+    //TODO improve this to reload after receiving a notification rather than trying to reload on every operation
+    @VisibleForTesting
+    protected boolean maybeReloadDiskBoundaries()
+    {
+        if (!currentBoundaries.isOutOfDate())
+            return false;
+
+        writeLock.lock();
+        try
+        {
+            if (!currentBoundaries.isOutOfDate())
+                return false;
+            reload(params);
+            return true;
         }
         finally
         {
@@ -297,20 +369,28 @@ public class CompactionStrategyManager implements INotificationConsumer
      * Reload the compaction strategies
      *
      * Called after changing configuration and at startup.
-     * @param metadata
+     * @param newCompactionParams
      */
-    private void reload(CFMetaData metadata)
+    private void reload(CompactionParams newCompactionParams)
     {
+        boolean enabledWithJMX = enabled && !shouldBeEnabled();
         boolean disabledWithJMX = !enabled && shouldBeEnabled();
-        if (!metadata.params.compaction.equals(schemaCompactionParams))
-            logger.trace("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
-        else if (!Arrays.equals(locations, cfs.getDirectories().getWriteableLocations()))
-            logger.trace("Recreating compaction strategy - writeable locations changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
 
-        setStrategy(metadata.params.compaction);
-        schemaCompactionParams = metadata.params.compaction;
+        if (currentBoundaries != null)
+        {
+            if (!newCompactionParams.equals(schemaCompactionParams))
+                logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+            else if (currentBoundaries.isOutOfDate())
+                logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName());
+        }
+
+        if (currentBoundaries == null || currentBoundaries.isOutOfDate())
+            currentBoundaries = boundariesSupplier.get();
+
+        setStrategy(newCompactionParams);
+        schemaCompactionParams = cfs.metadata.params.compaction;
 
-        if (disabledWithJMX || !shouldBeEnabled())
+        if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX)
             disable();
         else
             enable();
@@ -326,6 +406,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public int getUnleveledSSTables()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -353,6 +434,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public int[] getSSTableCountPerLevel()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -401,6 +483,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public Directories getDirectories()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -415,11 +498,16 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     private void handleFlushNotification(Iterable<SSTableReader> added)
     {
+        // If reloaded, SSTables will be placed in their correct locations
+        // so there is no need to process notification
+        if (maybeReloadDiskBoundaries())
+            return;
+
         readLock.lock();
         try
         {
             for (SSTableReader sstable : added)
-                getCompactionStrategyFor(sstable).addSSTable(sstable);
+                compactionStrategyFor(sstable).addSSTable(sstable);
         }
         finally
         {
@@ -429,44 +517,47 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
     {
-        // a bit of gymnastics to be able to replace sstables in compaction strategies
-        // we use this to know that a compaction finished and where to start the next compaction in LCS
-        Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
-        int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
-
-        List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
-        List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
-        List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
-        List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
-
-        for (int i = 0; i < locationSize; i++)
-        {
-            repairedRemoved.add(new HashSet<>());
-            repairedAdded.add(new HashSet<>());
-            unrepairedRemoved.add(new HashSet<>());
-            unrepairedAdded.add(new HashSet<>());
-        }
+        // If reloaded, SSTables will be placed in their correct locations
+        // so there is no need to process notification
+        if (maybeReloadDiskBoundaries())
+            return;
 
-        for (SSTableReader sstable : removed)
-        {
-            int i = getCompactionStrategyIndex(cfs, sstable);
-            if (sstable.isRepaired())
-                repairedRemoved.get(i).add(sstable);
-            else
-                unrepairedRemoved.get(i).add(sstable);
-        }
-        for (SSTableReader sstable : added)
-        {
-            int i = getCompactionStrategyIndex(cfs, sstable);
-            if (sstable.isRepaired())
-                repairedAdded.get(i).add(sstable);
-            else
-                unrepairedAdded.get(i).add(sstable);
-        }
-        // we need write lock here since we might be moving sstables between strategies
-        writeLock.lock();
+        readLock.lock();
         try
         {
+            // a bit of gymnastics to be able to replace sstables in compaction strategies
+            // we use this to know that a compaction finished and where to start the next compaction in LCS
+            int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1;
+
+            List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+
+            for (int i = 0; i < locationSize; i++)
+            {
+                repairedRemoved.add(new HashSet<>());
+                repairedAdded.add(new HashSet<>());
+                unrepairedRemoved.add(new HashSet<>());
+                unrepairedAdded.add(new HashSet<>());
+            }
+
+            for (SSTableReader sstable : removed)
+            {
+                int i = compactionStrategyIndexFor(sstable);
+                if (sstable.isRepaired())
+                    repairedRemoved.get(i).add(sstable);
+                else
+                    unrepairedRemoved.get(i).add(sstable);
+            }
+            for (SSTableReader sstable : added)
+            {
+                int i = compactionStrategyIndexFor(sstable);
+                if (sstable.isRepaired())
+                    repairedAdded.get(i).add(sstable);
+                else
+                    unrepairedAdded.get(i).add(sstable);
+            }
             for (int i = 0; i < locationSize; i++)
             {
                 if (!repairedRemoved.get(i).isEmpty())
@@ -482,19 +573,23 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
         finally
         {
-            writeLock.unlock();
+            readLock.unlock();
         }
     }
 
     private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
     {
+        // If reloaded, SSTables will be placed in their correct locations
+        // so there is no need to process notification
+        if (maybeReloadDiskBoundaries())
+            return;
         // we need a write lock here since we move sstables from one strategy instance to another
-        writeLock.lock();
+        readLock.lock();
         try
         {
             for (SSTableReader sstable : sstables)
             {
-                int index = getCompactionStrategyIndex(cfs, sstable);
+                int index = compactionStrategyIndexFor(sstable);
                 if (sstable.isRepaired())
                 {
                     unrepaired.get(index).removeSSTable(sstable);
@@ -509,26 +604,29 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
         finally
         {
-            writeLock.unlock();
+            readLock.unlock();
         }
     }
 
     private void handleDeletingNotification(SSTableReader deleted)
     {
-        writeLock.lock();
+        // If reloaded, SSTables will be placed in their correct locations
+        // so there is no need to process notification
+        if (maybeReloadDiskBoundaries())
+            return;
+        readLock.lock();
         try
         {
-            getCompactionStrategyFor(deleted).removeSSTable(deleted);
+            compactionStrategyFor(deleted).removeSSTable(deleted);
         }
         finally
         {
-            writeLock.unlock();
+            readLock.unlock();
         }
     }
 
     public void handleNotification(INotification notification, Object sender)
     {
-        maybeReload(cfs.metadata);
         if (notification instanceof SSTableAddedNotification)
         {
             handleFlushNotification(((SSTableAddedNotification) notification).added);
@@ -595,29 +693,29 @@ public class CompactionStrategyManager implements INotificationConsumer
     @SuppressWarnings("resource")
     public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
     {
-        assert repaired.size() == unrepaired.size();
-        List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
-        List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
-
-        for (int i = 0; i < repaired.size(); i++)
+        maybeReloadDiskBoundaries();
+        readLock.lock();
+        try
         {
-            repairedSSTables.add(new HashSet<>());
-            unrepairedSSTables.add(new HashSet<>());
-        }
+            assert repaired.size() == unrepaired.size();
+            List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
+            List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
 
-        for (SSTableReader sstable : sstables)
-        {
-            if (sstable.isRepaired())
-                repairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
-            else
-                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
-        }
+            for (int i = 0; i < repaired.size(); i++)
+            {
+                repairedSSTables.add(new HashSet<>());
+                unrepairedSSTables.add(new HashSet<>());
+            }
 
-        List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
+            for (SSTableReader sstable : sstables)
+            {
+                if (sstable.isRepaired())
+                    repairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
+                else
+                    unrepairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
+            }
 
-        readLock.lock();
-        try
-        {
+            List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
             for (int i = 0; i < repairedSSTables.size(); i++)
             {
                 if (!repairedSSTables.get(i).isEmpty())
@@ -644,10 +742,11 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
-            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
             Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
 
             for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
@@ -675,29 +774,47 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
     {
-        maybeReload(cfs.metadata);
-        validateForCompaction(txn.originals(), cfs, getDirectories());
-        return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+        maybeReloadDiskBoundaries();
+        readLock.lock();
+        try
+        {
+            validateForCompaction(txn.originals());
+            return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+
     }
 
-    private static void validateForCompaction(Iterable<SSTableReader> input, ColumnFamilyStore cfs, Directories directories)
+    private void validateForCompaction(Iterable<SSTableReader> input)
     {
-        SSTableReader firstSSTable = Iterables.getFirst(input, null);
-        assert firstSSTable != null;
-        boolean repaired = firstSSTable.isRepaired();
-        int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable);
-        for (SSTableReader sstable : input)
+        readLock.lock();
+        try
+        {
+            SSTableReader firstSSTable = Iterables.getFirst(input, null);
+            assert firstSSTable != null;
+            boolean repaired = firstSSTable.isRepaired();
+            int firstIndex = compactionStrategyIndexFor(firstSSTable);
+            for (SSTableReader sstable : input)
+            {
+                if (sstable.isRepaired() != repaired)
+                    throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
+                if (firstIndex != compactionStrategyIndexFor(sstable))
+                    throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+            }
+        }
+        finally
         {
-            if (sstable.isRepaired() != repaired)
-                throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
-            if (firstIndex != getCompactionStrategyIndex(cfs, sstable))
-                throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+            readLock.unlock();
         }
+
     }
 
     public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
     {
-        maybeReload(cfs.metadata);
+        maybeReloadDiskBoundaries();
         // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
         // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
         // sstables are marked the compactions are re-enabled
@@ -745,18 +862,18 @@ public class CompactionStrategyManager implements INotificationConsumer
      */
     public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
     {
-        maybeReload(cfs.metadata);
+        maybeReloadDiskBoundaries();
         List<AbstractCompactionTask> ret = new ArrayList<>();
         readLock.lock();
         try
         {
             Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
                                                                          .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
-                                                                         .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+                                                                         .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
 
             Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
                                                                            .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
-                                                                           .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+                                                                           .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
 
 
             for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
@@ -773,20 +890,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
     }
 
-    /**
-     * @deprecated use {@link #getUserDefinedTasks(Collection, int)} instead.
-     */
-    @Deprecated()
-    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
-    {
-        validateForCompaction(sstables, cfs, getDirectories());
-        List<AbstractCompactionTask> tasks = getUserDefinedTasks(sstables, gcBefore);
-        assert tasks.size() == 1;
-        return tasks.get(0);
-    }
-
     public int getEstimatedRemainingTasks()
     {
+        maybeReloadDiskBoundaries();
         int tasks = 0;
         readLock.lock();
         try
@@ -811,6 +917,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public String getName()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -824,6 +931,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public List<List<AbstractCompactionStrategy>> getStrategies()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -861,10 +969,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         repaired.clear();
         unrepaired.clear();
 
-        if (cfs.getPartitioner().splitter().isPresent())
+        if (partitionSSTablesByTokenRange)
         {
-            locations = cfs.getDirectories().getWriteableLocations();
-            for (int i = 0; i < locations.length; i++)
+            for (int i = 0; i < currentBoundaries.directories.size(); i++)
             {
                 repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
                 unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
@@ -896,6 +1003,7 @@ public class CompactionStrategyManager implements INotificationConsumer
                                                        Collection<Index> indexes,
                                                        LifecycleTransaction txn)
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -921,21 +1029,21 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public List<String> getStrategyFolders(AbstractCompactionStrategy strategy)
     {
-        Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
-        if (cfs.getPartitioner().splitter().isPresent())
+        List<Directories.DataDirectory> locations = currentBoundaries.directories;
+        if (partitionSSTablesByTokenRange)
         {
             int unrepairedIndex = unrepaired.indexOf(strategy);
             if (unrepairedIndex > 0)
             {
-                return Collections.singletonList(locations[unrepairedIndex].location.getAbsolutePath());
+                return Collections.singletonList(locations.get(unrepairedIndex).location.getAbsolutePath());
             }
             int repairedIndex = repaired.indexOf(strategy);
             if (repairedIndex > 0)
             {
-                return Collections.singletonList(locations[repairedIndex].location.getAbsolutePath());
+                return Collections.singletonList(locations.get(repairedIndex).location.getAbsolutePath());
             }
         }
-        List<String> folders = new ArrayList<>(locations.length);
+        List<String> folders = new ArrayList<>(locations.size());
         for (Directories.DataDirectory location : locations)
         {
             folders.add(location.location.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b1f2e9f..4635824 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -98,7 +98,7 @@ public class Scrubber implements Closeable
 
         List<SSTableReader> toScrub = Collections.singletonList(sstable);
 
-        int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
+        int locIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
         this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
         this.isCommutative = cfs.metadata.isCounter();
 
@@ -508,7 +508,7 @@ public class Scrubber implements Closeable
                         nextToOffer = peek; // Offer peek in next call
                         return next;
                     }
-    
+
                     // Duplicate row, merge it.
                     next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds());
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 0dd134a..96b733e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -65,7 +65,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
     protected SizeTieredCompactionStrategyOptions sizeTieredOptions;
     protected volatile int estimatedRemainingTasks;
-    private final Set<SSTableReader> sstables = new HashSet<>();
+    @VisibleForTesting
+    protected final Set<SSTableReader> sstables = new HashSet<>();
 
     public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 820b016..3dbf3d8 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -288,7 +288,7 @@ public class CassandraDaemon
         {
             if (logger.isDebugEnabled())
                 logger.debug("opening keyspace {}", keyspaceName);
-            // disable auto compaction until commit log replay ends
+            // disable auto compaction until gossip settles since disk boundaries may be affected by ring layout
             for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores())
             {
                 for (ColumnFamilyStore store : cfs.concatWithIndexes())
@@ -298,7 +298,6 @@ public class CassandraDaemon
             }
         }
 
-
         try
         {
             loadRowAndKeyCacheAsync().get();
@@ -338,19 +337,6 @@ public class CassandraDaemon
         // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
         LegacyBatchlogMigrator.migrate();
 
-        // enable auto compaction
-        for (Keyspace keyspace : Keyspace.all())
-        {
-            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
-            {
-                for (final ColumnFamilyStore store : cfs.concatWithIndexes())
-                {
-                    if (store.getCompactionStrategyManager().shouldBeEnabled())
-                        store.enableAutoCompaction();
-                }
-            }
-        }
-
         SystemKeyspace.finishStartup();
 
         // Prepared statements
@@ -413,6 +399,22 @@ public class CassandraDaemon
         if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
             Gossiper.waitToSettle();
 
+        // re-enable auto-compaction after gossip is settled, so correct disk boundaries are used
+        for (Keyspace keyspace : Keyspace.all())
+        {
+            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+            {
+                for (final ColumnFamilyStore store : cfs.concatWithIndexes())
+                {
+                    store.reload(); //reload CFs in case there was a change of disk boundaries
+                    if (store.getCompactionStrategyManager().shouldBeEnabled())
+                    {
+                        store.enableAutoCompaction();
+                    }
+                }
+            }
+        }
+
         // schedule periodic background compaction task submission. this is simply a backstop against compactions stalling
         // due to scheduling errors or race conditions
         ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e93430b..fafe8e8 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1496,6 +1496,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             SystemKeyspace.resetAvailableRanges();
         }
 
+        // Force disk boundary invalidation now that local tokens are set
+        invalidateDiskBoundaries();
+
         setMode(Mode.JOINING, "Starting to bootstrap...", true);
         BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata);
         bootstrapper.addProgressListener(progressSupport);
@@ -1527,6 +1530,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    private void invalidateDiskBoundaries()
+    {
+        for (Keyspace keyspace : Keyspace.all())
+        {
+            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+            {
+                for (final ColumnFamilyStore store : cfs.concatWithIndexes())
+                {
+                    store.invalidateDiskBoundaries();
+                }
+            }
+        }
+    }
+
     /**
      * All MVs have been created during bootstrap, so mark them as built
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
new file mode 100644
index 0000000..c654fcd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.io.Files;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.db.DiskBoundaryManager;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
+import org.apache.cassandra.notifications.SSTableDeletingNotification;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CompactionStrategyManagerTest
+{
+    private static final String KS_PREFIX = "Keyspace1";
+    private static final String TABLE_PREFIX = "CF_STANDARD";
+
+    private static IPartitioner originalPartitioner;
+    private static boolean backups;
+
+    @BeforeClass
+    public static void beforeClass()
+    {
+        SchemaLoader.prepareServer();
+        backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+        DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+        /**
+         * We use the byte-ordered partitioner in this test so that an SSTable's disk assignment
+         * can easily be inferred from its generation - see {@link #getSSTableIndex(Integer[], SSTableReader)}
+         */
+        originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+    }
+
+    @AfterClass
+    public static void afterClass()
+    {
+        DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner);
+        DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
+    }
+
+    @Test
+    public void testSSTablesAssignedToCorrectCompactionStrategy()
+    {
+        // Creates 100 SSTables with keys 0-99
+        int numSSTables = 100;
+        SchemaLoader.createKeyspace(KS_PREFIX,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
+                                                .compaction(CompactionParams.scts(Collections.emptyMap())));
+        ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+        cfs.disableAutoCompaction();
+        for (int i = 0; i < numSSTables; i++)
+        {
+            createSSTableWithKey(KS_PREFIX, TABLE_PREFIX, i);
+        }
+
+        // Creates a CompactionStrategyManager with different numbers of disks and checks
+        // if the SSTables are assigned to the correct compaction strategies
+        for (int numDisks = 2; numDisks < 10; numDisks++)
+        {
+            testSSTablesAssignedToCorrectCompactionStrategy(numSSTables, numDisks);
+        }
+    }
+
+    public void testSSTablesAssignedToCorrectCompactionStrategy(int numSSTables, int numDisks)
+    {
+        // Create a mock CFS with the given number of disks
+        MockCFS cfs = createJBODMockCFS(numDisks);
+        //Check that CFS will contain numSSTables
+        assertEquals(numSSTables, cfs.getLiveSSTables().size());
+
+        // Creates a compaction strategy manager with an external boundary supplier
+        final Integer[] boundaries = computeBoundaries(numSSTables, numDisks);
+
+        MockBoundaryManager mockBoundaryManager = new MockBoundaryManager(cfs, boundaries);
+        System.out.println("Boundaries for " + numDisks + " disks is " + Arrays.toString(boundaries));
+        CompactionStrategyManager csm = new CompactionStrategyManager(cfs, mockBoundaryManager::getBoundaries,
+                                                                      true);
+
+        // Check that SSTables are assigned to the correct Compaction Strategy
+        for (SSTableReader reader : cfs.getLiveSSTables())
+        {
+            verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+        }
+
+        for (int delta = 1; delta <= 3; delta++)
+        {
+            // Update disk boundaries
+            Integer[] previousBoundaries = Arrays.copyOf(boundaries, boundaries.length);
+            updateBoundaries(mockBoundaryManager, boundaries, delta);
+
+            // Check that SSTables are still assigned to the previous boundary layout
+            System.out.println("Old boundaries: " + Arrays.toString(previousBoundaries) + " New boundaries: " + Arrays.toString(boundaries));
+            for (SSTableReader reader : cfs.getLiveSSTables())
+            {
+                verifySSTableIsAssignedToCorrectStrategy(previousBoundaries, csm, reader);
+            }
+
+            // Reload CompactionStrategyManager so new disk boundaries will be loaded
+            csm.maybeReloadDiskBoundaries();
+
+            for (SSTableReader reader : cfs.getLiveSSTables())
+            {
+                // Check that SSTables are assigned to the new boundary layout
+                verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+
+                // Remove SSTable and check that it will be removed from the correct compaction strategy
+                csm.handleNotification(new SSTableDeletingNotification(reader), this);
+                assertFalse(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+
+                // Add SSTable again and check that it is correctly assigned
+                csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader)), this);
+                verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+            }
+        }
+    }
+
+    private MockCFS createJBODMockCFS(int disks)
+    {
+        // Create #disks data directories to simulate JBOD
+        Directories.DataDirectory[] directories = new Directories.DataDirectory[disks];
+        for (int i = 0; i < disks; ++i)
+        {
+            File tempDir = Files.createTempDir();
+            tempDir.deleteOnExit();
+            directories[i] = new Directories.DataDirectory(tempDir);
+        }
+
+        ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+        MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata, directories));
+        mockCFS.disableAutoCompaction();
+        mockCFS.addSSTables(cfs.getLiveSSTables());
+        return mockCFS;
+    }
+
+    /**
+     * Updates the boundaries with a delta
+     */
+    private void updateBoundaries(MockBoundaryManager boundaryManager, Integer[] boundaries, int delta)
+    {
+        for (int j = 0; j < boundaries.length - 1; j++)
+        {
+            if ((j + delta) % 2 == 0)
+                boundaries[j] -= delta;
+            else
+                boundaries[j] += delta;
+        }
+        boundaryManager.invalidateBoundaries();
+    }
+
+    private void verifySSTableIsAssignedToCorrectStrategy(Integer[] boundaries, CompactionStrategyManager csm, SSTableReader reader)
+    {
+        // Check that sstable is assigned to correct disk
+        int index = getSSTableIndex(boundaries, reader);
+        assertEquals(index, csm.compactionStrategyIndexFor(reader));
+        // Check that compaction strategy actually contains SSTable
+        assertTrue(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+    }
+
+    /**
+     * Creates disk boundaries such that each disk receives
+     * an equal number of SSTables
+     */
+    private Integer[] computeBoundaries(int numSSTables, int numDisks)
+    {
+        Integer[] result = new Integer[numDisks];
+        int sstablesPerRange = numSSTables / numDisks;
+        result[0] = sstablesPerRange;
+        for (int i = 1; i < numDisks; i++)
+        {
+            result[i] = result[i - 1] + sstablesPerRange;
+        }
+        result[numDisks - 1] = numSSTables; // make last boundary always be the number of SSTables to prevent rounding errors
+        return result;
+    }
+
+    /**
+     * Each SSTable holds a single key in the range 0-99 and SSTable generations
+     * are numbered 1-100. Since we are using ByteOrderedPartitioner, we can compute
+     * an SSTable's position in the disk boundaries by finding where its generation
+     * falls relative to the boundaries.
+     */
+    private int getSSTableIndex(Integer[] boundaries, SSTableReader reader)
+    {
+        int index = 0;
+        while (boundaries[index] < reader.descriptor.generation)
+            index++;
+        System.out.println("Index for SSTable " + reader.descriptor.generation + " on boundary " + Arrays.toString(boundaries) + " is " + index);
+        return index;
+    }
+
+
+
+    class MockBoundaryManager
+    {
+        private final ColumnFamilyStore cfs;
+        private Integer[] positions;
+        private DiskBoundaries boundaries;
+
+        public MockBoundaryManager(ColumnFamilyStore cfs, Integer[] positions)
+        {
+            this.cfs = cfs;
+            this.positions = positions;
+            this.boundaries = createDiskBoundaries(cfs, positions);
+        }
+
+        public void invalidateBoundaries()
+        {
+            boundaries.invalidate();
+        }
+
+        public DiskBoundaries getBoundaries()
+        {
+            if (boundaries.isOutOfDate())
+                boundaries = createDiskBoundaries(cfs, positions);
+            return boundaries;
+        }
+
+        private DiskBoundaries createDiskBoundaries(ColumnFamilyStore cfs, Integer[] boundaries)
+        {
+            List<PartitionPosition> positions = Arrays.stream(boundaries).map(b -> Util.token(String.format(String.format("%04d", b))).minKeyBound()).collect(Collectors.toList());
+            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), positions, 0, 0);
+        }
+    }
+
+    private static void createSSTableWithKey(String keyspace, String table, int key)
+    {
+        long timestamp = System.currentTimeMillis();
+        DecoratedKey dk = Util.dk(String.format("%04d", key));
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+        new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
+        .clustering(Integer.toString(key))
+        .add("val", "val")
+        .build()
+        .applyUnsafe();
+        cfs.forceBlockingFlush();
+    }
+
+    // just to be able to override the data directories
+    private static class MockCFS extends ColumnFamilyStore
+    {
+        MockCFS(ColumnFamilyStore cfs, Directories dirs)
+        {
+            super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 1335906..7873ac9 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -104,6 +104,12 @@ public class CompactionsCQLTest extends CQLTester
         assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
         getCurrentColumnFamilyStore().enableAutoCompaction();
         assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+
+        // Alter keyspace replication settings to force compaction strategy reload and check strategy is still enabled
+        execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+        getCurrentColumnFamilyStore().getCompactionStrategyManager().maybeReloadDiskBoundaries();
+        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+
         execute("insert into %s (id) values ('1')");
         flush();
         execute("insert into %s (id) values ('1')");
@@ -161,17 +167,22 @@ public class CompactionsCQLTest extends CQLTester
         localOptions.put("class", "DateTieredCompactionStrategy");
         getCurrentColumnFamilyStore().setCompactionParameters(localOptions);
         assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+        // Invalidate disk boundaries to ensure that boundary invalidation will not cause the old strategy to be reloaded
+        getCurrentColumnFamilyStore().invalidateDiskBoundaries();
         // altering something non-compaction related
         execute("ALTER TABLE %s WITH gc_grace_seconds = 1000");
         // should keep the local compaction strat
         assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+        // Alter keyspace replication settings to force compaction strategy reload
+        execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+        // should keep the local compaction strat
+        assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
         // altering a compaction option
         execute("ALTER TABLE %s WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':3}");
         // will use the new option
         assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), SizeTieredCompactionStrategy.class));
     }
 
-
     @Test
     public void testSetLocalCompactionStrategyDisable() throws Throwable
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/3] cassandra git commit: Reload compaction strategies when disk boundaries are invalidated

Posted by pa...@apache.org.
Reload compaction strategies when disk boundaries are invalidated

Patch by Paulo Motta; Reviewed by Marcus Eriksson for CASSANDRA-13948


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/25e46f05
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/25e46f05
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/25e46f05

Branch: refs/heads/trunk
Commit: 25e46f05294fd42c111f2f1d5881082d97c572ea
Parents: b637eb1
Author: Paulo Motta <pa...@gmail.com>
Authored: Thu Nov 30 23:14:34 2017 +1100
Committer: Paulo Motta <pa...@apache.org>
Committed: Sat Dec 9 05:30:01 2017 +1100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/Directories.java    |   7 +
 .../org/apache/cassandra/db/DiskBoundaries.java |  62 ++-
 .../cassandra/db/DiskBoundaryManager.java       |  32 +-
 .../db/compaction/CompactionManager.java        |   4 +-
 .../compaction/CompactionStrategyManager.java   | 406 ++++++++++++-------
 .../cassandra/db/compaction/Scrubber.java       |   4 +-
 .../SizeTieredCompactionStrategy.java           |   3 +-
 .../cassandra/service/CassandraDaemon.java      |  32 +-
 .../cassandra/service/StorageService.java       |  17 +
 .../CompactionStrategyManagerTest.java          | 290 +++++++++++++
 .../db/compaction/CompactionsCQLTest.java       |  13 +-
 12 files changed, 676 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 18a22bd..b7a6e14 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.2
+ * Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948)
  * Remove OpenJDK log warning (CASSANDRA-13916)
  * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
  * Cache disk boundaries (CASSANDRA-13215)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 5e52b0f..532bf98 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -579,6 +579,13 @@ public class Directories
         {
             return location.hashCode();
         }
+
+        public String toString()
+        {
+            return "DataDirectory{" +
+                   "location=" + location +
+                   '}';
+        }
     }
 
     static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index ba5a093..7bfed28 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -18,18 +18,30 @@
 
 package org.apache.cassandra.db;
 
+import java.util.Collections;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.StorageService;
+
 public class DiskBoundaries
 {
     public final List<Directories.DataDirectory> directories;
     public final ImmutableList<PartitionPosition> positions;
     final long ringVersion;
     final int directoriesVersion;
+    private volatile boolean isInvalid = false;
 
-    DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
+    public DiskBoundaries(Directories.DataDirectory[] directories, int diskVersion)
+    {
+        this(directories, null, -1, diskVersion);
+    }
+
+    @VisibleForTesting
+    public DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
     {
         this.directories = directories == null ? null : ImmutableList.copyOf(directories);
         this.positions = positions == null ? null : ImmutableList.copyOf(positions);
@@ -68,4 +80,52 @@ public class DiskBoundaries
                ", directoriesVersion=" + directoriesVersion +
                '}';
     }
+
+    /**
+     * check if the given disk boundaries are out of date due to not being set or to having too old a diskVersion/ringVersion
+     */
+    public boolean isOutOfDate()
+    {
+        if (isInvalid)
+            return true;
+        int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
+        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
+        return currentDiskVersion != directoriesVersion || (ringVersion != -1 && currentRingVersion != ringVersion);
+    }
+
+    public void invalidate()
+    {
+        this.isInvalid = true;
+    }
+
+    public int getDiskIndex(SSTableReader sstable)
+    {
+        if (positions == null)
+        {
+            return getBoundariesFromSSTableDirectory(sstable);
+        }
+
+        int pos = Collections.binarySearch(positions, sstable.first);
+        assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
+        return -pos - 1;
+    }
+
+    /**
+     * Try to figure out location based on sstable directory
+     */
+    private int getBoundariesFromSSTableDirectory(SSTableReader sstable)
+    {
+        for (int i = 0; i < directories.size(); i++)
+        {
+            Directories.DataDirectory directory = directories.get(i);
+            if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+                return i;
+        }
+        return 0;
+    }
+
+    public Directories.DataDirectory getCorrectDiskForSSTable(SSTableReader sstable)
+    {
+        return directories.get(getDiskIndex(sstable));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 7872554..14d3983 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -42,38 +42,27 @@ public class DiskBoundaryManager
     public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
     {
         if (!cfs.getPartitioner().splitter().isPresent())
-            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), null, -1, -1);
-        // copy the reference to avoid getting nulled out by invalidate() below
-        // - it is ok to race, compaction will move any incorrect tokens to their correct places, but
-        // returning null would be bad
-        DiskBoundaries db = diskBoundaries;
-        if (isOutOfDate(diskBoundaries))
+            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), BlacklistedDirectories.getDirectoriesVersion());
+        if (diskBoundaries == null || diskBoundaries.isOutOfDate())
         {
             synchronized (this)
             {
-                db = diskBoundaries;
-                if (isOutOfDate(diskBoundaries))
+                if (diskBoundaries == null || diskBoundaries.isOutOfDate())
                 {
                     logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
                     DiskBoundaries oldBoundaries = diskBoundaries;
-                    db = diskBoundaries = getDiskBoundaryValue(cfs);
+                    diskBoundaries = getDiskBoundaryValue(cfs);
                     logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), cfs.getTableName());
                 }
             }
         }
-        return db;
+        return diskBoundaries;
     }
 
-    /**
-     * check if the given disk boundaries are out of date due not being set or to having too old diskVersion/ringVersion
-     */
-    private boolean isOutOfDate(DiskBoundaries db)
+    public void invalidate()
     {
-        if (db == null)
-            return true;
-        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
-        int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
-        return currentRingVersion != db.ringVersion || currentDiskVersion != db.directoriesVersion;
+       if (diskBoundaries != null)
+           diskBoundaries.invalidate();
     }
 
     private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
@@ -145,9 +134,4 @@ public class DiskBoundaryManager
         diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
         return diskBoundaries;
     }
-
-    public void invalidate()
-    {
-        diskBoundaries = null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3351736..4030384 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -525,7 +525,7 @@ public class CompactionManager implements CompactionManagerMBean
                 transaction.cancel(Sets.difference(originals, needsRelocation));
 
                 Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
-                        CompactionStrategyManager.getCompactionStrategyIndex(cfs, s)));
+                                                                                                                         cfs.getCompactionStrategyManager().getCompactionStrategyIndex(s)));
 
                 int maxSize = 0;
                 for (List<SSTableReader> diskSSTables : groupedByDisk.values())
@@ -545,7 +545,7 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 if (!cfs.getPartitioner().splitter().isPresent())
                     return true;
-                int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
+                int directoryIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
                 Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
 
                 Directories.DataDirectory location = locations[directoryIndex];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 6305096..4103433 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,12 +21,15 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.index.Index;
 import com.google.common.primitives.Ints;
 
@@ -36,7 +39,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
@@ -56,45 +58,73 @@ import org.apache.cassandra.service.ActiveRepairService;
  *
  * Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
  * unrepaired data. This is done to be able to totally separate the different sets of sstables.
+ *
+ * Operations on this class are guarded by a {@link ReentrantReadWriteLock}. This lock performs mutual exclusion on
+ * reads and writes to the following variables: {@link this#repaired}, {@link this#unrepaired}, {@link this#isActive},
+ * {@link this#params}, {@link this#currentBoundaries}. Whenever performing reads on these variables,
+ * the {@link this#readLock} should be acquired. Likewise, updates to these variables should be guarded by
+ * {@link this#writeLock}.
+ *
+ * Whenever the {@link DiskBoundaries} change, the compaction strategies must be reloaded, so in order to ensure
+ * the compaction strategy placement reflects the most up-to-date disk boundaries, call {@link this#maybeReloadDiskBoundaries()}
+ * before acquiring the read lock to access the strategies.
+ *
  */
-
 public class CompactionStrategyManager implements INotificationConsumer
 {
     private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
     public final CompactionLogger compactionLogger;
     private final ColumnFamilyStore cfs;
+    private final boolean partitionSSTablesByTokenRange;
+    private final Supplier<DiskBoundaries> boundariesSupplier;
+
+    /**
+     * Performs mutual exclusion on the variables below
+     */
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
+    /**
+     * Variables guarded by read and write lock above
+     */
+    //TODO check possibility of getting rid of these locks by encapsulating these in an immutable atomic object
     private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
     private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
+    private volatile CompactionParams params;
+    private DiskBoundaries currentBoundaries;
     private volatile boolean enabled = true;
     private volatile boolean isActive = true;
-    private volatile CompactionParams params;
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
-    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
 
-    /*
+    /**
         We keep a copy of the schema compaction parameters here to be able to decide if we
-        should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
+        should update the compaction strategy in {@link this#maybeReload(CFMetaData)} due to an ALTER.
 
         If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
         we will use the new compaction parameters.
-     */
+     **/
     private volatile CompactionParams schemaCompactionParams;
-    private Directories.DataDirectory[] locations;
     private boolean shouldDefragment;
     private int fanout;
 
     public CompactionStrategyManager(ColumnFamilyStore cfs)
     {
+        this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent());
+    }
+
+    @VisibleForTesting
+    public CompactionStrategyManager(ColumnFamilyStore cfs, Supplier<DiskBoundaries> boundariesSupplier,
+                                     boolean partitionSSTablesByTokenRange)
+    {
         cfs.getTracker().subscribe(this);
         logger.trace("{} subscribed to the data tracker.", this);
         this.cfs = cfs;
         this.compactionLogger = new CompactionLogger(cfs, this);
-        reload(cfs.metadata);
+        this.boundariesSupplier = boundariesSupplier;
+        this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange;
         params = cfs.metadata.params.compaction;
-        locations = getDirectories().getWriteableLocations();
         enabled = params.isEnabled();
-
+        reload(cfs.metadata.params.compaction);
     }
 
     /**
@@ -105,13 +135,13 @@ public class CompactionStrategyManager implements INotificationConsumer
      */
     public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
             if (!isEnabled())
                 return null;
 
-            maybeReload(cfs.metadata);
             List<AbstractCompactionStrategy> strategies = new ArrayList<>();
 
             strategies.addAll(repaired);
@@ -181,7 +211,7 @@ public class CompactionStrategyManager implements INotificationConsumer
             for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
             {
                 if (sstable.openReason != SSTableReader.OpenReason.EARLY)
-                    getCompactionStrategyFor(sstable).addSSTable(sstable);
+                    compactionStrategyFor(sstable).addSSTable(sstable);
             }
             repaired.forEach(AbstractCompactionStrategy::startup);
             unrepaired.forEach(AbstractCompactionStrategy::startup);
@@ -205,12 +235,20 @@ public class CompactionStrategyManager implements INotificationConsumer
      * @param sstable
      * @return
      */
-    public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+    protected AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+    {
+        maybeReloadDiskBoundaries();
+        return compactionStrategyFor(sstable);
+    }
+
+    @VisibleForTesting
+    protected AbstractCompactionStrategy compactionStrategyFor(SSTableReader sstable)
     {
-        int index = getCompactionStrategyIndex(cfs, sstable);
+        // should not call maybeReloadDiskBoundaries because it may be called from within lock
         readLock.lock();
         try
         {
+            int index = compactionStrategyIndexFor(sstable);
             if (sstable.isRepaired())
                 return repaired.get(index);
             else
@@ -230,33 +268,33 @@ public class CompactionStrategyManager implements INotificationConsumer
      * the sstable is on currently (unless we don't know the local tokens yet). Once we start compacting we will write out
      * sstables in the correct locations and give them to the correct compaction strategy instance.
      *
-     * @param cfs
      * @param sstable
      * @return
      */
-    public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, SSTableReader sstable)
+    public int getCompactionStrategyIndex(SSTableReader sstable)
     {
-        if (!cfs.getPartitioner().splitter().isPresent())
-            return 0;
+        maybeReloadDiskBoundaries();
+        return compactionStrategyIndexFor(sstable);
+    }
 
-        DiskBoundaries boundaries = cfs.getDiskBoundaries();
-        List<Directories.DataDirectory> directories = boundaries.directories;
+    @VisibleForTesting
+    protected int compactionStrategyIndexFor(SSTableReader sstable)
+    {
+        // should not call maybeReload because it may be called from within lock
+        readLock.lock();
+        try
+        {
+            //We only have a single compaction strategy when sstables are not
+            //partitioned by token range
+            if (!partitionSSTablesByTokenRange)
+                return 0;
 
-        if (boundaries.positions == null)
+            return currentBoundaries.getDiskIndex(sstable);
+        }
+        finally
         {
-            // try to figure out location based on sstable directory:
-            for (int i = 0; i < directories.size(); i++)
-            {
-                Directories.DataDirectory directory = directories.get(i);
-                if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
-                    return i;
-            }
-            return 0;
+            readLock.unlock();
         }
-
-        int pos = Collections.binarySearch(boundaries.positions, sstable.first);
-        assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
-        return -pos - 1;
     }
 
     public void shutdown()
@@ -278,14 +316,48 @@ public class CompactionStrategyManager implements INotificationConsumer
     public void maybeReload(CFMetaData metadata)
     {
         // compare the old schema configuration to the new one, ignore any locally set changes.
-        if (metadata.params.compaction.equals(schemaCompactionParams) &&
-            Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
+        if (metadata.params.compaction.equals(schemaCompactionParams))
             return;
 
         writeLock.lock();
         try
         {
-            reload(metadata);
+            // compare the old schema configuration to the new one, ignore any locally set changes.
+            if (metadata.params.compaction.equals(schemaCompactionParams))
+                return;
+            reload(metadata.params.compaction);
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Checks if the disk boundaries changed and reloads the compaction strategies
+     * to reflect the most up-to-date disk boundaries.
+     *
+     * This is typically called before acquiring the {@link this#readLock} to ensure the most up-to-date
+     * disk locations and boundaries are used.
+     *
+     * This should *never* be called by a thread holding the {@link this#readLock}, since it
+     * will potentially acquire the {@link this#writeLock} to update the compaction strategies,
+     * which can cause a deadlock.
+     */
+    //TODO improve this to reload after receiving a notification rather than trying to reload on every operation
+    @VisibleForTesting
+    protected boolean maybeReloadDiskBoundaries()
+    {
+        if (!currentBoundaries.isOutOfDate())
+            return false;
+
+        writeLock.lock();
+        try
+        {
+            if (!currentBoundaries.isOutOfDate())
+                return false;
+            reload(params);
+            return true;
         }
         finally
         {
@@ -297,20 +369,28 @@ public class CompactionStrategyManager implements INotificationConsumer
      * Reload the compaction strategies
      *
      * Called after changing configuration and at startup.
-     * @param metadata
+     * @param newCompactionParams
      */
-    private void reload(CFMetaData metadata)
+    private void reload(CompactionParams newCompactionParams)
     {
+        boolean enabledWithJMX = enabled && !shouldBeEnabled();
         boolean disabledWithJMX = !enabled && shouldBeEnabled();
-        if (!metadata.params.compaction.equals(schemaCompactionParams))
-            logger.trace("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
-        else if (!Arrays.equals(locations, cfs.getDirectories().getWriteableLocations()))
-            logger.trace("Recreating compaction strategy - writeable locations changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
 
-        setStrategy(metadata.params.compaction);
-        schemaCompactionParams = metadata.params.compaction;
+        if (currentBoundaries != null)
+        {
+            if (!newCompactionParams.equals(schemaCompactionParams))
+                logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+            else if (currentBoundaries.isOutOfDate())
+                logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName());
+        }
+
+        if (currentBoundaries == null || currentBoundaries.isOutOfDate())
+            currentBoundaries = boundariesSupplier.get();
+
+        setStrategy(newCompactionParams);
+        schemaCompactionParams = cfs.metadata.params.compaction;
 
-        if (disabledWithJMX || !shouldBeEnabled())
+        if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX)
             disable();
         else
             enable();
@@ -326,6 +406,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public int getUnleveledSSTables()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -353,6 +434,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public int[] getSSTableCountPerLevel()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -401,6 +483,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public Directories getDirectories()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -415,11 +498,16 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     private void handleFlushNotification(Iterable<SSTableReader> added)
     {
+        // If reloaded, SSTables will be placed in their correct locations
+        // so there is no need to process notification
+        if (maybeReloadDiskBoundaries())
+            return;
+
         readLock.lock();
         try
         {
             for (SSTableReader sstable : added)
-                getCompactionStrategyFor(sstable).addSSTable(sstable);
+                compactionStrategyFor(sstable).addSSTable(sstable);
         }
         finally
         {
@@ -429,44 +517,47 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
     {
-        // a bit of gymnastics to be able to replace sstables in compaction strategies
-        // we use this to know that a compaction finished and where to start the next compaction in LCS
-        Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
-        int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
-
-        List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
-        List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
-        List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
-        List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
-
-        for (int i = 0; i < locationSize; i++)
-        {
-            repairedRemoved.add(new HashSet<>());
-            repairedAdded.add(new HashSet<>());
-            unrepairedRemoved.add(new HashSet<>());
-            unrepairedAdded.add(new HashSet<>());
-        }
+        // If reloaded, SSTables will be placed in their correct locations
+        // so there is no need to process notification
+        if (maybeReloadDiskBoundaries())
+            return;
 
-        for (SSTableReader sstable : removed)
-        {
-            int i = getCompactionStrategyIndex(cfs, sstable);
-            if (sstable.isRepaired())
-                repairedRemoved.get(i).add(sstable);
-            else
-                unrepairedRemoved.get(i).add(sstable);
-        }
-        for (SSTableReader sstable : added)
-        {
-            int i = getCompactionStrategyIndex(cfs, sstable);
-            if (sstable.isRepaired())
-                repairedAdded.get(i).add(sstable);
-            else
-                unrepairedAdded.get(i).add(sstable);
-        }
-        // we need write lock here since we might be moving sstables between strategies
-        writeLock.lock();
+        readLock.lock();
         try
         {
+            // a bit of gymnastics to be able to replace sstables in compaction strategies
+            // we use this to know that a compaction finished and where to start the next compaction in LCS
+            int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1;
+
+            List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+
+            for (int i = 0; i < locationSize; i++)
+            {
+                repairedRemoved.add(new HashSet<>());
+                repairedAdded.add(new HashSet<>());
+                unrepairedRemoved.add(new HashSet<>());
+                unrepairedAdded.add(new HashSet<>());
+            }
+
+            for (SSTableReader sstable : removed)
+            {
+                int i = compactionStrategyIndexFor(sstable);
+                if (sstable.isRepaired())
+                    repairedRemoved.get(i).add(sstable);
+                else
+                    unrepairedRemoved.get(i).add(sstable);
+            }
+            for (SSTableReader sstable : added)
+            {
+                int i = compactionStrategyIndexFor(sstable);
+                if (sstable.isRepaired())
+                    repairedAdded.get(i).add(sstable);
+                else
+                    unrepairedAdded.get(i).add(sstable);
+            }
             for (int i = 0; i < locationSize; i++)
             {
                 if (!repairedRemoved.get(i).isEmpty())
@@ -482,19 +573,23 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
         finally
         {
-            writeLock.unlock();
+            readLock.unlock();
         }
     }
 
     private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
     {
+        // If reloaded, SSTables will be placed in their correct locations
+        // so there is no need to process notification
+        if (maybeReloadDiskBoundaries())
+            return;
         // we need a write lock here since we move sstables from one strategy instance to another
-        writeLock.lock();
+        readLock.lock();
         try
         {
             for (SSTableReader sstable : sstables)
             {
-                int index = getCompactionStrategyIndex(cfs, sstable);
+                int index = compactionStrategyIndexFor(sstable);
                 if (sstable.isRepaired())
                 {
                     unrepaired.get(index).removeSSTable(sstable);
@@ -509,26 +604,29 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
         finally
         {
-            writeLock.unlock();
+            readLock.unlock();
         }
     }
 
     private void handleDeletingNotification(SSTableReader deleted)
     {
-        writeLock.lock();
+        // If reloaded, SSTables will be placed in their correct locations
+        // so there is no need to process notification
+        if (maybeReloadDiskBoundaries())
+            return;
+        readLock.lock();
         try
         {
-            getCompactionStrategyFor(deleted).removeSSTable(deleted);
+            compactionStrategyFor(deleted).removeSSTable(deleted);
         }
         finally
         {
-            writeLock.unlock();
+            readLock.unlock();
         }
     }
 
     public void handleNotification(INotification notification, Object sender)
     {
-        maybeReload(cfs.metadata);
         if (notification instanceof SSTableAddedNotification)
         {
             handleFlushNotification(((SSTableAddedNotification) notification).added);
@@ -595,29 +693,29 @@ public class CompactionStrategyManager implements INotificationConsumer
     @SuppressWarnings("resource")
     public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
     {
-        assert repaired.size() == unrepaired.size();
-        List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
-        List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
-
-        for (int i = 0; i < repaired.size(); i++)
+        maybeReloadDiskBoundaries();
+        readLock.lock();
+        try
         {
-            repairedSSTables.add(new HashSet<>());
-            unrepairedSSTables.add(new HashSet<>());
-        }
+            assert repaired.size() == unrepaired.size();
+            List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
+            List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
 
-        for (SSTableReader sstable : sstables)
-        {
-            if (sstable.isRepaired())
-                repairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
-            else
-                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
-        }
+            for (int i = 0; i < repaired.size(); i++)
+            {
+                repairedSSTables.add(new HashSet<>());
+                unrepairedSSTables.add(new HashSet<>());
+            }
 
-        List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
+            for (SSTableReader sstable : sstables)
+            {
+                if (sstable.isRepaired())
+                    repairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
+                else
+                    unrepairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
+            }
 
-        readLock.lock();
-        try
-        {
+            List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
             for (int i = 0; i < repairedSSTables.size(); i++)
             {
                 if (!repairedSSTables.get(i).isEmpty())
@@ -644,10 +742,11 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
-            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
             Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
 
             for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
@@ -675,29 +774,47 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
     {
-        maybeReload(cfs.metadata);
-        validateForCompaction(txn.originals(), cfs, getDirectories());
-        return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+        maybeReloadDiskBoundaries();
+        readLock.lock();
+        try
+        {
+            validateForCompaction(txn.originals());
+            return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+
     }
 
-    private static void validateForCompaction(Iterable<SSTableReader> input, ColumnFamilyStore cfs, Directories directories)
+    private void validateForCompaction(Iterable<SSTableReader> input)
     {
-        SSTableReader firstSSTable = Iterables.getFirst(input, null);
-        assert firstSSTable != null;
-        boolean repaired = firstSSTable.isRepaired();
-        int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable);
-        for (SSTableReader sstable : input)
+        readLock.lock();
+        try
+        {
+            SSTableReader firstSSTable = Iterables.getFirst(input, null);
+            assert firstSSTable != null;
+            boolean repaired = firstSSTable.isRepaired();
+            int firstIndex = compactionStrategyIndexFor(firstSSTable);
+            for (SSTableReader sstable : input)
+            {
+                if (sstable.isRepaired() != repaired)
+                    throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
+                if (firstIndex != compactionStrategyIndexFor(sstable))
+                    throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+            }
+        }
+        finally
         {
-            if (sstable.isRepaired() != repaired)
-                throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
-            if (firstIndex != getCompactionStrategyIndex(cfs, sstable))
-                throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+            readLock.unlock();
         }
+
     }
 
     public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
     {
-        maybeReload(cfs.metadata);
+        maybeReloadDiskBoundaries();
         // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
         // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
         // sstables are marked the compactions are re-enabled
@@ -745,18 +862,18 @@ public class CompactionStrategyManager implements INotificationConsumer
      */
     public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
     {
-        maybeReload(cfs.metadata);
+        maybeReloadDiskBoundaries();
         List<AbstractCompactionTask> ret = new ArrayList<>();
         readLock.lock();
         try
         {
             Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
                                                                          .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
-                                                                         .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+                                                                         .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
 
             Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
                                                                            .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
-                                                                           .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+                                                                           .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
 
 
             for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
@@ -773,20 +890,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
     }
 
-    /**
-     * @deprecated use {@link #getUserDefinedTasks(Collection, int)} instead.
-     */
-    @Deprecated()
-    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
-    {
-        validateForCompaction(sstables, cfs, getDirectories());
-        List<AbstractCompactionTask> tasks = getUserDefinedTasks(sstables, gcBefore);
-        assert tasks.size() == 1;
-        return tasks.get(0);
-    }
-
     public int getEstimatedRemainingTasks()
     {
+        maybeReloadDiskBoundaries();
         int tasks = 0;
         readLock.lock();
         try
@@ -811,6 +917,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public String getName()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -824,6 +931,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public List<List<AbstractCompactionStrategy>> getStrategies()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -861,10 +969,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         repaired.clear();
         unrepaired.clear();
 
-        if (cfs.getPartitioner().splitter().isPresent())
+        if (partitionSSTablesByTokenRange)
         {
-            locations = cfs.getDirectories().getWriteableLocations();
-            for (int i = 0; i < locations.length; i++)
+            for (int i = 0; i < currentBoundaries.directories.size(); i++)
             {
                 repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
                 unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
@@ -896,6 +1003,7 @@ public class CompactionStrategyManager implements INotificationConsumer
                                                        Collection<Index> indexes,
                                                        LifecycleTransaction txn)
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -921,21 +1029,21 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public List<String> getStrategyFolders(AbstractCompactionStrategy strategy)
     {
-        Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
-        if (cfs.getPartitioner().splitter().isPresent())
+        List<Directories.DataDirectory> locations = currentBoundaries.directories;
+        if (partitionSSTablesByTokenRange)
         {
             int unrepairedIndex = unrepaired.indexOf(strategy);
             if (unrepairedIndex > 0)
             {
-                return Collections.singletonList(locations[unrepairedIndex].location.getAbsolutePath());
+                return Collections.singletonList(locations.get(unrepairedIndex).location.getAbsolutePath());
             }
             int repairedIndex = repaired.indexOf(strategy);
             if (repairedIndex > 0)
             {
-                return Collections.singletonList(locations[repairedIndex].location.getAbsolutePath());
+                return Collections.singletonList(locations.get(repairedIndex).location.getAbsolutePath());
             }
         }
-        List<String> folders = new ArrayList<>(locations.length);
+        List<String> folders = new ArrayList<>(locations.size());
         for (Directories.DataDirectory location : locations)
         {
             folders.add(location.location.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b1f2e9f..4635824 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -98,7 +98,7 @@ public class Scrubber implements Closeable
 
         List<SSTableReader> toScrub = Collections.singletonList(sstable);
 
-        int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
+        int locIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
         this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
         this.isCommutative = cfs.metadata.isCounter();
 
@@ -508,7 +508,7 @@ public class Scrubber implements Closeable
                         nextToOffer = peek; // Offer peek in next call
                         return next;
                     }
-    
+
                     // Duplicate row, merge it.
                     next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds());
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 0dd134a..96b733e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -65,7 +65,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
     protected SizeTieredCompactionStrategyOptions sizeTieredOptions;
     protected volatile int estimatedRemainingTasks;
-    private final Set<SSTableReader> sstables = new HashSet<>();
+    @VisibleForTesting
+    protected final Set<SSTableReader> sstables = new HashSet<>();
 
     public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 820b016..3dbf3d8 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -288,7 +288,7 @@ public class CassandraDaemon
         {
             if (logger.isDebugEnabled())
                 logger.debug("opening keyspace {}", keyspaceName);
-            // disable auto compaction until commit log replay ends
+            // disable auto compaction until gossip settles since disk boundaries may be affected by ring layout
             for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores())
             {
                 for (ColumnFamilyStore store : cfs.concatWithIndexes())
@@ -298,7 +298,6 @@ public class CassandraDaemon
             }
         }
 
-
         try
         {
             loadRowAndKeyCacheAsync().get();
@@ -338,19 +337,6 @@ public class CassandraDaemon
         // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
         LegacyBatchlogMigrator.migrate();
 
-        // enable auto compaction
-        for (Keyspace keyspace : Keyspace.all())
-        {
-            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
-            {
-                for (final ColumnFamilyStore store : cfs.concatWithIndexes())
-                {
-                    if (store.getCompactionStrategyManager().shouldBeEnabled())
-                        store.enableAutoCompaction();
-                }
-            }
-        }
-
         SystemKeyspace.finishStartup();
 
         // Prepared statements
@@ -413,6 +399,22 @@ public class CassandraDaemon
         if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
             Gossiper.waitToSettle();
 
+        // re-enable auto-compaction after gossip is settled, so correct disk boundaries are used
+        for (Keyspace keyspace : Keyspace.all())
+        {
+            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+            {
+                for (final ColumnFamilyStore store : cfs.concatWithIndexes())
+                {
+                    store.reload(); //reload CFs in case there was a change of disk boundaries
+                    if (store.getCompactionStrategyManager().shouldBeEnabled())
+                    {
+                        store.enableAutoCompaction();
+                    }
+                }
+            }
+        }
+
         // schedule periodic background compaction task submission. this is simply a backstop against compactions stalling
         // due to scheduling errors or race conditions
         ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e93430b..fafe8e8 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1496,6 +1496,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             SystemKeyspace.resetAvailableRanges();
         }
 
+        // Force disk boundary invalidation now that local tokens are set
+        invalidateDiskBoundaries();
+
         setMode(Mode.JOINING, "Starting to bootstrap...", true);
         BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata);
         bootstrapper.addProgressListener(progressSupport);
@@ -1527,6 +1530,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    private void invalidateDiskBoundaries()
+    {
+        for (Keyspace keyspace : Keyspace.all())
+        {
+            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+            {
+                for (final ColumnFamilyStore store : cfs.concatWithIndexes())
+                {
+                    store.invalidateDiskBoundaries();
+                }
+            }
+        }
+    }
+
     /**
      * All MVs have been created during bootstrap, so mark them as built
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
new file mode 100644
index 0000000..c654fcd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.io.Files;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.db.DiskBoundaryManager;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
+import org.apache.cassandra.notifications.SSTableDeletingNotification;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CompactionStrategyManagerTest
+{
+    private static final String KS_PREFIX = "Keyspace1";
+    private static final String TABLE_PREFIX = "CF_STANDARD";
+
+    private static IPartitioner originalPartitioner;
+    private static boolean backups;
+
+    @BeforeClass
+    public static void beforeClass()
+    {
+        SchemaLoader.prepareServer();
+        backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+        DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+        /**
+         * We use the byte-ordered partitioner in this test to be able to easily infer an SSTable's
+         * disk assignment based on its generation - see {@link #getSSTableIndex(Integer[], SSTableReader)}
+         */
+        originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+    }
+
+    @AfterClass
+    public static void afterClass()
+    {
+        DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner);
+        DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
+    }
+
+    @Test
+    public void testSSTablesAssignedToCorrectCompactionStrategy()
+    {
+        // Creates 100 SSTables with keys 0-99
+        int numSSTables = 100;
+        SchemaLoader.createKeyspace(KS_PREFIX,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
+                                                .compaction(CompactionParams.scts(Collections.emptyMap())));
+        ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+        cfs.disableAutoCompaction();
+        for (int i = 0; i < numSSTables; i++)
+        {
+            createSSTableWithKey(KS_PREFIX, TABLE_PREFIX, i);
+        }
+
+        // Creates a CompactionStrategyManager with different numbers of disks and checks
+        // that the SSTables are assigned to the correct compaction strategies
+        for (int numDisks = 2; numDisks < 10; numDisks++)
+        {
+            testSSTablesAssignedToCorrectCompactionStrategy(numSSTables, numDisks);
+        }
+    }
+
+    public void testSSTablesAssignedToCorrectCompactionStrategy(int numSSTables, int numDisks)
+    {
+        // Create a mock CFS with the given number of disks
+        MockCFS cfs = createJBODMockCFS(numDisks);
+        //Check that CFS will contain numSSTables
+        assertEquals(numSSTables, cfs.getLiveSSTables().size());
+
+        // Creates a compaction strategy manager with an external boundary supplier
+        final Integer[] boundaries = computeBoundaries(numSSTables, numDisks);
+
+        MockBoundaryManager mockBoundaryManager = new MockBoundaryManager(cfs, boundaries);
+        System.out.println("Boundaries for " + numDisks + " disks is " + Arrays.toString(boundaries));
+        CompactionStrategyManager csm = new CompactionStrategyManager(cfs, mockBoundaryManager::getBoundaries,
+                                                                      true);
+
+        // Check that SSTables are assigned to the correct Compaction Strategy
+        for (SSTableReader reader : cfs.getLiveSSTables())
+        {
+            verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+        }
+
+        for (int delta = 1; delta <= 3; delta++)
+        {
+            // Update disk boundaries
+            Integer[] previousBoundaries = Arrays.copyOf(boundaries, boundaries.length);
+            updateBoundaries(mockBoundaryManager, boundaries, delta);
+
+            // Check that SSTables are still assigned to the previous boundary layout
+            System.out.println("Old boundaries: " + Arrays.toString(previousBoundaries) + " New boundaries: " + Arrays.toString(boundaries));
+            for (SSTableReader reader : cfs.getLiveSSTables())
+            {
+                verifySSTableIsAssignedToCorrectStrategy(previousBoundaries, csm, reader);
+            }
+
+            // Reload CompactionStrategyManager so new disk boundaries will be loaded
+            csm.maybeReloadDiskBoundaries();
+
+            for (SSTableReader reader : cfs.getLiveSSTables())
+            {
+                // Check that SSTables are assigned to the new boundary layout
+                verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+
+                // Remove SSTable and check that it will be removed from the correct compaction strategy
+                csm.handleNotification(new SSTableDeletingNotification(reader), this);
+                assertFalse(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+
+                // Add SSTable again and check that it is correctly assigned
+                csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader)), this);
+                verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+            }
+        }
+    }
+
+    private MockCFS createJBODMockCFS(int disks)
+    {
+        // Create #disks data directories to simulate JBOD
+        Directories.DataDirectory[] directories = new Directories.DataDirectory[disks];
+        for (int i = 0; i < disks; ++i)
+        {
+            File tempDir = Files.createTempDir();
+            tempDir.deleteOnExit();
+            directories[i] = new Directories.DataDirectory(tempDir);
+        }
+
+        ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+        MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata, directories));
+        mockCFS.disableAutoCompaction();
+        mockCFS.addSSTables(cfs.getLiveSSTables());
+        return mockCFS;
+    }
+
+    /**
+     * Updates the boundaries with a delta
+     */
+    private void updateBoundaries(MockBoundaryManager boundaryManager, Integer[] boundaries, int delta)
+    {
+        for (int j = 0; j < boundaries.length - 1; j++)
+        {
+            if ((j + delta) % 2 == 0)
+                boundaries[j] -= delta;
+            else
+                boundaries[j] += delta;
+        }
+        boundaryManager.invalidateBoundaries();
+    }
+
+    private void verifySSTableIsAssignedToCorrectStrategy(Integer[] boundaries, CompactionStrategyManager csm, SSTableReader reader)
+    {
+        // Check that sstable is assigned to correct disk
+        int index = getSSTableIndex(boundaries, reader);
+        assertEquals(index, csm.compactionStrategyIndexFor(reader));
+        // Check that compaction strategy actually contains SSTable
+        assertTrue(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+    }
+
+    /**
+     * Creates disk boundaries such that each disk receives
+     * an equal amount of SSTables
+     */
+    private Integer[] computeBoundaries(int numSSTables, int numDisks)
+    {
+        Integer[] result = new Integer[numDisks];
+        int sstablesPerRange = numSSTables / numDisks;
+        result[0] = sstablesPerRange;
+        for (int i = 1; i < numDisks; i++)
+        {
+            result[i] = result[i - 1] + sstablesPerRange;
+        }
+        result[numDisks - 1] = numSSTables; // make last boundary always be the number of SSTables to prevent rounding errors
+        return result;
+    }
+
+    /**
+     * The SSTables hold keys 0-99 (one key per SSTable) and their generations
+     * are numbered 1-100. Because we are using ByteOrderedPartitioner,
+     * we can compute an SSTable's position in the disk boundaries by finding
+     * the position of its generation relative to the boundaries.
+     */
+    private int getSSTableIndex(Integer[] boundaries, SSTableReader reader)
+    {
+        int index = 0;
+        while (boundaries[index] < reader.descriptor.generation)
+            index++;
+        System.out.println("Index for SSTable " + reader.descriptor.generation + " on boundary " + Arrays.toString(boundaries) + " is " + index);
+        return index;
+    }
+
+
+
+    class MockBoundaryManager
+    {
+        private final ColumnFamilyStore cfs;
+        private Integer[] positions;
+        private DiskBoundaries boundaries;
+
+        public MockBoundaryManager(ColumnFamilyStore cfs, Integer[] positions)
+        {
+            this.cfs = cfs;
+            this.positions = positions;
+            this.boundaries = createDiskBoundaries(cfs, positions);
+        }
+
+        public void invalidateBoundaries()
+        {
+            boundaries.invalidate();
+        }
+
+        public DiskBoundaries getBoundaries()
+        {
+            if (boundaries.isOutOfDate())
+                boundaries = createDiskBoundaries(cfs, positions);
+            return boundaries;
+        }
+
+        private DiskBoundaries createDiskBoundaries(ColumnFamilyStore cfs, Integer[] boundaries)
+        {
+            List<PartitionPosition> positions = Arrays.stream(boundaries).map(b -> Util.token(String.format(String.format("%04d", b))).minKeyBound()).collect(Collectors.toList());
+            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), positions, 0, 0);
+        }
+    }
+
+    private static void createSSTableWithKey(String keyspace, String table, int key)
+    {
+        long timestamp = System.currentTimeMillis();
+        DecoratedKey dk = Util.dk(String.format("%04d", key));
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+        new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
+        .clustering(Integer.toString(key))
+        .add("val", "val")
+        .build()
+        .applyUnsafe();
+        cfs.forceBlockingFlush();
+    }
+
+    // just to be able to override the data directories
+    private static class MockCFS extends ColumnFamilyStore
+    {
+        MockCFS(ColumnFamilyStore cfs, Directories dirs)
+        {
+            super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 1335906..7873ac9 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -104,6 +104,12 @@ public class CompactionsCQLTest extends CQLTester
         assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
         getCurrentColumnFamilyStore().enableAutoCompaction();
         assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+
+        // Alter keyspace replication settings to force compaction strategy reload and check strategy is still enabled
+        execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+        getCurrentColumnFamilyStore().getCompactionStrategyManager().maybeReloadDiskBoundaries();
+        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+
         execute("insert into %s (id) values ('1')");
         flush();
         execute("insert into %s (id) values ('1')");
@@ -161,17 +167,22 @@ public class CompactionsCQLTest extends CQLTester
         localOptions.put("class", "DateTieredCompactionStrategy");
         getCurrentColumnFamilyStore().setCompactionParameters(localOptions);
         assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+        // Invalidate disk boundaries to ensure that boundary invalidation will not cause the old strategy to be reloaded
+        getCurrentColumnFamilyStore().invalidateDiskBoundaries();
         // altering something non-compaction related
         execute("ALTER TABLE %s WITH gc_grace_seconds = 1000");
         // should keep the local compaction strat
         assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+        // Alter keyspace replication settings to force compaction strategy reload
+        execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+        // should keep the local compaction strat
+        assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
         // altering a compaction option
         execute("ALTER TABLE %s WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':3}");
         // will use the new option
         assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), SizeTieredCompactionStrategy.class));
     }
 
-
     @Test
     public void testSetLocalCompactionStrategyDisable() throws Throwable
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/3] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by pa...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29a0d1f8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29a0d1f8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29a0d1f8

Branch: refs/heads/trunk
Commit: 29a0d1f82f965d6e13161c420396f4681ff4c725
Parents: 9110c08 25e46f0
Author: Paulo Motta <pa...@apache.org>
Authored: Sat Dec 9 05:30:27 2017 +1100
Committer: Paulo Motta <pa...@apache.org>
Committed: Sat Dec 9 05:32:52 2017 +1100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/Directories.java    |   7 +
 .../org/apache/cassandra/db/DiskBoundaries.java |  58 ++-
 .../cassandra/db/DiskBoundaryManager.java       |  32 +-
 .../db/compaction/CompactionManager.java        |   4 +-
 .../compaction/CompactionStrategyManager.java   | 417 ++++++++++++-------
 .../cassandra/db/compaction/Scrubber.java       |   4 +-
 .../SizeTieredCompactionStrategy.java           |   3 +-
 .../cassandra/service/CassandraDaemon.java      |  31 +-
 .../cassandra/service/StorageService.java       |  17 +
 .../cassandra/db/DiskBoundaryManagerTest.java   |  10 +-
 .../CompactionStrategyManagerTest.java          | 309 ++++++++++++++
 .../db/compaction/CompactionsCQLTest.java       |  13 +-
 13 files changed, 701 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index cd8c678,b7a6e14..baeff1a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,176 -1,5 +1,177 @@@
 +4.0
 + * Allow sstabledump to do a json object per partition (CASSANDRA-13848)
 + * Add option to optimise merkle tree comparison across replicas (CASSANDRA-3200)
 + * Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081)
 + * Fix Distribution.average in cassandra-stress (CASSANDRA-14090)
 + * Support a means of logging all queries as they were invoked (CASSANDRA-13983)
 + * Presize collections (CASSANDRA-13760)
 + * Add GroupCommitLogService (CASSANDRA-13530)
 + * Parallelize initial materialized view build (CASSANDRA-12245)
 + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965)
 + * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
 + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)
 + * Introduce leaf-only iterator (CASSANDRA-9988)
 + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)
 + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
 + * Refactoring to specialised functional interfaces (CASSANDRA-13982)
 + * Speculative retry should allow more friendly params (CASSANDRA-13876)
 + * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944)
 + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291)
 + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728)
 + * Fix some alerts raised by static analysis (CASSANDRA-13799)
 + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593)
 + * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786)
 + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
 + * Expose recent histograms in JmxHistograms (CASSANDRA-13642)
 + * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899)
 + * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906)
 + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925)
 + * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961)
 + * Correctly close netty channels when a stream session ends (CASSANDRA-13905)
 + * Update lz4 to 1.4.0 (CASSANDRA-13741)
 + * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862)
 + * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)
 + * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
 + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703)
 + * Add extra information to SASI timeout exception (CASSANDRA-13677)
 + * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818)
 + * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786)
 + * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846)
 + * Add keyspace and table name in schema validation exception (CASSANDRA-13845)
 + * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771)
 + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837)
 + * Race condition when closing stream sessions (CASSANDRA-13852)
 + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831)
 + * Allow changing log levels via nodetool for related classes (CASSANDRA-12696)
 + * Add stress profile yaml with LWT (CASSANDRA-7960)
 + * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789)
 + * Simplify mx4j configuration (CASSANDRA-13578)
 + * Fix trigger example on 4.0 (CASSANDRA-13796)
 + * Force minimum timeout value (CASSANDRA-9375)
 + * Use netty for streaming (CASSANDRA-12229)
 + * Use netty for internode messaging (CASSANDRA-8457)
 + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
 + * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758)
 + * Fix pending repair manager index out of bounds check (CASSANDRA-13769)
 + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576)
 + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
 + * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594)
 + * Fix race / ref leak in anticompaction (CASSANDRA-13688)
 + * Expose tasks queue length via JMX (CASSANDRA-12758)
 + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
 + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
 + * Improve sstablemetadata output (CASSANDRA-11483)
 + * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371)
 + * Introduce error metrics for repair (CASSANDRA-13387)
 + * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732)
 + * Update metrics to 3.1.5 (CASSANDRA-13648)
 + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699)
 + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725)
 + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727)
 + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996)
 + * Default for start_native_transport now true if not set in config (CASSANDRA-13656)
 + * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583)
 + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148)
 + * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271)
 + * Use common nowInSec for validation compactions (CASSANDRA-13671)
 + * Improve handling of IR prepare failures (CASSANDRA-13672)
 + * Send IR coordinator messages synchronously (CASSANDRA-13673)
 + * Flush system.repair table before IR finalize promise (CASSANDRA-13660)
 + * Fix column filter creation for wildcard queries (CASSANDRA-13650)
 + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614)
 + * fix race condition in PendingRepairManager (CASSANDRA-13659)
 + * Allow noop incremental repair state transitions (CASSANDRA-13658)
 + * Run repair with down replicas (CASSANDRA-10446)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
 + * Improve calculation of available disk space for compaction (CASSANDRA-13068)
 + * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
 + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
 + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
 + * Fix Randomness of stress values (CASSANDRA-12744)
 + * Allow selecting Map values and Set elements (CASSANDRA-7396)
 + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
 + * Update repairTime for keyspaces on completion (CASSANDRA-13539)
 + * Add configurable upper bound for validation executor threads (CASSANDRA-13521)
 + * Bring back maxHintTTL property (CASSANDRA-12982)
 + * Add testing guidelines (CASSANDRA-13497)
 + * Add more repair metrics (CASSANDRA-13531)
 + * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650)
 + * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367)
 + * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
 + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)
 + * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262)
 + * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421)
 + * Improve incremental repair logging (CASSANDRA-13468)
 + * Start compaction when incremental repair finishes (CASSANDRA-13454)
 + * Add repair streaming preview (CASSANDRA-13257)
 + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
 + * Change protocol to allow sending key space independent of query string (CASSANDRA-10145)
 + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
 + * Take number of files in L0 in account when estimating remaining compaction tasks (CASSANDRA-13354)
 + * Skip building views during base table streams on range movements (CASSANDRA-13065)
 + * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197)
 + * Remove deprecated repair JMX APIs (CASSANDRA-11530)
 + * Fix version check to enable streaming keep-alive (CASSANDRA-12929)
 + * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289)
 + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
 + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
 + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
 + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336)
 + * Incremental repair not streaming correct sstables (CASSANDRA-13328)
 + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
 + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132)
 + * Remove config option index_interval (CASSANDRA-10671)
 + * Reduce lock contention for collection types and serializers (CASSANDRA-13271)
 + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283)
 + * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292)
 + * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520)
 + * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226)
 + * Remove unused method (CASSANDRA-13227)
 + * Fix minor bugs related to #9143 (CASSANDRA-13217)
 + * Output warning if user increases RF (CASSANDRA-13079)
 + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
 + * Add support for + and - operations on dates (CASSANDRA-11936)
 + * Fix consistency of incrementally repaired data (CASSANDRA-9143)
 + * Increase commitlog version (CASSANDRA-13161)
 + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
 + * Refactor ColumnCondition (CASSANDRA-12981)
 + * Parallelize streaming of different keyspaces (CASSANDRA-4663)
 + * Improved compactions metrics (CASSANDRA-13015)
 + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
 + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
 + * Thrift removal (CASSANDRA-11115)
 + * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
 + * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
 + * Add (automate) Nodetool Documentation (CASSANDRA-12672)
 + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
 + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
 + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
 + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510)
 + * Allow IN restrictions on column families with collections (CASSANDRA-12654)
 + * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028)
 + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
 + * Add mutation size and batch metrics (CASSANDRA-12649)
 + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
 + * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
 + * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
 + * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
 + * Add support for arithmetic operators (CASSANDRA-11935)
 + * Add histogram for delay to deliver hints (CASSANDRA-13234)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
 + * Trivial format error in StorageProxy (CASSANDRA-13551)
 + * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480)
 + * Anticompaction can cause noisy log messages (CASSANDRA-13684)
 + * Switch to client init for sstabledump (CASSANDRA-13683)
 + * CQLSH: Don't pause when capturing data (CASSANDRA-13743)
 +
 +
  3.11.2
+  * Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948)
   * Remove OpenJDK log warning (CASSANDRA-13916)
   * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
   * Cache disk boundaries (CASSANDRA-13215)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/DiskBoundaries.java
index ba5a093,7bfed28..24191a1
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@@ -18,10 -18,15 +18,16 @@@
  
  package org.apache.cassandra.db;
  
+ import java.util.Collections;
  import java.util.List;
  
+ import com.google.common.annotations.VisibleForTesting;
  import com.google.common.collect.ImmutableList;
  
++import org.apache.cassandra.io.sstable.Descriptor;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.service.StorageService;
+ 
  public class DiskBoundaries
  {
      public final List<Directories.DataDirectory> directories;
@@@ -68,4 -80,52 +81,47 @@@
                 ", directoriesVersion=" + directoriesVersion +
                 '}';
      }
+ 
+     /**
+      * Check if the given disk boundaries are out of date due to not being set or to having a too old diskVersion/ringVersion
+      */
+     public boolean isOutOfDate()
+     {
+         if (isInvalid)
+             return true;
+         int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
+         long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
+         return currentDiskVersion != directoriesVersion || (ringVersion != -1 && currentRingVersion != ringVersion);
+     }
+ 
+     public void invalidate()
+     {
+         this.isInvalid = true;
+     }
+ 
+     public int getDiskIndex(SSTableReader sstable)
+     {
+         if (positions == null)
+         {
 -            return getBoundariesFromSSTableDirectory(sstable);
++            return getBoundariesFromSSTableDirectory(sstable.descriptor);
+         }
+ 
+         int pos = Collections.binarySearch(positions, sstable.first);
+         assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
+         return -pos - 1;
+     }
+ 
+     /**
+      * Try to figure out location based on sstable directory
+      */
 -    private int getBoundariesFromSSTableDirectory(SSTableReader sstable)
++    public int getBoundariesFromSSTableDirectory(Descriptor descriptor)
+     {
+         for (int i = 0; i < directories.size(); i++)
+         {
+             Directories.DataDirectory directory = directories.get(i);
 -            if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
++            if (descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+                 return i;
+         }
+         return 0;
+     }
 -
 -    public Directories.DataDirectory getCorrectDiskForSSTable(SSTableReader sstable)
 -    {
 -        return directories.get(getDiskIndex(sstable));
 -    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index aa50fb1,4103433..8a01ba9
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@@ -30,8 -29,9 +30,7 @@@ import com.google.common.annotations.Vi
  import com.google.common.collect.Iterables;
  
  import org.apache.cassandra.db.DiskBoundaries;
- import org.apache.cassandra.schema.TableMetadata;
 -import org.apache.cassandra.db.Memtable;
  import org.apache.cassandra.index.Index;
 -import com.google.common.primitives.Ints;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -51,55 -51,80 +50,84 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.notifications.*;
  import org.apache.cassandra.schema.CompactionParams;
++import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.utils.Pair;
  
  /**
   * Manages the compaction strategies.
   *
 - * Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
 - * unrepaired data. This is done to be able to totally separate the different sets of sstables.
 + * For each directory, there is a separate compaction strategy instance for both repaired and unrepaired data, and also
 + * one instance for each pending repair. This is done to keep the different sets of sstables completely separate.
+  *
+  * Operations on this class are guarded by a {@link ReentrantReadWriteLock}. This lock performs mutual exclusion on
+  * reads and writes to the following variables: {@link #repaired}, {@link #unrepaired}, {@link #isActive},
+  * {@link #params}, {@link #currentBoundaries}. Whenever performing reads on these variables,
+  * the {@link #readLock} should be acquired. Likewise, updates to these variables should be guarded by
+  * {@link #writeLock}.
+  *
+  * Whenever the {@link DiskBoundaries} change, the compaction strategies must be reloaded, so in order to ensure
+  * the compaction strategy placement reflects the most up-to-date disk boundaries, call {@link #maybeReloadDiskBoundaries()}
+  * before acquiring the read lock to access the strategies.
+  *
   */
 +
  public class CompactionStrategyManager implements INotificationConsumer
  {
      private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
      public final CompactionLogger compactionLogger;
      private final ColumnFamilyStore cfs;
+     private final boolean partitionSSTablesByTokenRange;
+     private final Supplier<DiskBoundaries> boundariesSupplier;
+ 
+     /**
+      * Performs mutual exclusion on the variables below
+      */
+     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+     private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+     private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+ 
+     /**
+      * Variables guarded by read and write lock above
+      */
 -    //TODO check possibility of getting rid of these locks by encapsulating these in an immutable atomic object
      private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
      private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
 +    private final List<PendingRepairManager> pendingRepairs = new ArrayList<>();
+     private volatile CompactionParams params;
+     private DiskBoundaries currentBoundaries;
      private volatile boolean enabled = true;
      private volatile boolean isActive = true;
-     private volatile CompactionParams params;
-     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-     private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
-     private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
  
 -    /**
 +    /*
          We keep a copy of the schema compaction parameters here to be able to decide if we
-         should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
 -        should update the compaction strategy in {@link this#maybeReload(CFMetaData)} due to an ALTER.
++        should update the compaction strategy in maybeReload() due to an ALTER.
  
          If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
          we will use the new compaction parameters.
 -     **/
 +     */
      private volatile CompactionParams schemaCompactionParams;
-     private Directories.DataDirectory[] locations;
      private boolean shouldDefragment;
      private int fanout;
  
 +
      public CompactionStrategyManager(ColumnFamilyStore cfs)
      {
+         this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent());
+     }
+ 
+     @VisibleForTesting
+     public CompactionStrategyManager(ColumnFamilyStore cfs, Supplier<DiskBoundaries> boundariesSupplier,
+                                      boolean partitionSSTablesByTokenRange)
+     {
          cfs.getTracker().subscribe(this);
          logger.trace("{} subscribed to the data tracker.", this);
          this.cfs = cfs;
          this.compactionLogger = new CompactionLogger(cfs, this);
-         reload(cfs.metadata());
+         this.boundariesSupplier = boundariesSupplier;
+         this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange;
 -        params = cfs.metadata.params.compaction;
 +        params = cfs.metadata().params.compaction;
-         locations = getDirectories().getWriteableLocations();
          enabled = params.isEnabled();
- 
 -        reload(cfs.metadata.params.compaction);
++        reload(cfs.metadata().params.compaction);
      }
  
      /**
@@@ -116,57 -142,17 +145,55 @@@
              if (!isEnabled())
                  return null;
  
-             maybeReload(cfs.metadata());
- 
 -            List<AbstractCompactionStrategy> strategies = new ArrayList<>();
 +            // first try to promote/demote sstables from completed repairs
 +            ArrayList<Pair<Integer, PendingRepairManager>> pendingRepairManagers = new ArrayList<>(pendingRepairs.size());
 +            for (PendingRepairManager pendingRepair : pendingRepairs)
 +            {
 +                int numPending = pendingRepair.getNumPendingRepairFinishedTasks();
 +                if (numPending > 0)
 +                {
 +                    pendingRepairManagers.add(Pair.create(numPending, pendingRepair));
 +                }
 +            }
 +            if (!pendingRepairManagers.isEmpty())
 +            {
 +                pendingRepairManagers.sort((x, y) -> y.left - x.left);
 +                for (Pair<Integer, PendingRepairManager> pair : pendingRepairManagers)
 +                {
 +                    AbstractCompactionTask task = pair.right.getNextRepairFinishedTask();
 +                    if (task != null)
 +                    {
 +                        return task;
 +                    }
 +                }
 +            }
 +
 +            // sort compaction task suppliers by remaining tasks descending
 +            ArrayList<Pair<Integer, Supplier<AbstractCompactionTask>>> sortedSuppliers = new ArrayList<>(repaired.size() + unrepaired.size() + 1);
 +
 +            for (AbstractCompactionStrategy strategy : repaired)
 +                sortedSuppliers.add(Pair.create(strategy.getEstimatedRemainingTasks(), () -> strategy.getNextBackgroundTask(gcBefore)));
 +
 +            for (AbstractCompactionStrategy strategy : unrepaired)
 +                sortedSuppliers.add(Pair.create(strategy.getEstimatedRemainingTasks(), () -> strategy.getNextBackgroundTask(gcBefore)));
 +
 +            for (PendingRepairManager pending : pendingRepairs)
 +                sortedSuppliers.add(Pair.create(pending.getMaxEstimatedRemainingTasks(), () -> pending.getNextBackgroundTask(gcBefore)));
  
 -            strategies.addAll(repaired);
 -            strategies.addAll(unrepaired);
 -            Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
 -            for (AbstractCompactionStrategy strategy : strategies)
 +            sortedSuppliers.sort((x, y) -> y.left - x.left);
 +
 +            // return the first non-null task
 +            AbstractCompactionTask task;
 +            Iterator<Supplier<AbstractCompactionTask>> suppliers = Iterables.transform(sortedSuppliers, p -> p.right).iterator();
 +            assert suppliers.hasNext();
 +
 +            do
              {
 -                AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
 -                if (task != null)
 -                    return task;
 +                task = suppliers.next().get();
              }
 +            while (suppliers.hasNext() && task == null);
 +
 +            return task;
          }
          finally
          {
@@@ -250,15 -235,21 +277,23 @@@
       * @param sstable
       * @return
       */
 -    protected AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
 +    public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
      {
-         int index = getCompactionStrategyIndex(cfs, sstable);
+         maybeReloadDiskBoundaries();
+         return compactionStrategyFor(sstable);
+     }
+ 
+     @VisibleForTesting
+     protected AbstractCompactionStrategy compactionStrategyFor(SSTableReader sstable)
+     {
+         // should not call maybeReloadDiskBoundaries because it may be called from within lock
          readLock.lock();
          try
          {
+             int index = compactionStrategyIndexFor(sstable);
 -            if (sstable.isRepaired())
 +            if (sstable.isPendingRepair())
 +                return pendingRepairs.get(index).getOrCreate(sstable);
 +            else if (sstable.isRepaired())
                  return repaired.get(index);
              else
                  return unrepaired.get(index);
@@@ -280,73 -271,32 +315,65 @@@
       * @param sstable
       * @return
       */
-     public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, SSTableReader sstable)
+     public int getCompactionStrategyIndex(SSTableReader sstable)
      {
-         if (!cfs.getPartitioner().splitter().isPresent())
-             return 0;
- 
-         DiskBoundaries boundaries = cfs.getDiskBoundaries();
-         List<Directories.DataDirectory> directories = boundaries.directories;
-         if (boundaries.positions == null)
-             return getCompactionStrategyIndex(directories, sstable.descriptor);
- 
-         int pos = Collections.binarySearch(boundaries.positions, sstable.first);
-         assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
-         return -pos - 1;
+         maybeReloadDiskBoundaries();
+         return compactionStrategyIndexFor(sstable);
      }
  
-     /**
-      * get the index for the descriptor based on the existing directories
-      * @param locations
-      * @param descriptor
-      * @return
-      */
-     private static int getCompactionStrategyIndex(List<Directories.DataDirectory> directories, Descriptor descriptor)
+     @VisibleForTesting
+     protected int compactionStrategyIndexFor(SSTableReader sstable)
      {
-          // try to figure out location based on sstable directory:
-          for (int i = 0; i < directories.size(); i++)
-          {
-              Directories.DataDirectory directory = directories.get(i);
-              if (descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
-                  return i;
-          }
-          return 0;
 -        // should not call maybeReload because it may be called from within lock
++        // should not call maybeReloadDiskBoundaries because it may be called from within lock
+         readLock.lock();
+         try
+         {
+             //We only have a single compaction strategy when sstables are not
+             //partitioned by token range
+             if (!partitionSSTablesByTokenRange)
+                 return 0;
+ 
+             return currentBoundaries.getDiskIndex(sstable);
+         }
+         finally
+         {
+             readLock.unlock();
+         }
      }
  
 +    @VisibleForTesting
 +    List<AbstractCompactionStrategy> getRepaired()
 +    {
 +        return repaired;
 +    }
 +
 +    @VisibleForTesting
 +    List<AbstractCompactionStrategy> getUnrepaired()
 +    {
 +        return unrepaired;
 +    }
 +
 +    @VisibleForTesting
 +    List<AbstractCompactionStrategy> getForPendingRepair(UUID sessionID)
 +    {
 +        List<AbstractCompactionStrategy> strategies = new ArrayList<>(pendingRepairs.size());
 +        pendingRepairs.forEach(p -> strategies.add(p.get(sessionID)));
 +        return strategies;
 +    }
 +
 +    @VisibleForTesting
 +    Set<UUID> pendingRepairs()
 +    {
 +        Set<UUID> ids = new HashSet<>();
 +        pendingRepairs.forEach(p -> ids.addAll(p.getSessions()));
 +        return ids;
 +    }
 +
 +    public boolean hasDataForPendingRepair(UUID sessionID)
 +    {
 +        return Iterables.any(pendingRepairs, prm -> prm.hasDataForSession(sessionID));
 +    }
 +
      public void shutdown()
      {
          writeLock.lock();
@@@ -364,11 -313,10 +391,10 @@@
          }
      }
  
 -    public void maybeReload(CFMetaData metadata)
 +    public void maybeReload(TableMetadata metadata)
      {
          // compare the old schema configuration to the new one, ignore any locally set changes.
-         if (metadata.params.compaction.equals(schemaCompactionParams) &&
-             Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
+         if (metadata.params.compaction.equals(schemaCompactionParams))
              return;
  
          writeLock.lock();
@@@ -386,28 -369,44 +447,37 @@@
       * Reload the compaction strategies
       *
       * Called after changing configuration and at startup.
-      * @param metadata
+      * @param newCompactionParams
       */
-     private void reload(TableMetadata metadata)
+     private void reload(CompactionParams newCompactionParams)
      {
+         boolean enabledWithJMX = enabled && !shouldBeEnabled();
          boolean disabledWithJMX = !enabled && shouldBeEnabled();
-         if (!metadata.params.compaction.equals(schemaCompactionParams))
-             logger.trace("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
-         else if (!Arrays.equals(locations, cfs.getDirectories().getWriteableLocations()))
-             logger.trace("Recreating compaction strategy - writeable locations changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
  
-         setStrategy(metadata.params.compaction);
-         schemaCompactionParams = metadata.params.compaction;
+         if (currentBoundaries != null)
+         {
+             if (!newCompactionParams.equals(schemaCompactionParams))
+                 logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+             else if (currentBoundaries.isOutOfDate())
+                 logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName());
+         }
+ 
+         if (currentBoundaries == null || currentBoundaries.isOutOfDate())
+             currentBoundaries = boundariesSupplier.get();
+ 
+         setStrategy(newCompactionParams);
 -        schemaCompactionParams = cfs.metadata.params.compaction;
++        schemaCompactionParams = cfs.metadata().params.compaction;
  
-         if (disabledWithJMX || !shouldBeEnabled())
+         if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX)
              disable();
          else
              enable();
          startup();
      }
  
 -    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
 -    {
 -        cfs.getTracker().replaceFlushed(memtable, sstables);
 -        if (sstables != null && !sstables.isEmpty())
 -            CompactionManager.instance.submitBackground(cfs);
 -    }
 -
      public int getUnleveledSSTables()
      {
+         maybeReloadDiskBoundaries();
          readLock.lock();
          try
          {
@@@ -519,65 -517,49 +596,69 @@@
  
      private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
      {
-         // a bit of gymnastics to be able to replace sstables in compaction strategies
-         // we use this to know that a compaction finished and where to start the next compaction in LCS
-         Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
-         int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
- 
-         List<Set<SSTableReader>> pendingRemoved = new ArrayList<>(locationSize);
-         List<Set<SSTableReader>> pendingAdded = new ArrayList<>(locationSize);
-         List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
-         List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
-         List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
-         List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
- 
-         for (int i = 0; i < locationSize; i++)
-         {
-             pendingRemoved.add(new HashSet<>());
-             pendingAdded.add(new HashSet<>());
-             repairedRemoved.add(new HashSet<>());
-             repairedAdded.add(new HashSet<>());
-             unrepairedRemoved.add(new HashSet<>());
-             unrepairedAdded.add(new HashSet<>());
-         }
+         // If reloaded, SSTables will be placed in their correct locations
+         // so there is no need to process notification
+         if (maybeReloadDiskBoundaries())
+             return;
  
-         for (SSTableReader sstable : removed)
-         {
-             int i = getCompactionStrategyIndex(cfs, sstable);
-             if (sstable.isPendingRepair())
-                 pendingRemoved.get(i).add(sstable);
-             else if (sstable.isRepaired())
-                 repairedRemoved.get(i).add(sstable);
-             else
-                 unrepairedRemoved.get(i).add(sstable);
-         }
-         for (SSTableReader sstable : added)
-         {
-             int i = getCompactionStrategyIndex(cfs, sstable);
-             if (sstable.isPendingRepair())
-                 pendingAdded.get(i).add(sstable);
-             else if (sstable.isRepaired())
-                 repairedAdded.get(i).add(sstable);
-             else
-                 unrepairedAdded.get(i).add(sstable);
-         }
-         // we need write lock here since we might be moving sstables between strategies
-         writeLock.lock();
+         readLock.lock();
          try
          {
+             // a bit of gymnastics to be able to replace sstables in compaction strategies
+             // we use this to know that a compaction finished and where to start the next compaction in LCS
 -            int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1;
++            Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
++            int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
+ 
++            List<Set<SSTableReader>> pendingRemoved = new ArrayList<>(locationSize);
++            List<Set<SSTableReader>> pendingAdded = new ArrayList<>(locationSize);
+             List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+             List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+             List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+             List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+ 
+             for (int i = 0; i < locationSize; i++)
+             {
++                pendingRemoved.add(new HashSet<>());
++                pendingAdded.add(new HashSet<>());
+                 repairedRemoved.add(new HashSet<>());
+                 repairedAdded.add(new HashSet<>());
+                 unrepairedRemoved.add(new HashSet<>());
+                 unrepairedAdded.add(new HashSet<>());
+             }
+ 
+             for (SSTableReader sstable : removed)
+             {
+                 int i = compactionStrategyIndexFor(sstable);
 -                if (sstable.isRepaired())
++                if (sstable.isPendingRepair())
++                    pendingRemoved.get(i).add(sstable);
++                else if (sstable.isRepaired())
+                     repairedRemoved.get(i).add(sstable);
+                 else
+                     unrepairedRemoved.get(i).add(sstable);
+             }
+             for (SSTableReader sstable : added)
+             {
+                 int i = compactionStrategyIndexFor(sstable);
 -                if (sstable.isRepaired())
++                if (sstable.isPendingRepair())
++                    pendingAdded.get(i).add(sstable);
++                else if (sstable.isRepaired())
+                     repairedAdded.get(i).add(sstable);
+                 else
+                     unrepairedAdded.get(i).add(sstable);
+             }
              for (int i = 0; i < locationSize; i++)
              {
 +
 +                if (!pendingRemoved.get(i).isEmpty())
 +                {
 +                    pendingRepairs.get(i).replaceSSTables(pendingRemoved.get(i), pendingAdded.get(i));
 +                }
 +                else
 +                {
 +                    PendingRepairManager pendingManager = pendingRepairs.get(i);
 +                    pendingAdded.get(i).forEach(s -> pendingManager.addSSTable(s));
 +                }
 +
                  if (!repairedRemoved.get(i).isEmpty())
                      repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
                  else
@@@ -603,16 -589,9 +688,16 @@@
          {
              for (SSTableReader sstable : sstables)
              {
-                 int index = getCompactionStrategyIndex(cfs, sstable);
+                 int index = compactionStrategyIndexFor(sstable);
 -                if (sstable.isRepaired())
 +                if (sstable.isPendingRepair())
                  {
 +                    pendingRepairs.get(index).addSSTable(sstable);
 +                    unrepaired.get(index).removeSSTable(sstable);
 +                    repaired.get(index).removeSSTable(sstable);
 +                }
 +                else if (sstable.isRepaired())
 +                {
 +                    pendingRepairs.get(index).removeSSTable(sstable);
                      unrepaired.get(index).removeSSTable(sstable);
                      repaired.get(index).addSSTable(sstable);
                  }
@@@ -632,10 -610,14 +717,14 @@@
  
      private void handleDeletingNotification(SSTableReader deleted)
      {
-         writeLock.lock();
+         // If reloaded, SSTables will be placed in their correct locations
+         // so there is no need to process notification
+         if (maybeReloadDiskBoundaries())
+             return;
+         readLock.lock();
          try
          {
 -            compactionStrategyFor(deleted).removeSSTable(deleted);
 +            getCompactionStrategyFor(deleted).removeSSTable(deleted);
          }
          finally
          {
@@@ -701,44 -691,31 +789,44 @@@
       * @return
       */
      @SuppressWarnings("resource")
 -    public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
 +    public AbstractCompactionStrategy.ScannerList maybeGetScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
      {
-         assert repaired.size() == unrepaired.size();
-         assert repaired.size() == pendingRepairs.size();
- 
-         int numRepaired = repaired.size();
-         List<Set<SSTableReader>> pendingSSTables = new ArrayList<>(numRepaired);
-         List<Set<SSTableReader>> repairedSSTables = new ArrayList<>(numRepaired);
-         List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>(numRepaired);
- 
-         for (int i = 0; i < numRepaired; i++)
-         {
-             pendingSSTables.add(new HashSet<>());
-             repairedSSTables.add(new HashSet<>());
-             unrepairedSSTables.add(new HashSet<>());
-         }
- 
-         List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
- 
+         maybeReloadDiskBoundaries();
          readLock.lock();
++        List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
          try
          {
+             assert repaired.size() == unrepaired.size();
 -            List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
 -            List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
++            assert repaired.size() == pendingRepairs.size();
++
++            int numRepaired = repaired.size();
++            List<Set<SSTableReader>> pendingSSTables = new ArrayList<>(numRepaired);
++            List<Set<SSTableReader>> repairedSSTables = new ArrayList<>(numRepaired);
++            List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>(numRepaired);
+ 
 -            for (int i = 0; i < repaired.size(); i++)
++            for (int i = 0; i < numRepaired; i++)
+             {
++                pendingSSTables.add(new HashSet<>());
+                 repairedSSTables.add(new HashSet<>());
+                 unrepairedSSTables.add(new HashSet<>());
+             }
+ 
              for (SSTableReader sstable : sstables)
              {
-                 int idx = getCompactionStrategyIndex(cfs, sstable);
 -                if (sstable.isRepaired())
 -                    repairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
++                int idx = compactionStrategyIndexFor(sstable);
 +                if (sstable.isPendingRepair())
 +                    pendingSSTables.get(idx).add(sstable);
 +                else if (sstable.isRepaired())
 +                    repairedSSTables.get(idx).add(sstable);
                  else
 -                    unrepairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
 +                    unrepairedSSTables.get(idx).add(sstable);
              }
  
 -            List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
 +            for (int i = 0; i < pendingSSTables.size(); i++)
 +            {
 +                if (!pendingSSTables.get(i).isEmpty())
 +                    scanners.addAll(pendingRepairs.get(i).getScanners(pendingSSTables.get(i), ranges));
 +            }
              for (int i = 0; i < repairedSSTables.size(); i++)
              {
                  if (!repairedSSTables.get(i).isEmpty())
@@@ -814,28 -774,42 +903,45 @@@
  
      public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
      {
-         maybeReload(cfs.metadata());
-         validateForCompaction(txn.originals(), cfs, getDirectories());
-         return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+         maybeReloadDiskBoundaries();
+         readLock.lock();
+         try
+         {
+             validateForCompaction(txn.originals());
+             return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+         }
+         finally
+         {
+             readLock.unlock();
+         }
+ 
      }
  
-     private static void validateForCompaction(Iterable<SSTableReader> input, ColumnFamilyStore cfs, Directories directories)
+     private void validateForCompaction(Iterable<SSTableReader> input)
      {
-         SSTableReader firstSSTable = Iterables.getFirst(input, null);
-         assert firstSSTable != null;
-         boolean repaired = firstSSTable.isRepaired();
-         int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable);
-         boolean isPending = firstSSTable.isPendingRepair();
-         UUID pendingRepair = firstSSTable.getSSTableMetadata().pendingRepair;
-         for (SSTableReader sstable : input)
-         {
-             if (sstable.isRepaired() != repaired)
-                 throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
-             if (firstIndex != getCompactionStrategyIndex(cfs, sstable))
-                 throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
-             if (isPending && !pendingRepair.equals(sstable.getSSTableMetadata().pendingRepair))
-                 throw new UnsupportedOperationException("You can't compact sstables from different pending repair sessions");
+         readLock.lock();
+         try
+         {
+             SSTableReader firstSSTable = Iterables.getFirst(input, null);
+             assert firstSSTable != null;
+             boolean repaired = firstSSTable.isRepaired();
+             int firstIndex = compactionStrategyIndexFor(firstSSTable);
++            boolean isPending = firstSSTable.isPendingRepair();
++            UUID pendingRepair = firstSSTable.getSSTableMetadata().pendingRepair;
+             for (SSTableReader sstable : input)
+             {
+                 if (sstable.isRepaired() != repaired)
+                     throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
+                 if (firstIndex != compactionStrategyIndexFor(sstable))
+                     throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
++                if (isPending && !pendingRepair.equals(sstable.getSSTableMetadata().pendingRepair))
++                    throw new UnsupportedOperationException("You can't compact sstables from different pending repair sessions");
+             }
+         }
+         finally
+         {
+             readLock.unlock();
          }
 -
      }
  
      public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
@@@ -901,16 -868,13 +1007,16 @@@
          try
          {
              Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
 -                                                                         .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
 +                                                                         .filter(s -> !s.isMarkedSuspect() && s.isRepaired() && !s.isPendingRepair())
-                                                                          .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+                                                                          .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
  
              Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
 -                                                                           .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
 +                                                                           .filter(s -> !s.isMarkedSuspect() && !s.isRepaired() && !s.isPendingRepair())
-                                                                            .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+                                                                            .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
  
 +            Map<Integer, List<SSTableReader>> pendingSSTables = sstables.stream()
 +                                                                        .filter(s -> !s.isMarkedSuspect() && s.isPendingRepair())
-                                                                         .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
++                                                                        .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
  
              for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
                  ret.add(repaired.get(group.getKey()).getUserDefinedTask(group.getValue(), gcBefore));
@@@ -1005,19 -966,15 +1114,18 @@@
      {
          repaired.forEach(AbstractCompactionStrategy::shutdown);
          unrepaired.forEach(AbstractCompactionStrategy::shutdown);
 +        pendingRepairs.forEach(PendingRepairManager::shutdown);
          repaired.clear();
          unrepaired.clear();
 +        pendingRepairs.clear();
  
-         if (cfs.getPartitioner().splitter().isPresent())
+         if (partitionSSTablesByTokenRange)
          {
-             locations = cfs.getDirectories().getWriteableLocations();
-             for (int i = 0; i < locations.length; i++)
+             for (int i = 0; i < currentBoundaries.directories.size(); i++)
              {
 -                repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
 -                unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
 +                repaired.add(cfs.createCompactionStrategyInstance(params));
 +                unrepaired.add(cfs.createCompactionStrategyInstance(params));
 +                pendingRepairs.add(new PendingRepairManager(cfs, params));
              }
          }
          else
@@@ -1051,16 -1007,14 +1160,14 @@@
          readLock.lock();
          try
          {
 -            if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
 -            {
 -                return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
 -            }
 +            // to avoid creating a compaction strategy for the wrong pending repair manager, we get the index based on where the sstable is to be written
-             int index = cfs.getPartitioner().splitter().isPresent()
-                         ? getCompactionStrategyIndex(Arrays.asList(getDirectories().getWriteableLocations()), descriptor)
-                         : 0;
++            int index = partitionSSTablesByTokenRange? currentBoundaries.getBoundariesFromSSTableDirectory(descriptor) : 0;
 +            if (pendingRepair != ActiveRepairService.NO_PENDING_REPAIR)
 +                return pendingRepairs.get(index).getOrCreate(pendingRepair).createSSTableMultiWriter(descriptor, keyCount, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, collector, header, indexes, txn);
 +            else if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
 +                return unrepaired.get(index).createSSTableMultiWriter(descriptor, keyCount, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, collector, header, indexes, txn);
              else
 -            {
 -                return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
 -            }
 +                return repaired.get(index).createSSTableMultiWriter(descriptor, keyCount, repairedAt, ActiveRepairService.NO_PENDING_REPAIR, collector, header, indexes, txn);
          }
          finally
          {
@@@ -1075,8 -1029,8 +1182,8 @@@
  
      public List<String> getStrategyFolders(AbstractCompactionStrategy strategy)
      {
 -        List<Directories.DataDirectory> locations = currentBoundaries.directories;
 +        Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
-         if (cfs.getPartitioner().splitter().isPresent())
+         if (partitionSSTablesByTokenRange)
          {
              int unrepairedIndex = unrepaired.indexOf(strategy);
              if (unrepairedIndex > 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 7219595,4635824..1ef007c
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -99,9 -98,9 +99,9 @@@ public class Scrubber implements Closea
  
          List<SSTableReader> toScrub = Collections.singletonList(sstable);
  
-         int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
+         int locIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
          this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
 -        this.isCommutative = cfs.metadata.isCounter();
 +        this.isCommutative = cfs.metadata().isCounter();
  
          boolean hasIndexFile = (new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))).exists();
          this.isIndex = cfs.isIndex();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index d8309b4,3dbf3d8..51219e6
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -317,21 -331,13 +317,8 @@@ public class CassandraDaemo
          // Re-populate token metadata after commit log recover (new peers might be loaded onto system keyspace #10293)
          StorageService.instance.populateTokenMetadata();
  
-         // enable auto compaction
-         for (Keyspace keyspace : Keyspace.all())
-         {
-             for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
-             {
-                 for (final ColumnFamilyStore store : cfs.concatWithIndexes())
-                 {
-                     if (store.getCompactionStrategyManager().shouldBeEnabled())
-                         store.enableAutoCompaction();
-                 }
-             }
-         }
 -        // migrate any legacy (pre-3.0) hints from system.hints table into the new store
 -        new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate();
 -
 -        // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
 -        LegacyBatchlogMigrator.migrate();
--
          SystemKeyspace.finishStartup();
 +        ActiveRepairService.instance.start();
  
          // Prepared statements
          QueryProcessor.preloadPreparedStatement();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
index fc7c9a4,de79959..13454d5
--- a/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/DiskBoundaryManagerTest.java
@@@ -61,7 -61,7 +61,7 @@@ public class DiskBoundaryManagerTest ex
      @Test
      public void getBoundariesTest()
      {
--        DiskBoundaries dbv = dbm.getDiskBoundaries(mock);
++        DiskBoundaries dbv = mock.getDiskBoundaries();
          Assert.assertEquals(3, dbv.positions.size());
          assertEquals(dbv.directories, dirs.getWriteableLocations());
      }
@@@ -69,11 -69,11 +69,11 @@@
      @Test
      public void blackListTest()
      {
--        DiskBoundaries dbv = dbm.getDiskBoundaries(mock);
++        DiskBoundaries dbv = mock.getDiskBoundaries();
          Assert.assertEquals(3, dbv.positions.size());
          assertEquals(dbv.directories, dirs.getWriteableLocations());
          BlacklistedDirectories.maybeMarkUnwritable(new File("/tmp/3"));
--        dbv = dbm.getDiskBoundaries(mock);
++        dbv = mock.getDiskBoundaries();
          Assert.assertEquals(2, dbv.positions.size());
          Assert.assertEquals(Lists.newArrayList(new Directories.DataDirectory(new File("/tmp/1")),
                                          new Directories.DataDirectory(new File("/tmp/2"))),
@@@ -83,9 -83,9 +83,9 @@@
      @Test
      public void updateTokensTest() throws UnknownHostException
      {
--        DiskBoundaries dbv1 = dbm.getDiskBoundaries(mock);
++        DiskBoundaries dbv1 = mock.getDiskBoundaries();
          StorageService.instance.getTokenMetadata().updateNormalTokens(BootStrapper.getRandomTokens(StorageService.instance.getTokenMetadata(), 10), InetAddress.getByName("127.0.0.10"));
--        DiskBoundaries dbv2 = dbm.getDiskBoundaries(mock);
++        DiskBoundaries dbv2 = mock.getDiskBoundaries();
          assertFalse(dbv1.equals(dbv2));
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29a0d1f8/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
index 0000000,c654fcd..c315fb9
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@@ -1,0 -1,290 +1,309 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.io.File;
++import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.List;
++import java.util.Set;
+ import java.util.stream.Collectors;
+ 
++import com.google.common.collect.Sets;
+ import com.google.common.io.Files;
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.Directories;
+ import org.apache.cassandra.db.DiskBoundaries;
+ import org.apache.cassandra.db.DiskBoundaryManager;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.PartitionPosition;
+ import org.apache.cassandra.db.RowUpdateBuilder;
+ import org.apache.cassandra.dht.ByteOrderedPartitioner;
+ import org.apache.cassandra.dht.IPartitioner;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.notifications.SSTableAddedNotification;
+ import org.apache.cassandra.notifications.SSTableDeletingNotification;
+ import org.apache.cassandra.schema.CompactionParams;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.service.StorageService;
++import org.apache.cassandra.utils.UUIDGen;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ 
+ public class CompactionStrategyManagerTest
+ {
+     private static final String KS_PREFIX = "Keyspace1";
+     private static final String TABLE_PREFIX = "CF_STANDARD";
+ 
+     private static IPartitioner originalPartitioner;
+     private static boolean backups;
+ 
+     @BeforeClass
+     public static void beforeClass()
+     {
+         SchemaLoader.prepareServer();
+         backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+         DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+         /**
+          * We use byte ordered partitioner in this test to be able to easily infer an SSTable
+          * disk assignment based on its generation - See {@link #getSSTableIndex(Integer[], SSTableReader)}
+          */
+         originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+     }
+ 
+     @AfterClass
+     public static void afterClass()
+     {
+         DatabaseDescriptor.setPartitionerUnsafe(originalPartitioner);
+         DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
+     }
+ 
+     @Test
 -    public void testSSTablesAssignedToCorrectCompactionStrategy()
++    public void testSSTablesAssignedToCorrectCompactionStrategy() throws IOException
+     {
+         // Creates 100 SSTables with keys 0-99
+         int numSSTables = 100;
+         SchemaLoader.createKeyspace(KS_PREFIX,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
+                                                 .compaction(CompactionParams.scts(Collections.emptyMap())));
+         ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+         cfs.disableAutoCompaction();
++        Set<SSTableReader> previousSSTables = cfs.getLiveSSTables();
+         for (int i = 0; i < numSSTables; i++)
+         {
+             createSSTableWithKey(KS_PREFIX, TABLE_PREFIX, i);
++            Set<SSTableReader> currentSSTables = cfs.getLiveSSTables();
++            Set<SSTableReader> newSSTables = Sets.difference(currentSSTables, previousSSTables);
++            assertEquals(1, newSSTables.size());
++            if (i % 3 == 0)
++            {
++                //make 1 third of sstables repaired
++                cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, System.currentTimeMillis(), null);
++            }
++            else if (i % 3 == 1)
++            {
++                //make 1 third of sstables pending repair
++                cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, 0, UUIDGen.getTimeUUID());
++            }
++            previousSSTables = currentSSTables;
+         }
+ 
+         // Creates a CompactionStrategyManager with different numbers of disks and checks
+         // if the SSTables are assigned to the correct compaction strategies
+         for (int numDisks = 2; numDisks < 10; numDisks++)
+         {
+             testSSTablesAssignedToCorrectCompactionStrategy(numSSTables, numDisks);
+         }
+     }
+ 
+     public void testSSTablesAssignedToCorrectCompactionStrategy(int numSSTables, int numDisks)
+     {
+         // Create a mock CFS with the given number of disks
+         MockCFS cfs = createJBODMockCFS(numDisks);
+         //Check that CFS will contain numSSTables
+         assertEquals(numSSTables, cfs.getLiveSSTables().size());
+ 
+         // Creates a compaction strategy manager with an external boundary supplier
+         final Integer[] boundaries = computeBoundaries(numSSTables, numDisks);
+ 
+         MockBoundaryManager mockBoundaryManager = new MockBoundaryManager(cfs, boundaries);
+         System.out.println("Boundaries for " + numDisks + " disks is " + Arrays.toString(boundaries));
+         CompactionStrategyManager csm = new CompactionStrategyManager(cfs, mockBoundaryManager::getBoundaries,
+                                                                       true);
+ 
+         // Check that SSTables are assigned to the correct Compaction Strategy
+         for (SSTableReader reader : cfs.getLiveSSTables())
+         {
+             verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+         }
+ 
+         for (int delta = 1; delta <= 3; delta++)
+         {
+             // Update disk boundaries
+             Integer[] previousBoundaries = Arrays.copyOf(boundaries, boundaries.length);
+             updateBoundaries(mockBoundaryManager, boundaries, delta);
+ 
+             // Check that SSTables are still assigned to the previous boundary layout
+             System.out.println("Old boundaries: " + Arrays.toString(previousBoundaries) + " New boundaries: " + Arrays.toString(boundaries));
+             for (SSTableReader reader : cfs.getLiveSSTables())
+             {
+                 verifySSTableIsAssignedToCorrectStrategy(previousBoundaries, csm, reader);
+             }
+ 
+             // Reload CompactionStrategyManager so new disk boundaries will be loaded
+             csm.maybeReloadDiskBoundaries();
+ 
+             for (SSTableReader reader : cfs.getLiveSSTables())
+             {
+                 // Check that SSTables are assigned to the new boundary layout
+                 verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+ 
+                 // Remove SSTable and check that it will be removed from the correct compaction strategy
+                 csm.handleNotification(new SSTableDeletingNotification(reader), this);
+                 assertFalse(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+ 
+                 // Add SSTable again and check that it is correctly assigned
 -                csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader)), this);
++                csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader), null), this);
+                 verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+             }
+         }
+     }
+ 
+     private MockCFS createJBODMockCFS(int disks)
+     {
+         // Create #disks data directories to simulate JBOD
+         Directories.DataDirectory[] directories = new Directories.DataDirectory[disks];
+         for (int i = 0; i < disks; ++i)
+         {
+             File tempDir = Files.createTempDir();
+             tempDir.deleteOnExit();
+             directories[i] = new Directories.DataDirectory(tempDir);
+         }
+ 
+         ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
 -        MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata, directories));
++        MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata(), directories));
+         mockCFS.disableAutoCompaction();
+         mockCFS.addSSTables(cfs.getLiveSSTables());
+         return mockCFS;
+     }
+ 
+     /**
+      * Updates the boundaries with a delta
+      */
+     private void updateBoundaries(MockBoundaryManager boundaryManager, Integer[] boundaries, int delta)
+     {
+         for (int j = 0; j < boundaries.length - 1; j++)
+         {
+             if ((j + delta) % 2 == 0)
+                 boundaries[j] -= delta;
+             else
+                 boundaries[j] += delta;
+         }
+         boundaryManager.invalidateBoundaries();
+     }
+ 
+     private void verifySSTableIsAssignedToCorrectStrategy(Integer[] boundaries, CompactionStrategyManager csm, SSTableReader reader)
+     {
+         // Check that sstable is assigned to correct disk
+         int index = getSSTableIndex(boundaries, reader);
+         assertEquals(index, csm.compactionStrategyIndexFor(reader));
+         // Check that compaction strategy actually contains SSTable
+         assertTrue(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+     }
+ 
+     /**
+      * Creates disk boundaries such that each disk receives
+      * an equal number of SSTables
+      */
+     private Integer[] computeBoundaries(int numSSTables, int numDisks)
+     {
+         Integer[] result = new Integer[numDisks];
+         int sstablesPerRange = numSSTables / numDisks;
+         result[0] = sstablesPerRange;
+         for (int i = 1; i < numDisks; i++)
+         {
+             result[i] = result[i - 1] + sstablesPerRange;
+         }
+         result[numDisks - 1] = numSSTables; // make last boundary always be the number of SSTables to prevent rounding errors
+         return result;
+     }
+ 
+     /**
+      * Each SSTable holds a single key in the range 0-99, and sstable
+      * generations are numbered from 1-100. Since we are using ByteOrderedPartitioner,
+      * we can compute the sstable position in the disk boundaries by finding
+      * the generation position relative to the boundaries
+      */
+     private int getSSTableIndex(Integer[] boundaries, SSTableReader reader)
+     {
+         int index = 0;
+         while (boundaries[index] < reader.descriptor.generation)
+             index++;
+         System.out.println("Index for SSTable " + reader.descriptor.generation + " on boundary " + Arrays.toString(boundaries) + " is " + index);
+         return index;
+     }
+ 
+ 
+ 
+     class MockBoundaryManager
+     {
+         private final ColumnFamilyStore cfs;
+         private Integer[] positions;
+         private DiskBoundaries boundaries;
+ 
+         public MockBoundaryManager(ColumnFamilyStore cfs, Integer[] positions)
+         {
+             this.cfs = cfs;
+             this.positions = positions;
+             this.boundaries = createDiskBoundaries(cfs, positions);
+         }
+ 
+         public void invalidateBoundaries()
+         {
+             boundaries.invalidate();
+         }
+ 
+         public DiskBoundaries getBoundaries()
+         {
+             if (boundaries.isOutOfDate())
+                 boundaries = createDiskBoundaries(cfs, positions);
+             return boundaries;
+         }
+ 
+         private DiskBoundaries createDiskBoundaries(ColumnFamilyStore cfs, Integer[] boundaries)
+         {
+             List<PartitionPosition> positions = Arrays.stream(boundaries).map(b -> Util.token(String.format(String.format("%04d", b))).minKeyBound()).collect(Collectors.toList());
+             return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), positions, 0, 0);
+         }
+     }
+ 
+     private static void createSSTableWithKey(String keyspace, String table, int key)
+     {
+         long timestamp = System.currentTimeMillis();
+         DecoratedKey dk = Util.dk(String.format("%04d", key));
+         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
 -        new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
++        new RowUpdateBuilder(cfs.metadata(), timestamp, dk.getKey())
+         .clustering(Integer.toString(key))
+         .add("val", "val")
+         .build()
+         .applyUnsafe();
+         cfs.forceBlockingFlush();
+     }
+ 
+     // just to be able to override the data directories
+     private static class MockCFS extends ColumnFamilyStore
+     {
+         MockCFS(ColumnFamilyStore cfs, Directories dirs)
+         {
+             super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org