You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/01/05 16:55:47 UTC

[2/2] cassandra git commit: Make sure the same token does not exist in several data directories

Make sure the same token does not exist in several data directories

Patch by marcuse; reviewed by Yuki Morishita and Carl Yeksigian for CASSANDRA-6696


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2c63418
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2c63418
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2c63418

Branch: refs/heads/cassandra-3.2
Commit: e2c6341898fa43b0e262ef031f267587050b8d0f
Parents: 1568293
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Aug 3 10:12:47 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jan 5 16:48:50 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   5 +
 conf/cassandra.yaml                             |  13 +-
 .../org/apache/cassandra/config/Config.java     |   2 +-
 .../cassandra/config/DatabaseDescriptor.java    |   3 -
 .../cassandra/db/BlacklistedDirectories.java    |  10 +
 .../db/BlacklistedDirectoriesMBean.java         |   9 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  97 ++++-
 .../org/apache/cassandra/db/Directories.java    |  36 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 193 +++++-----
 .../db/compaction/CompactionManager.java        |  78 +++-
 .../compaction/CompactionStrategyManager.java   | 361 +++++++++++++------
 .../cassandra/db/compaction/CompactionTask.java |   2 +-
 .../DateTieredCompactionStrategy.java           |   3 +-
 .../db/compaction/LeveledManifest.java          |   2 +-
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../cassandra/db/compaction/Scrubber.java       |   7 +-
 .../SizeTieredCompactionStrategy.java           |   9 +-
 .../writers/CompactionAwareWriter.java          |  91 ++++-
 .../writers/DefaultCompactionWriter.java        |  17 +-
 .../writers/MajorLeveledCompactionWriter.java   |  53 +--
 .../writers/MaxSSTableSizeWriter.java           |  22 +-
 .../SplittingSizeTieredCompactionWriter.java    |  17 +-
 .../apache/cassandra/db/lifecycle/Tracker.java  |   4 +-
 .../org/apache/cassandra/db/lifecycle/View.java |   2 +-
 .../org/apache/cassandra/dht/IPartitioner.java  |  17 +
 .../cassandra/dht/Murmur3Partitioner.java       |  23 ++
 .../apache/cassandra/dht/RandomPartitioner.java |  24 ++
 src/java/org/apache/cassandra/dht/Range.java    |  17 +
 src/java/org/apache/cassandra/dht/Splitter.java | 124 +++++++
 .../io/sstable/SimpleSSTableMultiWriter.java    |   7 +-
 .../sstable/format/RangeAwareSSTableWriter.java | 205 +++++++++++
 .../cassandra/io/util/DiskAwareRunnable.java    |  17 +-
 .../cassandra/service/StorageService.java       |  70 ++++
 .../cassandra/service/StorageServiceMBean.java  |   1 +
 .../cassandra/streaming/StreamReader.java       |  11 +-
 .../cassandra/streaming/StreamSession.java      |   4 +-
 .../cassandra/streaming/StreamWriter.java       |   2 +-
 .../compress/CompressedStreamReader.java        |   3 +-
 .../compress/CompressedStreamWriter.java        |   3 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   5 +
 .../org/apache/cassandra/tools/NodeTool.java    |   3 +-
 .../cassandra/tools/SSTableOfflineRelevel.java  |  21 +-
 .../tools/nodetool/RelocateSSTables.java        |  49 +++
 .../LongLeveledCompactionStrategyTest.java      |   4 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   6 +-
 .../db/compaction/CompactionsCQLTest.java       |   5 +-
 .../LeveledCompactionStrategyTest.java          |  36 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |   6 +-
 .../apache/cassandra/db/lifecycle/ViewTest.java |   3 +-
 .../apache/cassandra/dht/LengthPartitioner.java |   6 +
 .../org/apache/cassandra/dht/SplitterTest.java  | 158 ++++++++
 52 files changed, 1509 insertions(+), 361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c3a50f..43acd43 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.2
+ * Make sure tokens don't exist in several data directories (CASSANDRA-6696)
  * Add requireAuthorization method to IAuthorizer (CASSANDRA-10852)
  * Move static JVM options to conf/jvm.options file (CASSANDRA-10494)
  * Fix CassandraVersion to accept x.y version string (CASSANDRA-10931)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1269c98..f5f50c1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,11 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - We now make sure that a token does not exist in several data directories. This
+     means that we run one compaction strategy per data_file_directory and we use
+     one thread per directory to flush. Use nodetool relocatesstables to make sure your
+     tokens are in the correct place, or just wait and compaction will handle it. See
+     CASSANDRA-6696 for more details.
    - bound maximum in-flight commit log replay mutation bytes to 64 megabytes
      tunable via cassandra.commitlog_max_outstanding_replay_bytes
    - Support for type casting has been added to the selection clause.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 74e1d1d..a8a90ba 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -413,14 +413,13 @@ memtable_allocation_type: heap_buffers
 
 # This sets the amount of memtable flush writer threads.  These will
 # be blocked by disk io, and each one will hold a memtable in memory
-# while blocked. 
+# while blocked.
 #
-# memtable_flush_writers defaults to the smaller of (number of disks,
-# number of cores), with a minimum of 2 and a maximum of 8.
-# 
-# If your data directories are backed by SSD, you should increase this
-# to the number of cores.
-#memtable_flush_writers: 8
+# memtable_flush_writers defaults to one per data_file_directory.
+#
+# If your data directories are backed by SSD, you can increase this, but
+# avoid having memtable_flush_writers * data_file_directories > number of cores
+#memtable_flush_writers: 1
 
 # A fixed memory pool size in MB for for SSTable index summaries. If left
 # empty, this will default to 5% of the heap size. If the memory usage of

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2bace5e..54cb089 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -98,7 +98,7 @@ public class Config
     @Deprecated
     public Integer concurrent_replicates = null;
 
-    public Integer memtable_flush_writers = null;
+    public Integer memtable_flush_writers = 1;
     public Integer memtable_heap_space_in_mb;
     public Integer memtable_offheap_space_in_mb;
     public Float memtable_cleanup_threshold = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3fc0b31..c82e930 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -582,9 +582,6 @@ public class DatabaseDescriptor
         if (conf.hints_directory.equals(conf.saved_caches_directory))
             throw new ConfigurationException("saved_caches_directory must not be the same as the hints_directory", false);
 
-        if (conf.memtable_flush_writers == null)
-            conf.memtable_flush_writers = Math.min(8, Math.max(2, Math.min(FBUtilities.getAvailableProcessors(), conf.data_file_directories.length)));
-
         if (conf.memtable_flush_writers < 1)
             throw new ConfigurationException("memtable_flush_writers must be at least 1, but was " + conf.memtable_flush_writers, false);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index f47fd57..3e6332c 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -66,6 +66,16 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
         return Collections.unmodifiableSet(unwritableDirectories);
     }
 
+    public void markUnreadable(String path)
+    {
+        maybeMarkUnreadable(new File(path));
+    }
+
+    public void markUnwritable(String path)
+    {
+        maybeMarkUnwritable(new File(path));
+    }
+
     /**
      * Adds parent directory of the file (or the file itself, if it is a directory)
      * to the set of unreadable directories.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java b/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
index 3163b9a..3fb9f39 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
@@ -20,10 +20,13 @@ package org.apache.cassandra.db;
 import java.io.File;
 import java.util.Set;
 
-public interface BlacklistedDirectoriesMBean {
-
+public interface BlacklistedDirectoriesMBean
+{
     public Set<File> getUnreadableDirectories();
     
     public Set<File> getUnwritableDirectories();
-    
+
+    public void markUnreadable(String path);
+
+    public void markUnwritable(String path);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 738c941..6eefb5f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -118,13 +118,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(1,
                                                                                           StageManager.KEEPALIVE,
                                                                                           TimeUnit.SECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
                                                                                           new NamedThreadFactory("MemtableFlushWriter"),
                                                                                           "internal");
 
+    private static final ExecutorService [] perDiskflushExecutors = new ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length];
+    static
+    {
+        for (int i = 0; i < DatabaseDescriptor.getAllDataFileLocations().length; i++)
+        {
+            perDiskflushExecutors[i] = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+                                                                        StageManager.KEEPALIVE,
+                                                                        TimeUnit.SECONDS,
+                                                                        new LinkedBlockingQueue<Runnable>(),
+                                                                        new NamedThreadFactory("PerDiskMemtableFlushWriter_"+i),
+                                                                        "internal");
+        }
+    }
+
     // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
     private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
                                                                                               StageManager.KEEPALIVE,
@@ -458,7 +472,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public Directories getDirectories()
     {
-        return directories;
+        // todo, hack since we need to know the data directories when constructing the compaction strategy
+        if (directories != null)
+            return directories;
+        return new Directories(metadata, initialDirectories);
     }
 
     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
@@ -1033,11 +1050,74 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             for (Memtable memtable : memtables)
             {
-                // flush the memtable
-                MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
+                List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
+                long totalBytesOnDisk = 0;
+                long maxBytesOnDisk = 0;
+                long minBytesOnDisk = Long.MAX_VALUE;
+                List<SSTableReader> sstables = new ArrayList<>();
+                try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
+                {
+                    // flush the memtable
+                    List<Memtable.FlushRunnable> flushRunnables = memtable.flushRunnables(txn);
+
+                    for (int i = 0; i < flushRunnables.size(); i++)
+                        futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
+
+                    List<SSTableMultiWriter> flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
+
+                    try
+                    {
+                        Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
+                        while (writerIterator.hasNext())
+                        {
+                            SSTableMultiWriter writer = writerIterator.next();
+                            if (writer.getFilePointer() > 0)
+                            {
+                                writer.setOpenResult(true).prepareToCommit();
+                            }
+                            else
+                            {
+                                maybeFail(writer.abort(null));
+                                writerIterator.remove();
+                            }
+                        }
+                    }
+                    catch (Throwable t)
+                    {
+                        for (SSTableMultiWriter writer : flushResults)
+                            t = writer.abort(t);
+                        t = txn.abort(t);
+                        Throwables.propagate(t);
+                    }
+
+                    txn.prepareToCommit();
+
+                    Throwable accumulate = null;
+                    for (SSTableMultiWriter writer : flushResults)
+                        accumulate = writer.commit(accumulate);
+
+                    maybeFail(txn.commit(accumulate));
+
+                    for (SSTableMultiWriter writer : flushResults)
+                    {
+                        Collection<SSTableReader> flushedSSTables = writer.finished();
+                        for (SSTableReader sstable : flushedSSTables)
+                        {
+                            if (sstable != null)
+                            {
+                                sstables.add(sstable);
+                                long size = sstable.bytesOnDisk();
+                                totalBytesOnDisk += size;
+                                maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
+                                minBytesOnDisk = Math.min(minBytesOnDisk, size);
+                            }
+                        }
+                    }
+                }
+                memtable.cfs.replaceFlushed(memtable, sstables);
                 reclaim(memtable);
+                logger.debug("Flushed to {} ({} sstables, {} bytes), biggest {} bytes, smallest {} bytes", sstables, sstables.size(), totalBytesOnDisk, maxBytesOnDisk, minBytesOnDisk);
             }
-
             // signal the post-flush we've done our work
             postFlush.latch.countDown();
         }
@@ -1366,6 +1446,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion);
     }
 
+    public CompactionManager.AllSSTableOpStatus relocateSSTables() throws ExecutionException, InterruptedException
+    {
+        return CompactionManager.instance.relocateSSTables(this);
+    }
+
     public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
     {
         assert !sstables.isEmpty();
@@ -2206,7 +2291,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public int getUnleveledSSTables()
     {
-        return this.compactionStrategyManager.getUnleveledSSTables();
+        return compactionStrategyManager.getUnleveledSSTables();
     }
 
     public int[] getSSTableCountPerLevel()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 8744d43..a572bed 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -289,6 +290,19 @@ public class Directories
         return null;
     }
 
+    public DataDirectory getDataDirectoryForFile(File directory)
+    {
+        if (directory != null)
+        {
+            for (DataDirectory dataDirectory : dataDirectories)
+            {
+                if (directory.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
+                    return dataDirectory;
+            }
+        }
+        return null;
+    }
+
     public Descriptor find(String filename)
     {
         for (File dir : dataPaths)
@@ -403,7 +417,7 @@ public class Directories
         for (DataDirectory dataDir : paths)
         {
             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
-                  continue;
+                continue;
             DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
             // exclude directory if its total writeSize does not fit to data directory
             if (candidate.availableSpace < writeSize)
@@ -413,6 +427,26 @@ public class Directories
         return totalAvailable > expectedTotalWriteSize;
     }
 
+    public DataDirectory[] getWriteableLocations()
+    {
+        List<DataDirectory> nonBlacklistedDirs = new ArrayList<>();
+        for (DataDirectory dir : dataDirectories)
+        {
+            if (!BlacklistedDirectories.isUnwritable(dir.location))
+                nonBlacklistedDirs.add(dir);
+        }
+
+        Collections.sort(nonBlacklistedDirs, new Comparator<DataDirectory>()
+        {
+            @Override
+            public int compare(DataDirectory o1, DataDirectory o2)
+            {
+                return o1.location.compareTo(o2.location);
+            }
+        });
+        return nonBlacklistedDirs.toArray(new DataDirectory[nonBlacklistedDirs.size()]);
+    }
+
     public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
     {
         return getSnapshotDirectory(desc.directory, snapshotName);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 96b1775..8e7a43c 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.File;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,14 +44,14 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
 import org.apache.cassandra.utils.memory.MemtablePool;
@@ -250,9 +249,32 @@ public class Memtable implements Comparable<Memtable>
         return partitions.size();
     }
 
-    public FlushRunnable flushRunnable()
+    public List<FlushRunnable> flushRunnables(LifecycleTransaction txn)
     {
-        return new FlushRunnable(lastReplayPosition.get());
+        List<Range<Token>> localRanges = Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName()));
+
+        if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty())
+            return Collections.singletonList(new FlushRunnable(lastReplayPosition.get(), txn));
+
+        return createFlushRunnables(localRanges, txn);
+    }
+
+    private List<FlushRunnable> createFlushRunnables(List<Range<Token>> localRanges, LifecycleTransaction txn)
+    {
+        assert cfs.getPartitioner().splitter().isPresent();
+
+        Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+        List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+        List<FlushRunnable> runnables = new ArrayList<>(boundaries.size());
+        PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound();
+        ReplayPosition context = lastReplayPosition.get();
+        for (int i = 0; i < boundaries.size(); i++)
+        {
+            PartitionPosition t = boundaries.get(i);
+            runnables.add(new FlushRunnable(context, rangeStart, t, locations[i], txn));
+            rangeStart = t;
+        }
+        return runnables;
     }
 
     public String toString()
@@ -312,23 +334,41 @@ public class Memtable implements Comparable<Memtable>
         return creationTime;
     }
 
-    class FlushRunnable extends DiskAwareRunnable
+    class FlushRunnable implements Callable<SSTableMultiWriter>
     {
-        private final ReplayPosition context;
+        public final ReplayPosition context;
         private final long estimatedSize;
+        private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
 
         private final boolean isBatchLogTable;
+        private final SSTableMultiWriter writer;
+
+        // keeping these to be able to log what we are actually flushing
+        private final PartitionPosition from;
+        private final PartitionPosition to;
 
-        FlushRunnable(ReplayPosition context)
+        FlushRunnable(ReplayPosition context, PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
         {
-            this.context = context;
+            this(context, partitions.subMap(from, to), flushLocation, from, to, txn);
+        }
+
+        FlushRunnable(ReplayPosition context, LifecycleTransaction txn)
+        {
+            this(context, partitions, null, null, null, txn);
+        }
 
+        FlushRunnable(ReplayPosition context, ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
+        {
+            this.context = context;
+            this.toFlush = toFlush;
+            this.from = from;
+            this.to = to;
             long keySize = 0;
-            for (PartitionPosition key : partitions.keySet())
+            for (PartitionPosition key : toFlush.keySet())
             {
                 //  make sure we don't write non-sensical keys
                 assert key instanceof DecoratedKey;
-                keySize += ((DecoratedKey)key).getKey().remaining();
+                keySize += ((DecoratedKey) key).getKey().remaining();
             }
             estimatedSize = (long) ((keySize // index entries
                                     + keySize // keys in data file
@@ -336,21 +376,12 @@ public class Memtable implements Comparable<Memtable>
                                     * 1.2); // bloom filter and row index overhead
 
             this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
-        }
 
-        public long getExpectedWriteSize()
-        {
-            return estimatedSize;
-        }
+            if (flushLocation == null)
+                writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(getDirectories().getWriteableLocation(estimatedSize))), columnsCollector.get(), statsCollector.get());
+            else
+                writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get());
 
-        protected void runMayThrow() throws Exception
-        {
-            long writeSize = getExpectedWriteSize();
-            Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
-            File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
-            assert sstableDirectory != null : "Flush task is not bound to any disk";
-            Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory);
-            cfs.replaceFlushed(Memtable.this, sstables);
         }
 
         protected Directories getDirectories()
@@ -358,90 +389,64 @@ public class Memtable implements Comparable<Memtable>
             return cfs.getDirectories();
         }
 
-        private Collection<SSTableReader> writeSortedContents(ReplayPosition context, File sstableDirectory)
+        private void writeSortedContents(ReplayPosition context)
         {
-            logger.debug("Writing {}", Memtable.this.toString());
+            logger.debug("Writing {}, flushed range = ({}, {}]", Memtable.this.toString(), from, to);
 
-            Collection<SSTableReader> ssTables;
-            try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+            boolean trackContention = logger.isTraceEnabled();
+            int heavilyContendedRowCount = 0;
+            // (we can't clear out the map as-we-go to free up memory,
+            //  since the memtable is being used for queries in the "pending flush" category)
+            for (AtomicBTreePartition partition : toFlush.values())
             {
-                boolean trackContention = logger.isTraceEnabled();
-                int heavilyContendedRowCount = 0;
-                // (we can't clear out the map as-we-go to free up memory,
-                //  since the memtable is being used for queries in the "pending flush" category)
-                for (AtomicBTreePartition partition : partitions.values())
+                // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+                // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+                // we don't need to preserve tombstones for repair. So if both operation are in this
+                // memtable (which will almost always be the case if there is no ongoing failure), we can
+                // just skip the entry (CASSANDRA-4667).
+                if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                    continue;
+
+                if (trackContention && partition.usePessimisticLocking())
+                    heavilyContendedRowCount++;
+
+                if (!partition.isEmpty())
                 {
-                    // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
-                    // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
-                    // we don't need to preserve tombstones for repair. So if both operation are in this
-                    // memtable (which will almost always be the case if there is no ongoing failure), we can
-                    // just skip the entry (CASSANDRA-4667).
-                    if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
-                        continue;
-
-                    if (trackContention && partition.usePessimisticLocking())
-                        heavilyContendedRowCount++;
-
-                    if (!partition.isEmpty())
+                    try (UnfilteredRowIterator iter = partition.unfilteredIterator())
                     {
-                        try (UnfilteredRowIterator iter = partition.unfilteredIterator())
-                        {
-                            writer.append(iter);
-                        }
+                        writer.append(iter);
                     }
                 }
+            }
 
-                if (writer.getFilePointer() > 0)
-                {
-                    logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                               writer.getFilename(),
-                                               FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                               context));
+            logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                                                              writer.getFilename(),
+                                                                              FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                                                              context));
 
-                    // sstables should contain non-repaired data.
-                    ssTables = writer.finish(true);
-                }
-                else
-                {
-                    logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
-                                writer.getFilename(), context);
-                    writer.abort();
-                    ssTables = null;
-                }
+            if (heavilyContendedRowCount > 0)
+                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, toFlush.size(), Memtable.this.toString()));
+        }
 
-                if (heavilyContendedRowCount > 0)
-                    logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+        public SSTableMultiWriter createFlushWriter(LifecycleTransaction txn,
+                                                  String filename,
+                                                  PartitionColumns columns,
+                                                  EncodingStats stats)
+        {
+            MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
+            return cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
+                                                (long)toFlush.size(),
+                                                ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                sstableMetadataCollector,
+                                                new SerializationHeader(true, cfs.metadata, columns, stats), txn);
 
-                return ssTables;
-            }
         }
 
-        @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
-        public SSTableTxnWriter createFlushWriter(String filename,
-                                               PartitionColumns columns,
-                                               EncodingStats stats)
+        @Override
+        public SSTableMultiWriter call()
         {
-            // we operate "offline" here, as we expose the resulting reader consciously when done
-            // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
-            LifecycleTransaction txn = null;
-            try
-            {
-                txn = LifecycleTransaction.offline(OperationType.FLUSH);
-                MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
-                return new SSTableTxnWriter(txn,
-                                            cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
-                                                                         (long) partitions.size(),
-                                                                         ActiveRepairService.UNREPAIRED_SSTABLE,
-                                                                         sstableMetadataCollector,
-                                                                         new SerializationHeader(true, cfs.metadata, columns, stats),
-                                                                         txn));
-            }
-            catch (Throwable t)
-            {
-                if (txn != null)
-                    txn.close();
-                throw t;
-            }
+            writeSortedContents(context);
+            return writer;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 571088b..1f39767 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.stream.Collectors;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.OpenDataException;
@@ -270,7 +271,7 @@ public class CompactionManager implements CompactionManagerMBean
             Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting));
             if (Iterables.isEmpty(sstables))
             {
-                logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name);
+                logger.info("No sstables to {} for {}.{}", operationType.name(), cfs.keyspace.getName(), cfs.name);
                 return AllSSTableOpStatus.SUCCESSFUL;
             }
 
@@ -432,6 +433,77 @@ public class CompactionManager implements CompactionManagerMBean
         }, OperationType.CLEANUP);
     }
 
+    public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs) throws ExecutionException, InterruptedException
+    {
+        if (!cfs.getPartitioner().splitter().isPresent())
+        {
+            logger.info("Partitioner does not support splitting");
+            return AllSSTableOpStatus.ABORTED;
+        }
+        final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName());
+
+        if (r.isEmpty())
+        {
+            logger.info("Relocate cannot run before a node has joined the ring");
+            return AllSSTableOpStatus.ABORTED;
+        }
+
+        final List<Range<Token>> localRanges = Range.sort(r);
+        final Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+        final List<PartitionPosition> diskBoundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+
+        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
+        {
+            @Override
+            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
+            {
+                Set<SSTableReader> originals = Sets.newHashSet(transaction.originals());
+                Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet());
+                transaction.cancel(Sets.difference(originals, needsRelocation));
+
+                Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
+                        CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), s)));
+
+                int maxSize = 0;
+                for (List<SSTableReader> diskSSTables : groupedByDisk.values())
+                    maxSize = Math.max(maxSize, diskSSTables.size());
+
+                List<SSTableReader> mixedSSTables = new ArrayList<>();
+
+                for (int i = 0; i < maxSize; i++)
+                    for (List<SSTableReader> diskSSTables : groupedByDisk.values())
+                        if (i < diskSSTables.size())
+                            mixedSSTables.add(diskSSTables.get(i));
+
+                return mixedSSTables;
+            }
+
+            private boolean inCorrectLocation(SSTableReader sstable)
+            {
+                if (!cfs.getPartitioner().splitter().isPresent())
+                    return true;
+                int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+                Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+
+                Directories.DataDirectory location = locations[directoryIndex];
+                PartitionPosition diskLast = diskBoundaries.get(directoryIndex);
+                // the location we get from directoryIndex is based on the first key in the sstable
+                // now we need to make sure the last key is less than the boundary as well:
+                return sstable.descriptor.directory.getAbsolutePath().startsWith(location.location.getAbsolutePath()) && sstable.last.compareTo(diskLast) <= 0;
+            }
+
+            @Override
+            public void execute(LifecycleTransaction txn) throws IOException
+            {
+                logger.debug("Relocating {}", txn.originals());
+                AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
+                task.setUserDefined(true);
+                task.setCompactionType(OperationType.RELOCATE);
+                task.execute(metrics);
+            }
+        }, OperationType.RELOCATE);
+    }
+
     public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                           final Collection<Range<Token>> ranges,
                                           final Refs<SSTableReader> sstables,
@@ -878,9 +950,7 @@ public class CompactionManager implements CompactionManagerMBean
 
         logger.info("Cleaning up {}", sstable);
 
-        File compactionFileLocation = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
-        if (compactionFileLocation == null)
-            throw new IOException("disk full");
+        File compactionFileLocation = sstable.descriptor.directory;
 
         List<SSTableReader> finished;
         int nowInSec = FBUtilities.nowInSeconds();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 7c7e86a..067a0c1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -20,9 +20,12 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
 import org.apache.cassandra.index.Index;
+import com.google.common.primitives.Ints;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +34,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.dht.Range;
@@ -43,19 +47,21 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
 
 /**
  * Manages the compaction strategies.
  *
- * Currently has two instances of actual compaction strategies - one for repaired data and one for
+ * Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
  * unrepaired data. This is done to be able to totally separate the different sets of sstables.
  */
+
 public class CompactionStrategyManager implements INotificationConsumer
 {
     private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
     private final ColumnFamilyStore cfs;
-    private volatile AbstractCompactionStrategy repaired;
-    private volatile AbstractCompactionStrategy unrepaired;
+    private volatile List<AbstractCompactionStrategy> repaired = new ArrayList<>();
+    private volatile List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
     private volatile boolean enabled = true;
     public boolean isActive = true;
     private volatile CompactionParams params;
@@ -67,6 +73,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         we will use the new compaction parameters.
      */
     private CompactionParams schemaCompactionParams;
+    private Directories.DataDirectory[] locations;
 
     public CompactionStrategyManager(ColumnFamilyStore cfs)
     {
@@ -75,6 +82,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         this.cfs = cfs;
         reload(cfs.metadata);
         params = cfs.metadata.params.compaction;
+        locations = getDirectories().getWriteableLocations();
         enabled = params.isEnabled();
     }
 
@@ -91,20 +99,17 @@ public class CompactionStrategyManager implements INotificationConsumer
 
         maybeReload(cfs.metadata);
 
-        if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
-        {
-            AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
-            if (repairedTask != null)
-                return repairedTask;
-            return unrepaired.getNextBackgroundTask(gcBefore);
-        }
-        else
+        List<AbstractCompactionStrategy> strategies = new ArrayList<>(repaired.size() + unrepaired.size());
+        strategies.addAll(repaired);
+        strategies.addAll(unrepaired);
+        Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
+        for (AbstractCompactionStrategy strategy : strategies)
         {
-            AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
-            if (unrepairedTask != null)
-                return unrepairedTask;
-            return repaired.getNextBackgroundTask(gcBefore);
+            AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
+            if (task != null)
+                return task;
         }
+        return null;
     }
 
     public boolean isEnabled()
@@ -135,36 +140,78 @@ public class CompactionStrategyManager implements INotificationConsumer
             if (sstable.openReason != SSTableReader.OpenReason.EARLY)
                 getCompactionStrategyFor(sstable).addSSTable(sstable);
         }
-        repaired.startup();
-        unrepaired.startup();
+        repaired.forEach(AbstractCompactionStrategy::startup);
+        unrepaired.forEach(AbstractCompactionStrategy::startup);
     }
 
     /**
      * return the compaction strategy for the given sstable
      *
-     * returns differently based on the repaired status
+     * returns differently based on the repaired status and which vnode the compaction strategy belongs to
      * @param sstable
      * @return
      */
     private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
     {
+        int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
         if (sstable.isRepaired())
-            return repaired;
+            return repaired.get(index);
         else
-            return unrepaired;
+            return unrepaired.get(index);
+    }
+
+    /**
+     * Get the correct compaction strategy for the given sstable. If the first token starts within a disk boundary, we
+     * will add it to that compaction strategy.
+     *
+     * In the case we are upgrading, the first compaction strategy will get most files - we do not care about which disk
+     * the sstable is on currently (unless we don't know the local tokens yet). Once we start compacting we will write out
+     * sstables in the correct locations and give them to the correct compaction strategy instance.
+     *
+     * @param cfs
+     * @param locations
+     * @param sstable
+     * @return
+     */
+    public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, Directories locations, SSTableReader sstable)
+    {
+        if (!cfs.getPartitioner().splitter().isPresent())
+            return 0;
+
+        Directories.DataDirectory[] directories = locations.getWriteableLocations();
+
+        List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(cfs, locations.getWriteableLocations());
+        if (boundaries == null)
+        {
+            // try to figure out location based on sstable directory:
+            for (int i = 0; i < directories.length; i++)
+            {
+                Directories.DataDirectory directory = directories[i];
+                if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+                    return i;
+            }
+            return 0;
+        }
+
+        int pos = Collections.binarySearch(boundaries, sstable.first);
+        assert pos < 0; // boundaries are .minkeybound and .maxkeybound so they should never be equal
+        return -pos - 1;
+
+
     }
 
     public void shutdown()
     {
         isActive = false;
-        repaired.shutdown();
-        unrepaired.shutdown();
+        repaired.forEach(AbstractCompactionStrategy::shutdown);
+        unrepaired.forEach(AbstractCompactionStrategy::shutdown);
     }
 
     public synchronized void maybeReload(CFMetaData metadata)
     {
         // compare the old schema configuration to the new one, ignore any locally set changes.
-        if (metadata.params.compaction.equals(schemaCompactionParams))
+        if (metadata.params.compaction.equals(schemaCompactionParams) &&
+            Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
             return;
         reload(metadata);
     }
@@ -178,6 +225,11 @@ public class CompactionStrategyManager implements INotificationConsumer
     public synchronized void reload(CFMetaData metadata)
     {
         boolean disabledWithJMX = !enabled && shouldBeEnabled();
+        if (!metadata.params.compaction.equals(schemaCompactionParams))
+            logger.trace("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+        else if (!Arrays.equals(locations, cfs.getDirectories().getWriteableLocations()))
+            logger.trace("Recreating compaction strategy - writeable locations changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+
         setStrategy(metadata.params.compaction);
         schemaCompactionParams = metadata.params.compaction;
 
@@ -197,11 +249,13 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public int getUnleveledSSTables()
     {
-        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+        if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
         {
             int count = 0;
-            count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
-            count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
+            for (AbstractCompactionStrategy strategy : repaired)
+                count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
+            for (AbstractCompactionStrategy strategy : unrepaired)
+                count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
             return count;
         }
         return 0;
@@ -209,13 +263,19 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public synchronized int[] getSSTableCountPerLevel()
     {
-        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+        if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
         {
             int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
-            int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
-            res = sumArrays(res, repairedCountPerLevel);
-            int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
-            res = sumArrays(res, unrepairedCountPerLevel);
+            for (AbstractCompactionStrategy strategy : repaired)
+            {
+                int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+                res = sumArrays(res, repairedCountPerLevel);
+            }
+            for (AbstractCompactionStrategy strategy : unrepaired)
+            {
+                int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+                res = sumArrays(res, unrepairedCountPerLevel);
+            }
             return res;
         }
         return null;
@@ -238,103 +298,112 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public boolean shouldDefragment()
     {
-        assert repaired.getClass().equals(unrepaired.getClass());
-        return repaired.shouldDefragment();
+        assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+        return repaired.get(0).shouldDefragment();
     }
 
     public Directories getDirectories()
     {
-        assert repaired.getClass().equals(unrepaired.getClass());
-        return repaired.getDirectories();
+        assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+        return repaired.get(0).getDirectories();
     }
 
     public synchronized void handleNotification(INotification notification, Object sender)
     {
+        maybeReload(cfs.metadata);
         if (notification instanceof SSTableAddedNotification)
         {
             SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
             for (SSTableReader sstable : flushedNotification.added)
-            {
-                if (sstable.isRepaired())
-                    repaired.addSSTable(sstable);
-                else
-                    unrepaired.addSSTable(sstable);
-            }
+                getCompactionStrategyFor(sstable).addSSTable(sstable);
         }
         else if (notification instanceof SSTableListChangedNotification)
         {
+            // a bit of gymnastics to be able to replace sstables in compaction strategies
+            // we use this to know that a compaction finished and where to start the next compaction in LCS
             SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
-            Set<SSTableReader> repairedRemoved = new HashSet<>();
-            Set<SSTableReader> repairedAdded = new HashSet<>();
-            Set<SSTableReader> unrepairedRemoved = new HashSet<>();
-            Set<SSTableReader> unrepairedAdded = new HashSet<>();
+
+            Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
+            int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
+
+            List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+
+            for (int i = 0; i < locationSize; i++)
+            {
+                repairedRemoved.add(new HashSet<>());
+                repairedAdded.add(new HashSet<>());
+                unrepairedRemoved.add(new HashSet<>());
+                unrepairedAdded.add(new HashSet<>());
+            }
 
             for (SSTableReader sstable : listChangedNotification.removed)
             {
+                int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
                 if (sstable.isRepaired())
-                    repairedRemoved.add(sstable);
+                    repairedRemoved.get(i).add(sstable);
                 else
-                    unrepairedRemoved.add(sstable);
+                    unrepairedRemoved.get(i).add(sstable);
             }
             for (SSTableReader sstable : listChangedNotification.added)
             {
+                int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
                 if (sstable.isRepaired())
-                    repairedAdded.add(sstable);
+                    repairedAdded.get(i).add(sstable);
                 else
-                    unrepairedAdded.add(sstable);
-            }
-            if (!repairedRemoved.isEmpty())
-            {
-                repaired.replaceSSTables(repairedRemoved, repairedAdded);
-            }
-            else
-            {
-                for (SSTableReader sstable : repairedAdded)
-                    repaired.addSSTable(sstable);
+                    unrepairedAdded.get(i).add(sstable);
             }
 
-            if (!unrepairedRemoved.isEmpty())
+            for (int i = 0; i < locationSize; i++)
             {
-                unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
-            }
-            else
-            {
-                for (SSTableReader sstable : unrepairedAdded)
-                    unrepaired.addSSTable(sstable);
+                if (!repairedRemoved.get(i).isEmpty())
+                    repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
+                else
+                {
+                    for (SSTableReader sstable : repairedAdded.get(i))
+                        repaired.get(i).addSSTable(sstable);
+                }
+                if (!unrepairedRemoved.get(i).isEmpty())
+                    unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i));
+                else
+                {
+                    for (SSTableReader sstable : unrepairedAdded.get(i))
+                        unrepaired.get(i).addSSTable(sstable);
+                }
             }
         }
         else if (notification instanceof SSTableRepairStatusChanged)
         {
             for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
             {
+                int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
                 if (sstable.isRepaired())
                 {
-                    unrepaired.removeSSTable(sstable);
-                    repaired.addSSTable(sstable);
+                    unrepaired.get(index).removeSSTable(sstable);
+                    repaired.get(index).addSSTable(sstable);
                 }
                 else
                 {
-                    repaired.removeSSTable(sstable);
-                    unrepaired.addSSTable(sstable);
+                    repaired.get(index).removeSSTable(sstable);
+                    unrepaired.get(index).addSSTable(sstable);
                 }
             }
         }
         else if (notification instanceof SSTableDeletingNotification)
         {
-            SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
-            if (sstable.isRepaired())
-                repaired.removeSSTable(sstable);
-            else
-                unrepaired.removeSSTable(sstable);
+            SSTableReader sstable = ((SSTableDeletingNotification) notification).deleting;
+            getCompactionStrategyFor(sstable).removeSSTable(sstable);
         }
     }
 
     public void enable()
     {
         if (repaired != null)
-            repaired.enable();
+            repaired.forEach(AbstractCompactionStrategy::enable);
         if (unrepaired != null)
-            unrepaired.enable();
+            unrepaired.forEach(AbstractCompactionStrategy::enable);
         // enable this last to make sure the strategies are ready to get calls.
         enabled = true;
     }
@@ -344,47 +413,64 @@ public class CompactionStrategyManager implements INotificationConsumer
         // disable this first avoid asking disabled strategies for compaction tasks
         enabled = false;
         if (repaired != null)
-            repaired.disable();
+            repaired.forEach(AbstractCompactionStrategy::disable);
         if (unrepaired != null)
-            unrepaired.disable();
+            unrepaired.forEach(AbstractCompactionStrategy::disable);
     }
 
     /**
-     * Create ISSTableScanner from the given sstables
+     * Create ISSTableScanners from the given sstables
      *
      * Delegates the call to the compaction strategies to allow LCS to create a scanner
      * @param sstables
-     * @param range
+     * @param ranges
      * @return
      */
     @SuppressWarnings("resource")
     public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
     {
-        List<SSTableReader> repairedSSTables = new ArrayList<>();
-        List<SSTableReader> unrepairedSSTables = new ArrayList<>();
+        assert repaired.size() == unrepaired.size();
+        List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
+        List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
+
+        for (int i = 0; i < repaired.size(); i++)
+        {
+            repairedSSTables.add(new HashSet<>());
+            unrepairedSSTables.add(new HashSet<>());
+        }
+
         for (SSTableReader sstable : sstables)
         {
             if (sstable.isRepaired())
-                repairedSSTables.add(sstable);
+                repairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
             else
-                unrepairedSSTables.add(sstable);
+                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
         }
 
-        Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
+        List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
 
         for (Range<Token> range : ranges)
         {
-            AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
-            AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+            List<ISSTableScanner> repairedScanners = new ArrayList<>();
+            List<ISSTableScanner> unrepairedScanners = new ArrayList<>();
 
-            for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners))
+            for (int i = 0; i < repairedSSTables.size(); i++)
+            {
+                if (!repairedSSTables.get(i).isEmpty())
+                    repairedScanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), range).scanners);
+            }
+            for (int i = 0; i < unrepairedSSTables.size(); i++)
+            {
+                if (!unrepairedSSTables.get(i).isEmpty())
+                    scanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), range).scanners);
+            }
+            for (ISSTableScanner scanner : Iterables.concat(repairedScanners, unrepairedScanners))
             {
                 if (!scanners.add(scanner))
                     scanner.close();
             }
         }
-
-        return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
+        return new AbstractCompactionStrategy.ScannerList(scanners);
     }
 
     public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
@@ -394,21 +480,44 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
     {
-        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
+        Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+        Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
+
+        for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
+            anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue()));
+        return anticompactionGroups;
     }
 
     public long getMaxSSTableBytes()
     {
-        return unrepaired.getMaxSSTableBytes();
+        return unrepaired.get(0).getMaxSSTableBytes();
     }
 
     public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
     {
+        maybeReload(cfs.metadata);
+        validateForCompaction(txn.originals());
         return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
     }
 
+    private void validateForCompaction(Iterable<SSTableReader> input)
+    {
+        SSTableReader firstSSTable = Iterables.getFirst(input, null);
+        assert firstSSTable != null;
+        boolean repaired = firstSSTable.isRepaired();
+        int firstIndex = getCompactionStrategyIndex(cfs, getDirectories(), firstSSTable);
+        for (SSTableReader sstable : input)
+        {
+            if (sstable.isRepaired() != repaired)
+                throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
+            if (firstIndex != getCompactionStrategyIndex(cfs, getDirectories(), sstable))
+                throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+        }
+    }
+
     public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
     {
+        maybeReload(cfs.metadata);
         // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
         // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
         // sstables are marked the compactions are re-enabled
@@ -419,20 +528,21 @@ public class CompactionStrategyManager implements INotificationConsumer
             {
                 synchronized (CompactionStrategyManager.this)
                 {
-                    Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
-                    Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
-
-                    if (repairedTasks == null && unrepairedTasks == null)
-                        return null;
-
-                    if (repairedTasks == null)
-                        return unrepairedTasks;
-                    if (unrepairedTasks == null)
-                        return repairedTasks;
-
                     List<AbstractCompactionTask> tasks = new ArrayList<>();
-                    tasks.addAll(repairedTasks);
-                    tasks.addAll(unrepairedTasks);
+                    for (AbstractCompactionStrategy strategy : repaired)
+                    {
+                        Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
+                        if (task != null)
+                            tasks.addAll(task);
+                    }
+                    for (AbstractCompactionStrategy strategy : unrepaired)
+                    {
+                        Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
+                        if (task != null)
+                            tasks.addAll(task);
+                    }
+                    if (tasks.isEmpty())
+                        return null;
                     return tasks;
                 }
             }
@@ -441,14 +551,18 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
     {
+        maybeReload(cfs.metadata);
+        validateForCompaction(sstables);
         return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
     }
 
     public int getEstimatedRemainingTasks()
     {
         int tasks = 0;
-        tasks += repaired.getEstimatedRemainingTasks();
-        tasks += unrepaired.getEstimatedRemainingTasks();
+        for (AbstractCompactionStrategy strategy : repaired)
+            tasks += strategy.getEstimatedRemainingTasks();
+        for (AbstractCompactionStrategy strategy : unrepaired)
+            tasks += strategy.getEstimatedRemainingTasks();
 
         return tasks;
     }
@@ -460,10 +574,10 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public String getName()
     {
-        return unrepaired.getName();
+        return unrepaired.get(0).getName();
     }
 
-    public List<AbstractCompactionStrategy> getStrategies()
+    public List<List<AbstractCompactionStrategy>> getStrategies()
     {
         return Arrays.asList(repaired, unrepaired);
     }
@@ -481,12 +595,25 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     private void setStrategy(CompactionParams params)
     {
-        if (repaired != null)
-            repaired.shutdown();
-        if (unrepaired != null)
-            unrepaired.shutdown();
-        repaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
-        unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
+        repaired.forEach(AbstractCompactionStrategy::shutdown);
+        unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+        repaired.clear();
+        unrepaired.clear();
+
+        if (cfs.getPartitioner().splitter().isPresent())
+        {
+            locations = cfs.getDirectories().getWriteableLocations();
+            for (int i = 0; i < locations.length; i++)
+            {
+                repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+                unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+            }
+        }
+        else
+        {
+            repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+            unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+        }
         this.params = params;
     }
 
@@ -510,11 +637,11 @@ public class CompactionStrategyManager implements INotificationConsumer
     {
         if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
         {
-            return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+            return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
         }
         else
         {
-            return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+            return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index be81c80..6b9fe21 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -230,7 +230,7 @@ public class CompactionTask extends AbstractCompactionTask
                                                           LifecycleTransaction transaction,
                                                           Set<SSTableReader> nonExpiredSSTables)
     {
-        return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals);
+        return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals, getLevel());
     }
 
     public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 50f9b71..2dc6ee8 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -85,7 +85,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
      */
     private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
     {
-        if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
+        if (sstables.isEmpty())
             return Collections.emptyList();
 
         Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
@@ -212,6 +212,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
     {
         sstables.remove(sstable);
     }
+
     /**
      * A target time span used for bucketing SSTables based on timestamps.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index b7bf83f..c54b751 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -355,7 +355,7 @@ public class LeveledManifest
         Collection<SSTableReader> candidates = getCandidatesFor(0);
         if (candidates.isEmpty())
             return null;
-        return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategyManager().getMaxSSTableBytes());
+        return new CompactionCandidate(candidates, getNextLevel(candidates), maxSSTableSizeInBytes);
     }
 
     private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 20e6df2..84a34c9 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -37,7 +37,8 @@ public enum OperationType
     STREAM("Stream"),
     WRITE("Write"),
     VIEW_BUILD("View build"),
-    INDEX_SUMMARY("Index summary redistribution");
+    INDEX_SUMMARY("Index summary redistribution"),
+    RELOCATE("Relocate sstables to correct disk");
 
     public final String type;
     public final String fileName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 272c2f8..838b0a1 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -104,11 +104,8 @@ public class Scrubber implements Closeable
 
         List<SSTableReader> toScrub = Collections.singletonList(sstable);
 
-        // Calculate the expected compacted filesize
-        this.destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
-        if (destination == null)
-            throw new IOException("disk full");
-
+        int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+        this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
         this.isCommutative = cfs.metadata.isCounter();
 
         boolean hasIndexFile = (new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))).exists();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index f8a8240..e36adf2 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -21,6 +21,7 @@ import java.util.*;
 import java.util.Map.Entry;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,7 +85,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
         List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), sizeTieredOptions.bucketHigh, sizeTieredOptions.bucketLow, sizeTieredOptions.minSSTableSize);
         logger.trace("Compaction buckets are {}", buckets);
-        updateEstimatedCompactionsByTasks(buckets);
+        estimatedRemainingTasks = getEstimatedCompactionsByTasks(cfs, buckets);
         List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
         if (!mostInteresting.isEmpty())
             return mostInteresting;
@@ -282,15 +283,15 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         return new ArrayList<List<T>>(buckets.values());
     }
 
-    private void updateEstimatedCompactionsByTasks(List<List<SSTableReader>> tasks)
+    public static int getEstimatedCompactionsByTasks(ColumnFamilyStore cfs, List<List<SSTableReader>> tasks)
     {
         int n = 0;
-        for (List<SSTableReader> bucket: tasks)
+        for (List<SSTableReader> bucket : tasks)
         {
             if (bucket.size() >= cfs.getMinimumCompactionThreshold())
                 n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold());
         }
-        estimatedRemainingTasks = n;
+        return n;
     }
 
     public long getMaxSSTableBytes()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 0b3b7d0..46023ce 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -18,18 +18,26 @@
 
 package org.apache.cassandra.db.compaction.writers;
 
+import java.io.File;
 import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.CompactionTask;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.concurrent.Transactional;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.service.StorageService;
 
 
 /**
@@ -38,6 +46,8 @@ import org.apache.cassandra.utils.concurrent.Transactional;
  */
 public abstract class CompactionAwareWriter extends Transactional.AbstractTransactional implements Transactional
 {
+    protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
+
     protected final ColumnFamilyStore cfs;
     protected final Directories directories;
     protected final Set<SSTableReader> nonExpiredSSTables;
@@ -45,9 +55,11 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
     protected final long maxAge;
     protected final long minRepairedAt;
 
-    protected final LifecycleTransaction txn;
     protected final SSTableRewriter sstableWriter;
-    private boolean isInitialized = false;
+    protected final LifecycleTransaction txn;
+    private final Directories.DataDirectory[] locations;
+    private final List<PartitionPosition> diskBoundaries;
+    private int locationIndex;
 
     public CompactionAwareWriter(ColumnFamilyStore cfs,
                                  Directories directories,
@@ -59,12 +71,15 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         this.cfs = cfs;
         this.directories = directories;
         this.nonExpiredSSTables = nonExpiredSSTables;
-        this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
-        this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
-        this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
         this.txn = txn;
-        this.sstableWriter = SSTableRewriter.constructKeepingOriginals(txn, keepOriginals, maxAge, offline);
 
+        estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
+        maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
+        sstableWriter = SSTableRewriter.constructKeepingOriginals(txn, keepOriginals, maxAge, offline);
+        minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
+        locations = cfs.getDirectories().getWriteableLocations();
+        diskBoundaries = StorageService.getDiskBoundaries(cfs);
+        locationIndex = -1;
     }
 
     @Override
@@ -96,6 +111,8 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         return sstableWriter.finished();
     }
 
+    public abstract List<SSTableReader> finish(long repairedAt);
+
     /**
      * estimated number of keys we should write
      */
@@ -104,6 +121,11 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         return estimatedTotalKeys;
     }
 
+    /**
+     * Writes a partition in an implementation specific way
+     * @param partition the partition to append
+     * @return true if the partition was written, false otherwise
+     */
     public final boolean append(UnfilteredRowIterator partition)
     {
         maybeSwitchWriter(partition.partitionKey());
@@ -125,9 +147,26 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
      */
     protected void maybeSwitchWriter(DecoratedKey key)
     {
-        if (!isInitialized)
-            switchCompactionLocation(getDirectories().getWriteableLocation(cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())));
-        isInitialized = true;
+        if (diskBoundaries == null)
+        {
+            if (locationIndex < 0)
+            {
+                Directories.DataDirectory defaultLocation = getWriteDirectory(nonExpiredSSTables, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, OperationType.UNKNOWN));
+                switchCompactionLocation(defaultLocation);
+                locationIndex = 0;
+            }
+            return;
+        }
+
+        if (locationIndex > -1 && key.compareTo(diskBoundaries.get(locationIndex)) < 0)
+            return;
+
+        int prevIdx = locationIndex;
+        while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0)
+            locationIndex++;
+        if (prevIdx >= 0)
+            logger.debug("Switching write location from {} to {}", locations[prevIdx], locations[locationIndex]);
+        switchCompactionLocation(locations[locationIndex]);
     }
 
     /**
@@ -148,13 +187,37 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
 
     /**
      * Return a directory where we can expect expectedWriteSize to fit.
+     *
+     * @param sstables the sstables to compact
+     * @return
      */
-    public Directories.DataDirectory getWriteDirectory(long expectedWriteSize)
+    public Directories.DataDirectory getWriteDirectory(Iterable<SSTableReader> sstables, long estimatedWriteSize)
     {
-        Directories.DataDirectory directory = getDirectories().getWriteableLocation(expectedWriteSize);
-        if (directory == null)
-            throw new RuntimeException("Insufficient disk space to write " + expectedWriteSize + " bytes");
+        File directory = null;
+        for (SSTableReader sstable : sstables)
+        {
+            if (directory == null)
+                directory = sstable.descriptor.directory;
+            if (!directory.equals(sstable.descriptor.directory))
+                logger.trace("All sstables not from the same disk - putting results in {}", directory);
+        }
+        Directories.DataDirectory d = getDirectories().getDataDirectoryForFile(directory);
+        if (d != null)
+        {
+            if (d.getAvailableSpace() < estimatedWriteSize)
+                throw new RuntimeException(String.format("Not enough space to write %d bytes to %s (%d bytes available)", estimatedWriteSize, d.location, d.getAvailableSpace()));
+            logger.trace("putting compaction results in {}", directory);
+            return d;
+        }
+        d = getDirectories().getWriteableLocation(estimatedWriteSize);
+        if (d == null)
+            throw new RuntimeException("Not enough disk space to store "+estimatedWriteSize+" bytes");
+        return d;
+    }
 
-        return directory;
+    public CompactionAwareWriter setRepairedAt(long repairedAt)
+    {
+        this.sstableWriter.setRepairedAt(repairedAt);
+        return this;
     }
 }