You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this export.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2017/12/08 18:34:45 UTC

[2/3] cassandra git commit: Reload compaction strategies when disk boundaries are invalidated

Reload compaction strategies when disk boundaries are invalidated

Patch by Paulo Motta; Reviewed by Marcus Eriksson for CASSANDRA-13948


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/25e46f05
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/25e46f05
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/25e46f05

Branch: refs/heads/trunk
Commit: 25e46f05294fd42c111f2f1d5881082d97c572ea
Parents: b637eb1
Author: Paulo Motta <pa...@gmail.com>
Authored: Thu Nov 30 23:14:34 2017 +1100
Committer: Paulo Motta <pa...@apache.org>
Committed: Sat Dec 9 05:30:01 2017 +1100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/Directories.java    |   7 +
 .../org/apache/cassandra/db/DiskBoundaries.java |  62 ++-
 .../cassandra/db/DiskBoundaryManager.java       |  32 +-
 .../db/compaction/CompactionManager.java        |   4 +-
 .../compaction/CompactionStrategyManager.java   | 406 ++++++++++++-------
 .../cassandra/db/compaction/Scrubber.java       |   4 +-
 .../SizeTieredCompactionStrategy.java           |   3 +-
 .../cassandra/service/CassandraDaemon.java      |  32 +-
 .../cassandra/service/StorageService.java       |  17 +
 .../CompactionStrategyManagerTest.java          | 290 +++++++++++++
 .../db/compaction/CompactionsCQLTest.java       |  13 +-
 12 files changed, 676 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 18a22bd..b7a6e14 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.2
+ * Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948)
  * Remove OpenJDK log warning (CASSANDRA-13916)
  * Prevent compaction strategies from looping indefinitely (CASSANDRA-14079)
  * Cache disk boundaries (CASSANDRA-13215)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 5e52b0f..532bf98 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -579,6 +579,13 @@ public class Directories
         {
             return location.hashCode();
         }
+
+        public String toString()
+        {
+            return "DataDirectory{" +
+                   "location=" + location +
+                   '}';
+        }
     }
 
     static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/DiskBoundaries.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index ba5a093..7bfed28 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -18,18 +18,30 @@
 
 package org.apache.cassandra.db;
 
+import java.util.Collections;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.service.StorageService;
+
 public class DiskBoundaries
 {
     public final List<Directories.DataDirectory> directories;
     public final ImmutableList<PartitionPosition> positions;
     final long ringVersion;
     final int directoriesVersion;
+    private volatile boolean isInvalid = false;
 
-    DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
+    public DiskBoundaries(Directories.DataDirectory[] directories, int diskVersion)
+    {
+        this(directories, null, -1, diskVersion);
+    }
+
+    @VisibleForTesting
+    public DiskBoundaries(Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
     {
         this.directories = directories == null ? null : ImmutableList.copyOf(directories);
         this.positions = positions == null ? null : ImmutableList.copyOf(positions);
@@ -68,4 +80,52 @@ public class DiskBoundaries
                ", directoriesVersion=" + directoriesVersion +
                '}';
     }
+
+    /**
+     * Check whether these disk boundaries are out of date, i.e. they were explicitly invalidated or their directoriesVersion/ringVersion no longer matches the current one
+     */
+    public boolean isOutOfDate()
+    {
+        if (isInvalid)
+            return true;
+        int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
+        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
+        return currentDiskVersion != directoriesVersion || (ringVersion != -1 && currentRingVersion != ringVersion);
+    }
+
+    public void invalidate()
+    {
+        this.isInvalid = true;
+    }
+
+    public int getDiskIndex(SSTableReader sstable)
+    {
+        if (positions == null)
+        {
+            return getBoundariesFromSSTableDirectory(sstable);
+        }
+
+        int pos = Collections.binarySearch(positions, sstable.first);
+        assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
+        return -pos - 1;
+    }
+
+    /**
+     * Try to figure out location based on sstable directory
+     */
+    private int getBoundariesFromSSTableDirectory(SSTableReader sstable)
+    {
+        for (int i = 0; i < directories.size(); i++)
+        {
+            Directories.DataDirectory directory = directories.get(i);
+            if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+                return i;
+        }
+        return 0;
+    }
+
+    public Directories.DataDirectory getCorrectDiskForSSTable(SSTableReader sstable)
+    {
+        return directories.get(getDiskIndex(sstable));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 7872554..14d3983 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -42,38 +42,27 @@ public class DiskBoundaryManager
     public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
     {
         if (!cfs.getPartitioner().splitter().isPresent())
-            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), null, -1, -1);
-        // copy the reference to avoid getting nulled out by invalidate() below
-        // - it is ok to race, compaction will move any incorrect tokens to their correct places, but
-        // returning null would be bad
-        DiskBoundaries db = diskBoundaries;
-        if (isOutOfDate(diskBoundaries))
+            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), BlacklistedDirectories.getDirectoriesVersion());
+        if (diskBoundaries == null || diskBoundaries.isOutOfDate())
         {
             synchronized (this)
             {
-                db = diskBoundaries;
-                if (isOutOfDate(diskBoundaries))
+                if (diskBoundaries == null || diskBoundaries.isOutOfDate())
                 {
                     logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
                     DiskBoundaries oldBoundaries = diskBoundaries;
-                    db = diskBoundaries = getDiskBoundaryValue(cfs);
+                    diskBoundaries = getDiskBoundaryValue(cfs);
                     logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), cfs.getTableName());
                 }
             }
         }
-        return db;
+        return diskBoundaries;
     }
 
-    /**
-     * check if the given disk boundaries are out of date due not being set or to having too old diskVersion/ringVersion
-     */
-    private boolean isOutOfDate(DiskBoundaries db)
+    public void invalidate()
     {
-        if (db == null)
-            return true;
-        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
-        int currentDiskVersion = BlacklistedDirectories.getDirectoriesVersion();
-        return currentRingVersion != db.ringVersion || currentDiskVersion != db.directoriesVersion;
+       if (diskBoundaries != null)
+           diskBoundaries.invalidate();
     }
 
     private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
@@ -145,9 +134,4 @@ public class DiskBoundaryManager
         diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
         return diskBoundaries;
     }
-
-    public void invalidate()
-    {
-        diskBoundaries = null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 3351736..4030384 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -525,7 +525,7 @@ public class CompactionManager implements CompactionManagerMBean
                 transaction.cancel(Sets.difference(originals, needsRelocation));
 
                 Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
-                        CompactionStrategyManager.getCompactionStrategyIndex(cfs, s)));
+                                                                                                                         cfs.getCompactionStrategyManager().getCompactionStrategyIndex(s)));
 
                 int maxSize = 0;
                 for (List<SSTableReader> diskSSTables : groupedByDisk.values())
@@ -545,7 +545,7 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 if (!cfs.getPartitioner().splitter().isPresent())
                     return true;
-                int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
+                int directoryIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
                 Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
 
                 Directories.DataDirectory location = locations[directoryIndex];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 6305096..4103433 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,12 +21,15 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.index.Index;
 import com.google.common.primitives.Ints;
 
@@ -36,7 +39,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
@@ -56,45 +58,73 @@ import org.apache.cassandra.service.ActiveRepairService;
  *
  * Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
  * unrepaired data. This is done to be able to totally separate the different sets of sstables.
+ *
+ * Operations on this class are guarded by a {@link ReentrantReadWriteLock}. This lock performs mutual exclusion on
+ * reads and writes to the following variables: {@link this#repaired}, {@link this#unrepaired}, {@link this#isActive},
+ * {@link this#params}, {@link this#currentBoundaries}. Whenever performing reads on these variables,
+ * the {@link this#readLock} should be acquired. Likewise, updates to these variables should be guarded by
+ * {@link this#writeLock}.
+ *
+ * Whenever the {@link DiskBoundaries} change, the compaction strategies must be reloaded, so in order to ensure
+ * the compaction strategy placement reflects the most up-to-date disk boundaries, call {@link this#maybeReloadDiskBoundaries()}
+ * before acquiring the read lock to access the strategies.
+ *
  */
-
 public class CompactionStrategyManager implements INotificationConsumer
 {
     private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
     public final CompactionLogger compactionLogger;
     private final ColumnFamilyStore cfs;
+    private final boolean partitionSSTablesByTokenRange;
+    private final Supplier<DiskBoundaries> boundariesSupplier;
+
+    /**
+     * Performs mutual exclusion on the variables below
+     */
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
+    /**
+     * Variables guarded by read and write lock above
+     */
+    //TODO check possibility of getting rid of these locks by encapsulating these in an immutable atomic object
     private final List<AbstractCompactionStrategy> repaired = new ArrayList<>();
     private final List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
+    private volatile CompactionParams params;
+    private DiskBoundaries currentBoundaries;
     private volatile boolean enabled = true;
     private volatile boolean isActive = true;
-    private volatile CompactionParams params;
-    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
-    private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
 
-    /*
+    /**
         We keep a copy of the schema compaction parameters here to be able to decide if we
-        should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER.
+        should update the compaction strategy in {@link this#maybeReload(CFMetaData)} due to an ALTER.
 
         If a user changes the local compaction strategy and then later ALTERs a compaction parameter,
         we will use the new compaction parameters.
-     */
+     **/
     private volatile CompactionParams schemaCompactionParams;
-    private Directories.DataDirectory[] locations;
     private boolean shouldDefragment;
     private int fanout;
 
     public CompactionStrategyManager(ColumnFamilyStore cfs)
     {
+        this(cfs, cfs::getDiskBoundaries, cfs.getPartitioner().splitter().isPresent());
+    }
+
+    @VisibleForTesting
+    public CompactionStrategyManager(ColumnFamilyStore cfs, Supplier<DiskBoundaries> boundariesSupplier,
+                                     boolean partitionSSTablesByTokenRange)
+    {
         cfs.getTracker().subscribe(this);
         logger.trace("{} subscribed to the data tracker.", this);
         this.cfs = cfs;
         this.compactionLogger = new CompactionLogger(cfs, this);
-        reload(cfs.metadata);
+        this.boundariesSupplier = boundariesSupplier;
+        this.partitionSSTablesByTokenRange = partitionSSTablesByTokenRange;
         params = cfs.metadata.params.compaction;
-        locations = getDirectories().getWriteableLocations();
         enabled = params.isEnabled();
-
+        reload(cfs.metadata.params.compaction);
     }
 
     /**
@@ -105,13 +135,13 @@ public class CompactionStrategyManager implements INotificationConsumer
      */
     public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
             if (!isEnabled())
                 return null;
 
-            maybeReload(cfs.metadata);
             List<AbstractCompactionStrategy> strategies = new ArrayList<>();
 
             strategies.addAll(repaired);
@@ -181,7 +211,7 @@ public class CompactionStrategyManager implements INotificationConsumer
             for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
             {
                 if (sstable.openReason != SSTableReader.OpenReason.EARLY)
-                    getCompactionStrategyFor(sstable).addSSTable(sstable);
+                    compactionStrategyFor(sstable).addSSTable(sstable);
             }
             repaired.forEach(AbstractCompactionStrategy::startup);
             unrepaired.forEach(AbstractCompactionStrategy::startup);
@@ -205,12 +235,20 @@ public class CompactionStrategyManager implements INotificationConsumer
      * @param sstable
      * @return
      */
-    public AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+    protected AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
+    {
+        maybeReloadDiskBoundaries();
+        return compactionStrategyFor(sstable);
+    }
+
+    @VisibleForTesting
+    protected AbstractCompactionStrategy compactionStrategyFor(SSTableReader sstable)
     {
-        int index = getCompactionStrategyIndex(cfs, sstable);
+        // should not call maybeReloadDiskBoundaries because it may be called from within lock
         readLock.lock();
         try
         {
+            int index = compactionStrategyIndexFor(sstable);
             if (sstable.isRepaired())
                 return repaired.get(index);
             else
@@ -230,33 +268,33 @@ public class CompactionStrategyManager implements INotificationConsumer
      * the sstable is on currently (unless we don't know the local tokens yet). Once we start compacting we will write out
      * sstables in the correct locations and give them to the correct compaction strategy instance.
      *
-     * @param cfs
      * @param sstable
      * @return
      */
-    public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, SSTableReader sstable)
+    public int getCompactionStrategyIndex(SSTableReader sstable)
     {
-        if (!cfs.getPartitioner().splitter().isPresent())
-            return 0;
+        maybeReloadDiskBoundaries();
+        return compactionStrategyIndexFor(sstable);
+    }
 
-        DiskBoundaries boundaries = cfs.getDiskBoundaries();
-        List<Directories.DataDirectory> directories = boundaries.directories;
+    @VisibleForTesting
+    protected int compactionStrategyIndexFor(SSTableReader sstable)
+    {
+        // should not call maybeReload because it may be called from within lock
+        readLock.lock();
+        try
+        {
+            //We only have a single compaction strategy when sstables are not
+            //partitioned by token range
+            if (!partitionSSTablesByTokenRange)
+                return 0;
 
-        if (boundaries.positions == null)
+            return currentBoundaries.getDiskIndex(sstable);
+        }
+        finally
         {
-            // try to figure out location based on sstable directory:
-            for (int i = 0; i < directories.size(); i++)
-            {
-                Directories.DataDirectory directory = directories.get(i);
-                if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
-                    return i;
-            }
-            return 0;
+            readLock.unlock();
         }
-
-        int pos = Collections.binarySearch(boundaries.positions, sstable.first);
-        assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
-        return -pos - 1;
     }
 
     public void shutdown()
@@ -278,14 +316,48 @@ public class CompactionStrategyManager implements INotificationConsumer
     public void maybeReload(CFMetaData metadata)
     {
         // compare the old schema configuration to the new one, ignore any locally set changes.
-        if (metadata.params.compaction.equals(schemaCompactionParams) &&
-            Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
+        if (metadata.params.compaction.equals(schemaCompactionParams))
             return;
 
         writeLock.lock();
         try
         {
-            reload(metadata);
+            // compare the old schema configuration to the new one, ignore any locally set changes.
+            if (metadata.params.compaction.equals(schemaCompactionParams))
+                return;
+            reload(metadata.params.compaction);
+        }
+        finally
+        {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Checks if the disk boundaries changed and reloads the compaction strategies
+     * to reflect the most up-to-date disk boundaries.
+     *
+     * This is typically called before acquiring the {@link this#readLock} to ensure the most up-to-date
+     * disk locations and boundaries are used.
+     *
+     * This should *never* be called by a thread holding the {@link this#readLock}, since it
+     * will potentially acquire the {@link this#writeLock} to update the compaction strategies,
+     * which can cause a deadlock.
+     */
+    //TODO improve this to reload after receiving a notification rather than trying to reload on every operation
+    @VisibleForTesting
+    protected boolean maybeReloadDiskBoundaries()
+    {
+        if (!currentBoundaries.isOutOfDate())
+            return false;
+
+        writeLock.lock();
+        try
+        {
+            if (!currentBoundaries.isOutOfDate())
+                return false;
+            reload(params);
+            return true;
         }
         finally
         {
@@ -297,20 +369,28 @@ public class CompactionStrategyManager implements INotificationConsumer
      * Reload the compaction strategies
      *
      * Called after changing configuration and at startup.
-     * @param metadata
+     * @param newCompactionParams
      */
-    private void reload(CFMetaData metadata)
+    private void reload(CompactionParams newCompactionParams)
     {
+        boolean enabledWithJMX = enabled && !shouldBeEnabled();
         boolean disabledWithJMX = !enabled && shouldBeEnabled();
-        if (!metadata.params.compaction.equals(schemaCompactionParams))
-            logger.trace("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
-        else if (!Arrays.equals(locations, cfs.getDirectories().getWriteableLocations()))
-            logger.trace("Recreating compaction strategy - writeable locations changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
 
-        setStrategy(metadata.params.compaction);
-        schemaCompactionParams = metadata.params.compaction;
+        if (currentBoundaries != null)
+        {
+            if (!newCompactionParams.equals(schemaCompactionParams))
+                logger.debug("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+            else if (currentBoundaries.isOutOfDate())
+                logger.debug("Recreating compaction strategy - disk boundaries are out of date for {}.{}.", cfs.keyspace.getName(), cfs.getTableName());
+        }
+
+        if (currentBoundaries == null || currentBoundaries.isOutOfDate())
+            currentBoundaries = boundariesSupplier.get();
+
+        setStrategy(newCompactionParams);
+        schemaCompactionParams = cfs.metadata.params.compaction;
 
-        if (disabledWithJMX || !shouldBeEnabled())
+        if (disabledWithJMX || !shouldBeEnabled() && !enabledWithJMX)
             disable();
         else
             enable();
@@ -326,6 +406,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public int getUnleveledSSTables()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -353,6 +434,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public int[] getSSTableCountPerLevel()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -401,6 +483,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public Directories getDirectories()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -415,11 +498,16 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     private void handleFlushNotification(Iterable<SSTableReader> added)
     {
+        // If reloaded, SSTables will be placed in their correct locations
+        // so there is no need to process notification
+        if (maybeReloadDiskBoundaries())
+            return;
+
         readLock.lock();
         try
         {
             for (SSTableReader sstable : added)
-                getCompactionStrategyFor(sstable).addSSTable(sstable);
+                compactionStrategyFor(sstable).addSSTable(sstable);
         }
         finally
         {
@@ -429,44 +517,47 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     private void handleListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
     {
-        // a bit of gymnastics to be able to replace sstables in compaction strategies
-        // we use this to know that a compaction finished and where to start the next compaction in LCS
-        Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
-        int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
-
-        List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
-        List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
-        List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
-        List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
-
-        for (int i = 0; i < locationSize; i++)
-        {
-            repairedRemoved.add(new HashSet<>());
-            repairedAdded.add(new HashSet<>());
-            unrepairedRemoved.add(new HashSet<>());
-            unrepairedAdded.add(new HashSet<>());
-        }
+        // If reloaded, SSTables will be placed in their correct locations
+        // so there is no need to process notification
+        if (maybeReloadDiskBoundaries())
+            return;
 
-        for (SSTableReader sstable : removed)
-        {
-            int i = getCompactionStrategyIndex(cfs, sstable);
-            if (sstable.isRepaired())
-                repairedRemoved.get(i).add(sstable);
-            else
-                unrepairedRemoved.get(i).add(sstable);
-        }
-        for (SSTableReader sstable : added)
-        {
-            int i = getCompactionStrategyIndex(cfs, sstable);
-            if (sstable.isRepaired())
-                repairedAdded.get(i).add(sstable);
-            else
-                unrepairedAdded.get(i).add(sstable);
-        }
-        // we need write lock here since we might be moving sstables between strategies
-        writeLock.lock();
+        readLock.lock();
         try
         {
+            // a bit of gymnastics to be able to replace sstables in compaction strategies
+            // we use this to know that a compaction finished and where to start the next compaction in LCS
+            int locationSize = partitionSSTablesByTokenRange? currentBoundaries.directories.size() : 1;
+
+            List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+
+            for (int i = 0; i < locationSize; i++)
+            {
+                repairedRemoved.add(new HashSet<>());
+                repairedAdded.add(new HashSet<>());
+                unrepairedRemoved.add(new HashSet<>());
+                unrepairedAdded.add(new HashSet<>());
+            }
+
+            for (SSTableReader sstable : removed)
+            {
+                int i = compactionStrategyIndexFor(sstable);
+                if (sstable.isRepaired())
+                    repairedRemoved.get(i).add(sstable);
+                else
+                    unrepairedRemoved.get(i).add(sstable);
+            }
+            for (SSTableReader sstable : added)
+            {
+                int i = compactionStrategyIndexFor(sstable);
+                if (sstable.isRepaired())
+                    repairedAdded.get(i).add(sstable);
+                else
+                    unrepairedAdded.get(i).add(sstable);
+            }
             for (int i = 0; i < locationSize; i++)
             {
                 if (!repairedRemoved.get(i).isEmpty())
@@ -482,19 +573,23 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
         finally
         {
-            writeLock.unlock();
+            readLock.unlock();
         }
     }
 
     private void handleRepairStatusChangedNotification(Iterable<SSTableReader> sstables)
     {
+        // If reloaded, SSTables will be placed in their correct locations
+        // so there is no need to process notification
+        if (maybeReloadDiskBoundaries())
+            return;
         // we need a write lock here since we move sstables from one strategy instance to another
         writeLock.lock();
         try
         {
             for (SSTableReader sstable : sstables)
             {
-                int index = getCompactionStrategyIndex(cfs, sstable);
+                int index = compactionStrategyIndexFor(sstable);
                 if (sstable.isRepaired())
                 {
                     unrepaired.get(index).removeSSTable(sstable);
@@ -509,26 +604,29 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
         finally
         {
             writeLock.unlock();
         }
     }
 
     private void handleDeletingNotification(SSTableReader deleted)
     {
+        // If reloaded, SSTables will be placed in their correct locations
+        // so there is no need to process notification
+        if (maybeReloadDiskBoundaries())
+            return;
         writeLock.lock();
         try
         {
-            getCompactionStrategyFor(deleted).removeSSTable(deleted);
+            compactionStrategyFor(deleted).removeSSTable(deleted);
         }
         finally
         {
             writeLock.unlock();
         }
     }
 
     public void handleNotification(INotification notification, Object sender)
     {
-        maybeReload(cfs.metadata);
         if (notification instanceof SSTableAddedNotification)
         {
             handleFlushNotification(((SSTableAddedNotification) notification).added);
@@ -595,29 +693,29 @@ public class CompactionStrategyManager implements INotificationConsumer
     @SuppressWarnings("resource")
     public AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
     {
-        assert repaired.size() == unrepaired.size();
-        List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
-        List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
-
-        for (int i = 0; i < repaired.size(); i++)
+        maybeReloadDiskBoundaries();
+        readLock.lock();
+        try
         {
-            repairedSSTables.add(new HashSet<>());
-            unrepairedSSTables.add(new HashSet<>());
-        }
+            assert repaired.size() == unrepaired.size();
+            List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
+            List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
 
-        for (SSTableReader sstable : sstables)
-        {
-            if (sstable.isRepaired())
-                repairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
-            else
-                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, sstable)).add(sstable);
-        }
+            for (int i = 0; i < repaired.size(); i++)
+            {
+                repairedSSTables.add(new HashSet<>());
+                unrepairedSSTables.add(new HashSet<>());
+            }
 
-        List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
+            for (SSTableReader sstable : sstables)
+            {
+                if (sstable.isRepaired())
+                    repairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
+                else
+                    unrepairedSSTables.get(compactionStrategyIndexFor(sstable)).add(sstable);
+            }
 
-        readLock.lock();
-        try
-        {
+            List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
             for (int i = 0; i < repairedSSTables.size(); i++)
             {
                 if (!repairedSSTables.get(i).isEmpty())
@@ -644,10 +742,11 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
-            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+            Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
             Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
 
             for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
@@ -675,29 +774,47 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
     {
-        maybeReload(cfs.metadata);
-        validateForCompaction(txn.originals(), cfs, getDirectories());
-        return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+        maybeReloadDiskBoundaries();
+        readLock.lock();
+        try
+        {
+            // validation rejects mixed repaired status or strategy index, so the first sstable's strategy serves all
+            validateForCompaction(txn.originals());
+            return compactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
+        }
+        finally
+        {
+            readLock.unlock();
+        }
     }
 
-    private static void validateForCompaction(Iterable<SSTableReader> input, ColumnFamilyStore cfs, Directories directories)
+    private void validateForCompaction(Iterable<SSTableReader> input)
     {
-        SSTableReader firstSSTable = Iterables.getFirst(input, null);
-        assert firstSSTable != null;
-        boolean repaired = firstSSTable.isRepaired();
-        int firstIndex = getCompactionStrategyIndex(cfs, firstSSTable);
-        for (SSTableReader sstable : input)
+        readLock.lock();
+        try
+        {
+            // every input must share the first sstable's repaired status and compaction strategy index
+            SSTableReader first = Iterables.getFirst(input, null);
+            assert first != null;
+            boolean expectRepaired = first.isRepaired();
+            int expectedIndex = compactionStrategyIndexFor(first);
+            for (SSTableReader candidate : input)
+            {
+                if (candidate.isRepaired() != expectRepaired)
+                    throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
+                if (compactionStrategyIndexFor(candidate) != expectedIndex)
+                    throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+            }
+        }
+        finally
         {
-            if (sstable.isRepaired() != repaired)
-                throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
-            if (firstIndex != getCompactionStrategyIndex(cfs, sstable))
-                throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+            readLock.unlock();
         }
     }
 
     public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
     {
-        maybeReload(cfs.metadata);
+        maybeReloadDiskBoundaries();
         // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
         // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
         // sstables are marked the compactions are re-enabled
@@ -745,18 +862,18 @@ public class CompactionStrategyManager implements INotificationConsumer
      */
     public List<AbstractCompactionTask> getUserDefinedTasks(Collection<SSTableReader> sstables, int gcBefore)
     {
-        maybeReload(cfs.metadata);
+        maybeReloadDiskBoundaries();
         List<AbstractCompactionTask> ret = new ArrayList<>();
         readLock.lock();
         try
         {
             Map<Integer, List<SSTableReader>> repairedSSTables = sstables.stream()
                                                                          .filter(s -> !s.isMarkedSuspect() && s.isRepaired())
-                                                                         .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+                                                                         .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
 
             Map<Integer, List<SSTableReader>> unrepairedSSTables = sstables.stream()
                                                                            .filter(s -> !s.isMarkedSuspect() && !s.isRepaired())
-                                                                           .collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, s)));
+                                                                           .collect(Collectors.groupingBy((s) -> compactionStrategyIndexFor(s)));
 
 
             for (Map.Entry<Integer, List<SSTableReader>> group : repairedSSTables.entrySet())
@@ -773,20 +890,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         }
     }
 
-    /**
-     * @deprecated use {@link #getUserDefinedTasks(Collection, int)} instead.
-     */
-    @Deprecated()
-    public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
-    {
-        validateForCompaction(sstables, cfs, getDirectories());
-        List<AbstractCompactionTask> tasks = getUserDefinedTasks(sstables, gcBefore);
-        assert tasks.size() == 1;
-        return tasks.get(0);
-    }
-
     public int getEstimatedRemainingTasks()
     {
+        maybeReloadDiskBoundaries();
         int tasks = 0;
         readLock.lock();
         try
@@ -811,6 +917,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public String getName()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -824,6 +931,7 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public List<List<AbstractCompactionStrategy>> getStrategies()
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -861,10 +969,9 @@ public class CompactionStrategyManager implements INotificationConsumer
         repaired.clear();
         unrepaired.clear();
 
-        if (cfs.getPartitioner().splitter().isPresent())
+        if (partitionSSTablesByTokenRange)
         {
-            locations = cfs.getDirectories().getWriteableLocations();
-            for (int i = 0; i < locations.length; i++)
+            for (int i = 0; i < currentBoundaries.directories.size(); i++)
             {
                 repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
                 unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
@@ -896,6 +1003,7 @@ public class CompactionStrategyManager implements INotificationConsumer
                                                        Collection<Index> indexes,
                                                        LifecycleTransaction txn)
     {
+        maybeReloadDiskBoundaries();
         readLock.lock();
         try
         {
@@ -921,21 +1029,21 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public List<String> getStrategyFolders(AbstractCompactionStrategy strategy)
     {
-        Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
-        if (cfs.getPartitioner().splitter().isPresent())
+        List<Directories.DataDirectory> locations = currentBoundaries.directories;
+        if (partitionSSTablesByTokenRange)
         {
             int unrepairedIndex = unrepaired.indexOf(strategy);
-            if (unrepairedIndex > 0)
+            if (unrepairedIndex >= 0)
             {
-                return Collections.singletonList(locations[unrepairedIndex].location.getAbsolutePath());
+                return Collections.singletonList(locations.get(unrepairedIndex).location.getAbsolutePath());
             }
             int repairedIndex = repaired.indexOf(strategy);
-            if (repairedIndex > 0)
+            if (repairedIndex >= 0)
             {
-                return Collections.singletonList(locations[repairedIndex].location.getAbsolutePath());
+                return Collections.singletonList(locations.get(repairedIndex).location.getAbsolutePath());
             }
         }
-        List<String> folders = new ArrayList<>(locations.length);
+        List<String> folders = new ArrayList<>(locations.size());
         for (Directories.DataDirectory location : locations)
         {
             folders.add(location.location.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index b1f2e9f..4635824 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -98,7 +98,7 @@ public class Scrubber implements Closeable
 
         List<SSTableReader> toScrub = Collections.singletonList(sstable);
 
-        int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, sstable);
+        int locIndex = cfs.getCompactionStrategyManager().getCompactionStrategyIndex(sstable);
         this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
         this.isCommutative = cfs.metadata.isCounter();
 
@@ -508,7 +508,7 @@ public class Scrubber implements Closeable
                         nextToOffer = peek; // Offer peek in next call
                         return next;
                     }
-    
+
                     // Duplicate row, merge it.
                     next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds());
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 0dd134a..96b733e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -65,7 +65,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
     protected SizeTieredCompactionStrategyOptions sizeTieredOptions;
     protected volatile int estimatedRemainingTasks;
-    private final Set<SSTableReader> sstables = new HashSet<>();
+    @VisibleForTesting
+    protected final Set<SSTableReader> sstables = new HashSet<>();
 
     public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 820b016..3dbf3d8 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -288,7 +288,7 @@ public class CassandraDaemon
         {
             if (logger.isDebugEnabled())
                 logger.debug("opening keyspace {}", keyspaceName);
-            // disable auto compaction until commit log replay ends
+            // disable auto compaction until gossip settles since disk boundaries may be affected by ring layout
             for (ColumnFamilyStore cfs : Keyspace.open(keyspaceName).getColumnFamilyStores())
             {
                 for (ColumnFamilyStore store : cfs.concatWithIndexes())
@@ -298,7 +298,6 @@ public class CassandraDaemon
             }
         }
 
-
         try
         {
             loadRowAndKeyCacheAsync().get();
@@ -338,19 +337,6 @@ public class CassandraDaemon
         // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
         LegacyBatchlogMigrator.migrate();
 
-        // enable auto compaction
-        for (Keyspace keyspace : Keyspace.all())
-        {
-            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
-            {
-                for (final ColumnFamilyStore store : cfs.concatWithIndexes())
-                {
-                    if (store.getCompactionStrategyManager().shouldBeEnabled())
-                        store.enableAutoCompaction();
-                }
-            }
-        }
-
         SystemKeyspace.finishStartup();
 
         // Prepared statements
@@ -413,6 +399,22 @@ public class CassandraDaemon
         if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
             Gossiper.waitToSettle();
 
+        // re-enable auto-compaction after gossip is settled, so correct disk boundaries are used
+        for (Keyspace keyspace : Keyspace.all())
+        {
+            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+            {
+                for (final ColumnFamilyStore store : cfs.concatWithIndexes())
+                {
+                    store.reload(); //reload CFs in case there was a change of disk boundaries
+                    if (store.getCompactionStrategyManager().shouldBeEnabled())
+                    {
+                        store.enableAutoCompaction();
+                    }
+                }
+            }
+        }
+
         // schedule periodic background compaction task submission. this is simply a backstop against compactions stalling
         // due to scheduling errors or race conditions
         ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e93430b..fafe8e8 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1496,6 +1496,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             SystemKeyspace.resetAvailableRanges();
         }
 
+        // Force disk boundary invalidation now that local tokens are set
+        invalidateDiskBoundaries();
+
         setMode(Mode.JOINING, "Starting to bootstrap...", true);
         BootStrapper bootstrapper = new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata);
         bootstrapper.addProgressListener(progressSupport);
@@ -1527,6 +1530,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    /**
+     * Invalidates disk boundaries of every store from {@code concatWithIndexes()} of every table, e.g. once local tokens are set.
+     */
+    private void invalidateDiskBoundaries()
+    {
+        for (Keyspace keyspace : Keyspace.all())
+        {
+            for (ColumnFamilyStore table : keyspace.getColumnFamilyStores())
+            {
+                table.concatWithIndexes().forEach(ColumnFamilyStore::invalidateDiskBoundaries);
+            }
+        }
+    }
+
     /**
      * All MVs have been created during bootstrap, so mark them as built
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
new file mode 100644
index 0000000..c654fcd
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.io.Files;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.DiskBoundaries;
+import org.apache.cassandra.db.DiskBoundaryManager;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
+import org.apache.cassandra.notifications.SSTableDeletingNotification;
+import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CompactionStrategyManagerTest
+{
+    private static final String KS_PREFIX = "Keyspace1";
+    private static final String TABLE_PREFIX = "CF_STANDARD";
+
+    private static IPartitioner originalPartitioner;
+    private static boolean backups;
+
+    @BeforeClass
+    public static void beforeClass()
+    {
+        SchemaLoader.prepareServer();
+        backups = DatabaseDescriptor.isIncrementalBackupsEnabled();
+        DatabaseDescriptor.setIncrementalBackupsEnabled(false);
+        // We use byte ordered partitioner in this test to be able to easily infer an SSTable
+        // disk assignment based on its generation - see getSSTableIndex(Integer[], SSTableReader)
+        originalPartitioner = StorageService.instance.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
+    }
+
+    @AfterClass
+    public static void afterClass()
+    {
+        // Restore through the same entry point used in beforeClass(), so anything StorageService
+        // updates alongside DatabaseDescriptor is reverted too
+        StorageService.instance.setPartitionerUnsafe(originalPartitioner);
+        DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
+    }
+
+    @Test
+    public void testSSTablesAssignedToCorrectCompactionStrategy()
+    {
+        // Creates 100 SSTables with keys 0-99
+        int numSSTables = 100;
+        SchemaLoader.createKeyspace(KS_PREFIX,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KS_PREFIX, TABLE_PREFIX)
+                                                .compaction(CompactionParams.scts(Collections.emptyMap())));
+        ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+        cfs.disableAutoCompaction();
+        for (int i = 0; i < numSSTables; i++)
+        {
+            createSSTableWithKey(KS_PREFIX, TABLE_PREFIX, i);
+        }
+
+        // Creates a CompactionStrategyManager with different numbers of disks and check
+        // if the SSTables are assigned to the correct compaction strategies
+        for (int numDisks = 2; numDisks < 10; numDisks++)
+        {
+            testSSTablesAssignedToCorrectCompactionStrategy(numSSTables, numDisks);
+        }
+    }
+
+    /** Checks sstable-to-strategy assignment for one disk count, before and after boundary changes. */
+    private void testSSTablesAssignedToCorrectCompactionStrategy(int numSSTables, int numDisks)
+    {
+        // Create a mock CFS with the given number of disks
+        MockCFS cfs = createJBODMockCFS(numDisks);
+        // Check that CFS will contain numSSTables
+        assertEquals(numSSTables, cfs.getLiveSSTables().size());
+
+        // Creates a compaction strategy manager with an external boundary supplier
+        final Integer[] boundaries = computeBoundaries(numSSTables, numDisks);
+
+        MockBoundaryManager mockBoundaryManager = new MockBoundaryManager(cfs, boundaries);
+        System.out.println("Boundaries for " + numDisks + " disks is " + Arrays.toString(boundaries));
+        CompactionStrategyManager csm = new CompactionStrategyManager(cfs, mockBoundaryManager::getBoundaries,
+                                                                      true);
+
+        // Check that SSTables are assigned to the correct Compaction Strategy
+        for (SSTableReader reader : cfs.getLiveSSTables())
+        {
+            verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+        }
+
+        for (int delta = 1; delta <= 3; delta++)
+        {
+            // Update disk boundaries
+            Integer[] previousBoundaries = Arrays.copyOf(boundaries, boundaries.length);
+            updateBoundaries(mockBoundaryManager, boundaries, delta);
+
+            // Check that SSTables are still assigned to the previous boundary layout
+            System.out.println("Old boundaries: " + Arrays.toString(previousBoundaries) + " New boundaries: " + Arrays.toString(boundaries));
+            for (SSTableReader reader : cfs.getLiveSSTables())
+            {
+                verifySSTableIsAssignedToCorrectStrategy(previousBoundaries, csm, reader);
+            }
+
+            // Reload CompactionStrategyManager so new disk boundaries will be loaded
+            csm.maybeReloadDiskBoundaries();
+
+            for (SSTableReader reader : cfs.getLiveSSTables())
+            {
+                // Check that SSTables are assigned to the new boundary layout
+                verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+
+                // Remove SSTable and check that it will be removed from the correct compaction strategy
+                csm.handleNotification(new SSTableDeletingNotification(reader), this);
+                assertFalse(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+
+                // Add SSTable again and check that is correctly assigned
+                csm.handleNotification(new SSTableAddedNotification(Collections.singleton(reader)), this);
+                verifySSTableIsAssignedToCorrectStrategy(boundaries, csm, reader);
+            }
+        }
+    }
+
+    private MockCFS createJBODMockCFS(int disks)
+    {
+        // Create #disks data directories to simulate JBOD
+        Directories.DataDirectory[] directories = new Directories.DataDirectory[disks];
+        for (int i = 0; i < disks; ++i)
+        {
+            File tempDir = Files.createTempDir();
+            tempDir.deleteOnExit();
+            directories[i] = new Directories.DataDirectory(tempDir);
+        }
+
+        ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
+        MockCFS mockCFS = new MockCFS(cfs, new Directories(cfs.metadata, directories));
+        mockCFS.disableAutoCompaction();
+        mockCFS.addSSTables(cfs.getLiveSSTables());
+        return mockCFS;
+    }
+
+    /**
+     * Updates the boundaries with a delta
+     */
+    private void updateBoundaries(MockBoundaryManager boundaryManager, Integer[] boundaries, int delta)
+    {
+        for (int j = 0; j < boundaries.length - 1; j++)
+        {
+            if ((j + delta) % 2 == 0)
+                boundaries[j] -= delta;
+            else
+                boundaries[j] += delta;
+        }
+        boundaryManager.invalidateBoundaries();
+    }
+
+    private void verifySSTableIsAssignedToCorrectStrategy(Integer[] boundaries, CompactionStrategyManager csm, SSTableReader reader)
+    {
+        // Check that sstable is assigned to correct disk
+        int index = getSSTableIndex(boundaries, reader);
+        assertEquals(index, csm.compactionStrategyIndexFor(reader));
+        // Check that compaction strategy actually contains SSTable
+        assertTrue(((SizeTieredCompactionStrategy)csm.compactionStrategyFor(reader)).sstables.contains(reader));
+    }
+
+    /**
+     * Creates disk boundaries such that each disk receives
+     * an equal amount of SSTables
+     */
+    private Integer[] computeBoundaries(int numSSTables, int numDisks)
+    {
+        Integer[] result = new Integer[numDisks];
+        int sstablesPerRange = numSSTables / numDisks;
+        result[0] = sstablesPerRange;
+        for (int i = 1; i < numDisks; i++)
+        {
+            result[i] = result[i - 1] + sstablesPerRange;
+        }
+        result[numDisks - 1] = numSSTables; // make last boundary always be the number of SSTables to prevent rounding errors
+        return result;
+    }
+
+    /**
+     * Each SSTable holds the single key written for it; keys 0-99 were flushed in order, giving
+     * generations 1-100, so with ByteOrderedPartitioner an SSTable's disk index is the position
+     * of its generation relative to the boundaries.
+     */
+    private int getSSTableIndex(Integer[] boundaries, SSTableReader reader)
+    {
+        int index = 0;
+        while (boundaries[index] < reader.descriptor.generation)
+            index++;
+        System.out.println("Index for SSTable " + reader.descriptor.generation + " on boundary " + Arrays.toString(boundaries) + " is " + index);
+        return index;
+    }
+
+    /** Test double for the boundary supplier: builds DiskBoundaries from integer key positions and rebuilds them when out of date. */
+    private static class MockBoundaryManager
+    {
+        private final ColumnFamilyStore cfs;
+        // Shared with the caller: updateBoundaries() mutates this array in place before invalidating
+        private final Integer[] positions;
+        private DiskBoundaries boundaries;
+
+        public MockBoundaryManager(ColumnFamilyStore cfs, Integer[] positions)
+        {
+            this.cfs = cfs;
+            this.positions = positions;
+            this.boundaries = createDiskBoundaries(cfs, positions);
+        }
+
+        public void invalidateBoundaries()
+        {
+            boundaries.invalidate();
+        }
+
+        public DiskBoundaries getBoundaries()
+        {
+            if (boundaries.isOutOfDate())
+                boundaries = createDiskBoundaries(cfs, positions);
+            return boundaries;
+        }
+
+        private static DiskBoundaries createDiskBoundaries(ColumnFamilyStore cfs, Integer[] keyPositions)
+        {
+            List<PartitionPosition> diskPositions = Arrays.stream(keyPositions).map(b -> Util.token(String.format("%04d", b)).minKeyBound()).collect(Collectors.toList());
+            return new DiskBoundaries(cfs.getDirectories().getWriteableLocations(), diskPositions, 0, 0);
+        }
+    }
+
+    private static void createSSTableWithKey(String keyspace, String table, int key)
+    {
+        long timestamp = System.currentTimeMillis();
+        DecoratedKey dk = Util.dk(String.format("%04d", key));
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+        new RowUpdateBuilder(cfs.metadata, timestamp, dk.getKey())
+        .clustering(Integer.toString(key))
+        .add("val", "val")
+        .build()
+        .applyUnsafe();
+        cfs.forceBlockingFlush();
+    }
+
+    // just to be able to override the data directories
+    private static class MockCFS extends ColumnFamilyStore
+    {
+        MockCFS(ColumnFamilyStore cfs, Directories dirs)
+        {
+            super(cfs.keyspace, cfs.getTableName(), 0, cfs.metadata, dirs, false, false, true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25e46f05/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 1335906..7873ac9 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -104,6 +104,12 @@ public class CompactionsCQLTest extends CQLTester
         assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
         getCurrentColumnFamilyStore().enableAutoCompaction();
         assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+
+        // Alter keyspace replication settings to force compaction strategy reload and check strategy is still enabled
+        execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+        getCurrentColumnFamilyStore().getCompactionStrategyManager().maybeReloadDiskBoundaries();
+        assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled());
+
         execute("insert into %s (id) values ('1')");
         flush();
         execute("insert into %s (id) values ('1')");
@@ -161,17 +167,22 @@ public class CompactionsCQLTest extends CQLTester
         localOptions.put("class", "DateTieredCompactionStrategy");
         getCurrentColumnFamilyStore().setCompactionParameters(localOptions);
         assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+        // Invalidate disk boundaries to ensure that boundary invalidation will not cause the old strategy to be reloaded
+        getCurrentColumnFamilyStore().invalidateDiskBoundaries();
         // altering something non-compaction related
         execute("ALTER TABLE %s WITH gc_grace_seconds = 1000");
         // should keep the local compaction strat
         assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
+        // Alter keyspace replication settings to force compaction strategy reload
+        execute("alter keyspace "+keyspace()+" with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }");
+        // should keep the local compaction strat
+        assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class));
         // altering a compaction option
         execute("ALTER TABLE %s WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':3}");
         // will use the new option
         assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), SizeTieredCompactionStrategy.class));
     }
 
-
     @Test
     public void testSetLocalCompactionStrategyDisable() throws Throwable
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org