Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/01/05 16:55:46 UTC

[1/2] cassandra git commit: Make sure the same token does not exist in several data directories

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.2 156829340 -> e2c634189


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 8b4351f..5e78834 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.compaction.writers;
 
 
+import java.util.List;
 import java.util.Set;
 
 import org.slf4j.Logger;
@@ -39,16 +40,18 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 public class DefaultCompactionWriter extends CompactionAwareWriter
 {
     protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
+    private final int sstableLevel;
 
     public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
     {
-        this(cfs, directories, txn, nonExpiredSSTables, false, false);
+        this(cfs, directories, txn, nonExpiredSSTables, false, false, 0);
     }
 
     @SuppressWarnings("resource")
-    public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals)
+    public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals, int sstableLevel)
     {
         super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
+        this.sstableLevel = sstableLevel;
     }
 
     @Override
@@ -58,14 +61,14 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
     }
 
     @Override
-    protected void switchCompactionLocation(Directories.DataDirectory directory)
+    public void switchCompactionLocation(Directories.DataDirectory directory)
     {
         @SuppressWarnings("resource")
         SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(directory))),
                                                     estimatedTotalKeys,
                                                     minRepairedAt,
                                                     cfs.metadata,
-                                                    new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0),
+                                                    new MetadataCollector(txn.originals(), cfs.metadata.comparator, sstableLevel),
                                                     SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
                                                     cfs.indexManager.listIndexes(),
                                                     txn);
@@ -73,6 +76,12 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
     }
 
     @Override
+    public List<SSTableReader> finish(long repairedAt)
+    {
+        return sstableWriter.setRepairedAt(repairedAt).finish();
+    }
+
+    @Override
     public long estimatedKeys()
     {
         return estimatedTotalKeys;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index b0c4562..0c88ac6 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -17,12 +17,9 @@
  */
 package org.apache.cassandra.db.compaction.writers;
 
-import java.io.File;
+import java.util.List;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.RowIndexEntry;
@@ -37,15 +34,14 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 
 public class MajorLeveledCompactionWriter extends CompactionAwareWriter
 {
-    private static final Logger logger = LoggerFactory.getLogger(MajorLeveledCompactionWriter.class);
     private final long maxSSTableSize;
-    private final long expectedWriteSize;
-    private final Set<SSTableReader> allSSTables;
     private int currentLevel = 1;
     private long averageEstimatedKeysPerSSTable;
     private long partitionsWritten = 0;
     private long totalWrittenInLevel = 0;
     private int sstablesWritten = 0;
+    private final long keysPerSSTable;
+    private Directories.DataDirectory sstableDirectory;
 
     public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
                                         Directories directories,
@@ -67,8 +63,8 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
     {
         super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
         this.maxSSTableSize = maxSSTableSize;
-        this.allSSTables = txn.originals();
-        expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()));
+        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
+        keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
     }
 
     @Override
@@ -86,28 +82,33 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                 totalWrittenInLevel = 0;
                 currentLevel++;
             }
-
-            averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
-            switchCompactionLocation(getWriteDirectory(expectedWriteSize));
-            partitionsWritten = 0;
-            sstablesWritten++;
+            switchCompactionLocation(sstableDirectory);
         }
         return rie != null;
 
     }
 
-    public void switchCompactionLocation(Directories.DataDirectory directory)
+    @Override
+    public void switchCompactionLocation(Directories.DataDirectory location)
+    {
+        this.sstableDirectory = location;
+        averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
+        sstableWriter.switchWriter(SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))),
+                keysPerSSTable,
+                minRepairedAt,
+                cfs.metadata,
+                new MetadataCollector(txn.originals(), cfs.metadata.comparator, currentLevel),
+                SerializationHeader.make(cfs.metadata, txn.originals()),
+                cfs.indexManager.listIndexes(),
+                txn));
+        partitionsWritten = 0;
+        sstablesWritten = 0;
+
+    }
+
+    @Override
+    public List<SSTableReader> finish(long repairedAt)
     {
-        File sstableDirectory = getDirectories().getLocationForDisk(directory);
-        @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
-                                                    averageEstimatedKeysPerSSTable,
-                                                    minRepairedAt,
-                                                    cfs.metadata,
-                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
-                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
-                                                    cfs.indexManager.listIndexes(),
-                                                    txn);
-        sstableWriter.switchWriter(writer);
+        return sstableWriter.setRepairedAt(repairedAt).finish();
     }
 }
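The new keysPerSSTable field replaces the old per-switch running average with a fixed up-front estimate: the expected number of output sstables is total input bytes divided by the max sstable size, and the total key count is spread evenly across them. The arithmetic, as a standalone sketch with illustrative numbers only:

public class KeysPerSSTableSketch
{
    public static void main(String[] args)
    {
        long totalBytes = 5L * 1024 * 1024 * 1024;  // 5 GiB of non-expired input data
        long maxSSTableSize = 160L * 1024 * 1024;   // 160 MiB target sstable size
        long estimatedTotalKeys = 10_000_000;

        long estimatedSSTables = Math.max(1, totalBytes / maxSSTableSize); // 32
        long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;      // 312500
        System.out.println(keysPerSSTable);
    }
}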

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 1dc72e7..ac83cc6 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.db.compaction.writers;
 
+import java.io.File;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -33,11 +35,11 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 public class MaxSSTableSizeWriter extends CompactionAwareWriter
 {
     private final long estimatedTotalKeys;
-    private final long expectedWriteSize;
     private final long maxSSTableSize;
     private final int level;
     private final long estimatedSSTables;
     private final Set<SSTableReader> allSSTables;
+    private Directories.DataDirectory sstableDirectory;
 
     public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
                                 Directories directories,
@@ -63,25 +65,25 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
         this.allSSTables = txn.originals();
         this.level = level;
         this.maxSSTableSize = maxSSTableSize;
-        long totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
-        expectedWriteSize = Math.min(maxSSTableSize, totalSize);
         estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
     }
 
-    @Override
-    public boolean realAppend(UnfilteredRowIterator partition)
+    protected boolean realAppend(UnfilteredRowIterator partition)
     {
         RowIndexEntry rie = sstableWriter.append(partition);
         if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
-            switchCompactionLocation(getWriteDirectory(expectedWriteSize));
+        {
+            switchCompactionLocation(sstableDirectory);
+        }
         return rie != null;
     }
 
+    @Override
     public void switchCompactionLocation(Directories.DataDirectory location)
     {
-        @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
+        sstableDirectory = location;
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(sstableDirectory))),
                                                     estimatedTotalKeys / estimatedSSTables,
                                                     minRepairedAt,
                                                     cfs.metadata,
@@ -91,7 +93,11 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
                                                     txn);
 
         sstableWriter.switchWriter(writer);
+    }
 
+    public List<SSTableReader> finish(long repairedAt)
+    {
+        return sstableWriter.setRepairedAt(repairedAt).finish();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 3a7f526..46183dc 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -17,8 +17,8 @@
  */
 package org.apache.cassandra.db.compaction.writers;
 
-import java.io.File;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Set;
 
 import org.slf4j.Logger;
@@ -51,6 +51,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
     private final Set<SSTableReader> allSSTables;
     private long currentBytesToWrite;
     private int currentRatioIndex = 0;
+    private Directories.DataDirectory location;
 
     public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
     {
@@ -82,10 +83,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
             }
         }
         ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex);
-        long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
         currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
-        switchCompactionLocation(getWriteDirectory(currentBytesToWrite));
-        logger.trace("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite);
     }
 
     @Override
@@ -96,15 +94,17 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
         {
             currentRatioIndex++;
             currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
-            switchCompactionLocation(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
+            switchCompactionLocation(location);
+            logger.debug("Switching writer, currentBytesToWrite = {}", currentBytesToWrite);
         }
         return rie != null;
     }
 
+    @Override
     public void switchCompactionLocation(Directories.DataDirectory location)
     {
+        this.location = location;
         long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
-        @SuppressWarnings("resource")
         SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
                                                     currentPartitionsToWrite,
                                                     minRepairedAt,
@@ -115,6 +115,11 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
                                                     txn);
         logger.trace("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
         sstableWriter.switchWriter(writer);
+    }
 
+    @Override
+    public List<SSTableReader> finish(long repairedAt)
+    {
+        return sstableWriter.setRepairedAt(repairedAt).finish();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index c09d49c..4c73472 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -327,10 +327,10 @@ public class Tracker
         apply(View.markFlushing(memtable));
     }
 
-    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
+    public void replaceFlushed(Memtable memtable, Iterable<SSTableReader> sstables)
     {
         assert !isDummy();
-        if (sstables == null || sstables.isEmpty())
+        if (sstables == null || Iterables.isEmpty(sstables))
         {
             // sstable may be null if we flushed batchlog and nothing needed to be retained
             // if it's null, we don't care what state the cfstore is in, we just replace it and continue

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index b62c7e3..63926ed 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -310,7 +310,7 @@ public class View
     }
 
     // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
-    static Function<View, View> replaceFlushed(final Memtable memtable, final Collection<SSTableReader> flushed)
+    static Function<View, View> replaceFlushed(final Memtable memtable, final Iterable<SSTableReader> flushed)
     {
         return new Function<View, View>()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/IPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java
index e0a08dc..b559a6f 100644
--- a/src/java/org/apache/cassandra/dht/IPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/IPartitioner.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.dht;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -49,6 +50,17 @@ public interface IPartitioner
     public Token getMinimumToken();
 
     /**
+     * The biggest token for this partitioner; unlike getMinimumToken, this token is actually used, and callers
+     * wanting to include all tokens need to use getMaximumToken().maxKeyBound().
+     *
+     * Not implemented for the ordered partitioners.
+     */
+    default Token getMaximumToken()
+    {
+        throw new UnsupportedOperationException("If you are using a splitting partitioner, getMaximumToken has to be implemented");
+    }
+
+    /**
      * @return a Token that can be used to route a given key
      * (This is NOT a method to create a Token from its string representation;
      * for that, use TokenFactory.fromString.)
@@ -84,4 +96,9 @@ public interface IPartitioner
      * Used by secondary indices.
      */
     public AbstractType<?> partitionOrdering();
+
+    default Optional<Splitter> splitter()
+    {
+        return Optional.empty();
+    }
 }
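The two default methods above let every existing IPartitioner implementation compile unchanged, while splitting-capable partitioners opt in by overriding. A minimal standalone sketch of that pattern (SimplePartitioner, SimpleSplitter and SplitterOptIn are illustrative stand-ins, not Cassandra classes):

import java.util.Optional;

interface SimpleSplitter { long midpoint(long left, long right); }

interface SimplePartitioner
{
    default long getMaximumToken()
    {
        throw new UnsupportedOperationException("splitting partitioners must implement getMaximumToken");
    }

    default Optional<SimpleSplitter> splitter()
    {
        return Optional.empty(); // ordered partitioners keep this default
    }
}

class SimpleLongPartitioner implements SimplePartitioner
{
    @Override public long getMaximumToken() { return Long.MAX_VALUE; }

    @Override public Optional<SimpleSplitter> splitter()
    {
        return Optional.of((l, r) -> l + (r - l) / 2); // midpoint without overflow
    }
}

public class SplitterOptIn
{
    public static void main(String[] args)
    {
        SimplePartitioner ordered = new SimplePartitioner() {};  // keeps the defaults
        System.out.println(ordered.splitter().isPresent());      // false
        new SimpleLongPartitioner().splitter().ifPresent(s -> System.out.println(s.midpoint(0, 100))); // 50
    }
}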

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
index d68be3f..f9f6113 100644
--- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -48,6 +48,19 @@ public class Murmur3Partitioner implements IPartitioner
     public static final Murmur3Partitioner instance = new Murmur3Partitioner();
     public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
 
+    private final Splitter splitter = new Splitter(this)
+    {
+        public Token tokenForValue(BigInteger value)
+        {
+            return new LongToken(value.longValue());
+        }
+
+        public BigInteger valueForToken(Token token)
+        {
+            return BigInteger.valueOf(((LongToken) token).token);
+        }
+    };
+
     public DecoratedKey decorateKey(ByteBuffer key)
     {
         long[] hash = getHash(key);
@@ -291,8 +304,18 @@ public class Murmur3Partitioner implements IPartitioner
         return LongType.instance;
     }
 
+    public Token getMaximumToken()
+    {
+        return new LongToken(Long.MAX_VALUE);
+    }
+
     public AbstractType<?> partitionOrdering()
     {
         return partitionOrdering;
     }
+
+    public Optional<Splitter> splitter()
+    {
+        return Optional.of(splitter);
+    }
 }
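The anonymous Splitter above converts LongTokens to BigInteger before any range arithmetic. A quick standalone check shows why that is necessary: the full Murmur3 token span does not fit in signed 64-bit math.

import java.math.BigInteger;

public class TokenSpanCheck
{
    public static void main(String[] args)
    {
        BigInteger span = BigInteger.valueOf(Long.MAX_VALUE).subtract(BigInteger.valueOf(Long.MIN_VALUE));
        System.out.println(span);                            // 18446744073709551615, i.e. 2^64 - 1
        System.out.println(Long.MAX_VALUE - Long.MIN_VALUE); // -1: the same subtraction overflows in long
    }
}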

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index b0dea01..96a96ca 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -50,6 +50,19 @@ public class RandomPartitioner implements IPartitioner
     public static final RandomPartitioner instance = new RandomPartitioner();
     public static final AbstractType<?> partitionOrdering = new PartitionerDefinedOrder(instance);
 
+    private final Splitter splitter = new Splitter(this)
+    {
+        public Token tokenForValue(BigInteger value)
+        {
+            return new BigIntegerToken(value);
+        }
+
+        public BigInteger valueForToken(Token token)
+        {
+            return ((BigIntegerToken)token).getTokenValue();
+        }
+    };
+
     public DecoratedKey decorateKey(ByteBuffer key)
     {
         return new CachedHashDecoratedKey(getToken(key), key);
@@ -194,6 +207,11 @@ public class RandomPartitioner implements IPartitioner
         return ownerships;
     }
 
+    public Token getMaximumToken()
+    {
+        return new BigIntegerToken(MAXIMUM);
+    }
+
     public AbstractType<?> getTokenValidator()
     {
         return IntegerType.instance;
@@ -203,4 +221,10 @@ public class RandomPartitioner implements IPartitioner
     {
         return partitionOrdering;
     }
+
+    public Optional<Splitter> splitter()
+    {
+        return Optional.of(splitter);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index 1fc6c46..3cc3b23 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -473,6 +473,23 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
         return new Range<T>(left, newRight);
     }
 
+    public static <T extends RingPosition<T>> List<Range<T>> sort(Collection<Range<T>> ranges)
+    {
+        List<Range<T>> output = new ArrayList<>(ranges.size());
+        for (Range<T> r : ranges)
+            output.addAll(r.unwrap());
+        // sort by left
+        Collections.sort(output, new Comparator<Range<T>>()
+        {
+            public int compare(Range<T> b1, Range<T> b2)
+            {
+                return b1.left.compareTo(b2.left);
+            }
+        });
+        return output;
+    }
+
+
     /**
      * Compute a range of keys corresponding to a given range of token.
      */
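Range.sort first unwraps any wrap-around range into non-wrapping pieces and then orders everything by left bound. A standalone illustration over plain longs (toy semantics, not Cassandra's Range<T>; requires Java 16+ for records):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class RangeSortSketch
{
    static final long MIN = 0, MAX = 100; // toy token space

    record R(long left, long right) {}   // half-open (left, right]

    // A wrap-around range like (50, 10] becomes (50, MAX] and (MIN, 10].
    static List<R> unwrap(R r)
    {
        if (r.left() >= r.right())
            return List.of(new R(r.left(), MAX), new R(MIN, r.right()));
        return List.of(r);
    }

    public static void main(String[] args)
    {
        List<R> output = new ArrayList<>();
        for (R r : List.of(new R(50, 10), new R(20, 30)))
            output.addAll(unwrap(r));
        output.sort(Comparator.comparingLong(R::left));
        System.out.println(output); // [R[left=0, right=10], R[left=20, right=30], R[left=50, right=100]]
    }
}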

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/dht/Splitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Splitter.java b/src/java/org/apache/cassandra/dht/Splitter.java
new file mode 100644
index 0000000..67b578d
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/Splitter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Partition splitter.
+ */
+public abstract class Splitter
+{
+    private final IPartitioner partitioner;
+
+    protected Splitter(IPartitioner partitioner)
+    {
+        this.partitioner = partitioner;
+    }
+
+    protected abstract Token tokenForValue(BigInteger value);
+
+    protected abstract BigInteger valueForToken(Token token);
+
+    public List<Token> splitOwnedRanges(int parts, List<Range<Token>> localRanges, boolean dontSplitRanges)
+    {
+        if (localRanges.isEmpty() || parts == 1)
+            return Collections.singletonList(partitioner.getMaximumToken());
+
+        BigInteger totalTokens = BigInteger.ZERO;
+        for (Range<Token> r : localRanges)
+        {
+            BigInteger right = valueForToken(token(r.right));
+            totalTokens = totalTokens.add(right.subtract(valueForToken(r.left)));
+        }
+        BigInteger perPart = totalTokens.divide(BigInteger.valueOf(parts));
+
+        if (dontSplitRanges)
+            return splitOwnedRangesNoPartialRanges(localRanges, perPart, parts);
+
+        List<Token> boundaries = new ArrayList<>();
+        BigInteger sum = BigInteger.ZERO;
+        for (Range<Token> r : localRanges)
+        {
+            Token right = token(r.right);
+            BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left)).abs();
+            BigInteger left = valueForToken(r.left);
+            while (sum.add(currentRangeWidth).compareTo(perPart) >= 0)
+            {
+                BigInteger withinRangeBoundary = perPart.subtract(sum);
+                left = left.add(withinRangeBoundary);
+                boundaries.add(tokenForValue(left));
+                currentRangeWidth = currentRangeWidth.subtract(withinRangeBoundary);
+                sum = BigInteger.ZERO;
+            }
+            sum = sum.add(currentRangeWidth);
+        }
+        boundaries.set(boundaries.size() - 1, partitioner.getMaximumToken());
+
+        assert boundaries.size() == parts : boundaries.size() +"!="+parts+" "+boundaries+":"+localRanges;
+        return boundaries;
+    }
+
+    private List<Token> splitOwnedRangesNoPartialRanges(List<Range<Token>> localRanges, BigInteger perPart, int parts)
+    {
+        List<Token> boundaries = new ArrayList<>(parts);
+        BigInteger sum = BigInteger.ZERO;
+        int i = 0;
+        while (boundaries.size() < parts - 1)
+        {
+            Range<Token> r = localRanges.get(i);
+            Range<Token> nextRange = localRanges.get(i + 1);
+            Token right = token(r.right);
+            Token nextRight = token(nextRange.right);
+
+            BigInteger currentRangeWidth = valueForToken(right).subtract(valueForToken(r.left));
+            BigInteger nextRangeWidth = valueForToken(nextRight).subtract(valueForToken(nextRange.left));
+            sum = sum.add(currentRangeWidth);
+            // does this or next range take us beyond the per part limit?
+            if (sum.compareTo(perPart) > 0 || sum.add(nextRangeWidth).compareTo(perPart) > 0)
+            {
+                // Either this or the next range will take us beyond the perPart limit. Will stopping now or
+                // adding the next range create the smallest difference to perPart?
+                BigInteger diffCurrent = sum.subtract(perPart).abs();
+                BigInteger diffNext = sum.add(nextRangeWidth).subtract(perPart).abs();
+                if (diffNext.compareTo(diffCurrent) >= 0)
+                {
+                    sum = BigInteger.ZERO;
+                    boundaries.add(right);
+                }
+            }
+            i++;
+        }
+        boundaries.add(partitioner.getMaximumToken());
+        return boundaries;
+    }
+
+    /**
+     * We avoid calculating for wrap-around ranges; instead we use the actual max token, and then, when translating
+     * to PartitionPositions, we include tokens from .minKeyBound to .maxKeyBound to make sure we include all tokens.
+     */
+    private Token token(Token t)
+    {
+        return t.equals(partitioner.getMinimumToken()) ? partitioner.getMaximumToken() : t;
+    }
+
+}
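The heart of splitOwnedRanges is the boundary loop: walk the owned ranges in order, accumulate width, and emit a boundary each time the running sum reaches the per-part quota. A condensed standalone restatement over a toy token space (illustrative names; the real method additionally handles wrap-around via token() and the dontSplitRanges mode):

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

public class SplitSketch
{
    public static void main(String[] args)
    {
        // Local ranges (left, right] over a toy 0..100 token space, already sorted and unwrapped.
        long[][] ranges = { {0, 40}, {60, 100} };   // this node owns 80 tokens in total
        int parts = 4;                              // e.g. 4 data directories

        BigInteger total = BigInteger.ZERO;
        for (long[] r : ranges)
            total = total.add(BigInteger.valueOf(r[1] - r[0]));
        BigInteger perPart = total.divide(BigInteger.valueOf(parts)); // 20 tokens per part

        List<BigInteger> boundaries = new ArrayList<>();
        BigInteger sum = BigInteger.ZERO;
        for (long[] r : ranges)
        {
            BigInteger width = BigInteger.valueOf(r[1] - r[0]);
            BigInteger left = BigInteger.valueOf(r[0]);
            while (sum.add(width).compareTo(perPart) >= 0)
            {
                BigInteger step = perPart.subtract(sum); // remaining quota falls inside this range
                left = left.add(step);
                boundaries.add(left);
                width = width.subtract(step);
                sum = BigInteger.ZERO;
            }
            sum = sum.add(width);
        }
        System.out.println(boundaries); // [20, 40, 80, 100]; the real code replaces the last with getMaximumToken()
    }
}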

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index 68dbd74..2217ae2 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -35,9 +35,11 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 public class SimpleSSTableMultiWriter implements SSTableMultiWriter
 {
     private final SSTableWriter writer;
+    private final LifecycleTransaction txn;
 
-    protected SimpleSSTableMultiWriter(SSTableWriter writer)
+    protected SimpleSSTableMultiWriter(SSTableWriter writer, LifecycleTransaction txn)
     {
+        this.txn = txn;
         this.writer = writer;
     }
 
@@ -90,6 +92,7 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
 
     public Throwable abort(Throwable accumulate)
     {
+        txn.untrackNew(writer);
         return writer.abort(accumulate);
     }
 
@@ -114,6 +117,6 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
                                             LifecycleTransaction txn)
     {
         SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, txn);
-        return new SimpleSSTableMultiWriter(writer);
+        return new SimpleSSTableMultiWriter(writer, txn);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
new file mode 100644
index 0000000..674ed7f
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/RangeAwareSSTableWriter.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.service.StorageService;
+
+public class RangeAwareSSTableWriter implements SSTableMultiWriter
+{
+    private final List<PartitionPosition> boundaries;
+    private final Directories.DataDirectory[] directories;
+    private final int sstableLevel;
+    private final long estimatedKeys;
+    private final long repairedAt;
+    private final SSTableFormat.Type format;
+    private final SerializationHeader.Component header;
+    private final LifecycleTransaction txn;
+    private int currentIndex = -1;
+    public final ColumnFamilyStore cfs;
+    private final List<SSTableMultiWriter> finishedWriters = new ArrayList<>();
+    private final List<SSTableReader> finishedReaders = new ArrayList<>();
+    private SSTableMultiWriter currentWriter = null;
+
+    public RangeAwareSSTableWriter(ColumnFamilyStore cfs, long estimatedKeys, long repairedAt, SSTableFormat.Type format, int sstableLevel, long totalSize, LifecycleTransaction txn, SerializationHeader.Component header) throws IOException
+    {
+        directories = cfs.getDirectories().getWriteableLocations();
+        this.sstableLevel = sstableLevel;
+        this.cfs = cfs;
+        this.estimatedKeys = estimatedKeys / directories.length;
+        this.repairedAt = repairedAt;
+        this.format = format;
+        this.txn = txn;
+        this.header = header;
+        boundaries = StorageService.getDiskBoundaries(cfs, directories);
+        if (boundaries == null)
+        {
+            Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
+            if (localDir == null)
+                throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
+            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
+            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn);
+        }
+    }
+
+    private void maybeSwitchWriter(DecoratedKey key)
+    {
+        if (boundaries == null)
+            return;
+
+        boolean switched = false;
+        while (currentIndex < 0 || key.compareTo(boundaries.get(currentIndex)) > 0)
+        {
+            switched = true;
+            currentIndex++;
+        }
+
+        if (switched)
+        {
+            if (currentWriter != null)
+                finishedWriters.add(currentWriter);
+
+            Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(directories[currentIndex])), format);
+            currentWriter = cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), txn);
+        }
+    }
+
+    public boolean append(UnfilteredRowIterator partition)
+    {
+        maybeSwitchWriter(partition.partitionKey());
+        return currentWriter.append(partition);
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+    {
+        if (currentWriter != null)
+            finishedWriters.add(currentWriter);
+        currentWriter = null;
+        for (SSTableMultiWriter writer : finishedWriters)
+        {
+            if (writer.getFilePointer() > 0)
+                finishedReaders.addAll(writer.finish(repairedAt, maxDataAge, openResult));
+            else
+                SSTableMultiWriter.abortOrDie(writer);
+        }
+        return finishedReaders;
+    }
+
+    @Override
+    public Collection<SSTableReader> finish(boolean openResult)
+    {
+        if (currentWriter != null)
+            finishedWriters.add(currentWriter);
+        currentWriter = null;
+        for (SSTableMultiWriter writer : finishedWriters)
+        {
+            if (writer.getFilePointer() > 0)
+                finishedReaders.addAll(writer.finish(openResult));
+            else
+                SSTableMultiWriter.abortOrDie(writer);
+        }
+        return finishedReaders;
+    }
+
+    @Override
+    public Collection<SSTableReader> finished()
+    {
+        return finishedReaders;
+    }
+
+    @Override
+    public SSTableMultiWriter setOpenResult(boolean openResult)
+    {
+        finishedWriters.forEach((w) -> w.setOpenResult(openResult));
+        currentWriter.setOpenResult(openResult);
+        return this;
+    }
+
+    public String getFilename()
+    {
+        return String.join("/", cfs.keyspace.getName(), cfs.getTableName());
+    }
+
+    @Override
+    public long getFilePointer()
+    {
+        return currentWriter.getFilePointer();
+    }
+
+    @Override
+    public UUID getCfId()
+    {
+        return currentWriter.getCfId();
+    }
+
+    @Override
+    public Throwable commit(Throwable accumulate)
+    {
+        if (currentWriter != null)
+            finishedWriters.add(currentWriter);
+        currentWriter = null;
+        for (SSTableMultiWriter writer : finishedWriters)
+            accumulate = writer.commit(accumulate);
+        return accumulate;
+    }
+
+    @Override
+    public Throwable abort(Throwable accumulate)
+    {
+        if (currentWriter != null)
+            finishedWriters.add(currentWriter);
+        currentWriter = null;
+        for (SSTableMultiWriter finishedWriter : finishedWriters)
+            accumulate = finishedWriter.abort(accumulate);
+
+        return accumulate;
+    }
+
+    @Override
+    public void prepareToCommit()
+    {
+        if (currentWriter != null)
+            finishedWriters.add(currentWriter);
+        currentWriter = null;
+        finishedWriters.forEach(SSTableMultiWriter::prepareToCommit);
+    }
+
+    @Override
+    public void close()
+    {
+        if (currentWriter != null)
+            finishedWriters.add(currentWriter);
+        currentWriter = null;
+        finishedWriters.forEach(SSTableMultiWriter::close);
+    }
+}
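maybeSwitchWriter relies on two facts: the boundaries are per-disk upper bounds, and partitions arrive in token order (as they do when an sstable is written). So it simply advances the disk index while the incoming key exceeds the current boundary, opening a new writer on each advance. Restated standalone with plain longs:

public class BoundaryRouting
{
    public static void main(String[] args)
    {
        long[] boundaries = { 20, 40, 80, Long.MAX_VALUE }; // one upper bound per data directory
        long[] sortedTokens = { 5, 20, 21, 79, 90 };

        int current = -1;
        for (long token : sortedTokens)
        {
            boolean switched = false;
            while (current < 0 || token > boundaries[current])
            {
                current++;          // move to the next disk's writer
                switched = true;
            }
            System.out.println("token " + token + " -> disk " + current + (switched ? " (switched writer)" : ""));
        }
        // tokens 5 and 20 land on disk 0; 21 switches to disk 1; 79 to disk 2; 90 to disk 3
    }
}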

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 925efd6..a083218 100644
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@ -24,7 +24,12 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
 {
     protected Directories.DataDirectory getWriteDirectory(long writeSize)
     {
-        Directories.DataDirectory directory = getDirectories().getWriteableLocation(writeSize);
+        Directories.DataDirectory directory;
+        directory = getDirectory();
+
+        if (directory == null) // ok panic - write anywhere
+            directory = getDirectories().getWriteableLocation(writeSize);
+
         if (directory == null)
             throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
 
@@ -36,4 +41,14 @@ public abstract class DiskAwareRunnable extends WrappedRunnable
      * @return Directories instance for the CF.
      */
     protected abstract Directories getDirectories();
+    protected abstract Directories.DataDirectory getDirectory();
+
+    /**
+     * Called if no disk is available with free space for the full write size.
+     * @return true if the scope of the task was successfully reduced.
+     */
+    public boolean reduceScopeForLimitedSpace()
+    {
+        return false;
+    }
 }
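getWriteDirectory now prefers the pre-computed directory from getDirectory() and only falls back to "any directory with enough space" if that returns null, failing if both come up empty. The same select-or-fallback shape, sketched standalone with Optional (Java 9+ for Optional.or; names are illustrative):

import java.util.Optional;
import java.util.function.Supplier;

public class WriteDirFallback
{
    static <T> T firstAvailable(Supplier<Optional<T>> preferred, Supplier<Optional<T>> fallback, long writeSize)
    {
        return preferred.get()
                        .or(fallback) // only consulted when the preferred choice is absent
                        .orElseThrow(() -> new RuntimeException("Insufficient disk space to write " + writeSize + " bytes"));
    }

    public static void main(String[] args)
    {
        String dir = firstAvailable(Optional::empty, () -> Optional.of("/data2"), 1024);
        System.out.println(dir); // /data2
    }
}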

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 60ede18..24bebae 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.RangeStreamer;
 import org.apache.cassandra.dht.RingPosition;
+import org.apache.cassandra.dht.Splitter;
 import org.apache.cassandra.dht.StreamStateStore;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.Token.TokenFactory;
@@ -2625,6 +2626,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    public int relocateSSTables(String keyspaceName, String ... columnFamilies) throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
+        for (ColumnFamilyStore cfs : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
+        {
+            CompactionManager.AllSSTableOpStatus oneStatus = cfs.relocateSSTables();
+            if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
+                status = oneStatus;
+        }
+        return status.statusCode;
+    }
+
     /**
      * Takes the snapshot for the given keyspaces. A snapshot name must be specified.
      *
@@ -4459,4 +4472,61 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         logger.info(String.format("Updated hinted_handoff_throttle_in_kb to %d", throttleInKB));
     }
 
+    public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories)
+    {
+        if (!cfs.getPartitioner().splitter().isPresent())
+            return null;
+
+        Collection<Range<Token>> lr;
+
+        if (StorageService.instance.isBootstrapMode())
+        {
+            lr = StorageService.instance.getTokenMetadata().getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
+        }
+        else
+        {
+            // The reason we use the future settled TMD is that if we decommission a node, we want to stream
+            // from that node to the correct location on disk; if we didn't, we would put new files in the wrong places.
+            // We do this to minimize the amount of data we need to move in rebalancedisks once everything has settled.
+            TokenMetadata tmd = StorageService.instance.getTokenMetadata().cloneAfterAllSettled();
+            lr = cfs.keyspace.getReplicationStrategy().getAddressRanges(tmd).get(FBUtilities.getBroadcastAddress());
+        }
+
+        if (lr == null || lr.isEmpty())
+            return null;
+        List<Range<Token>> localRanges = Range.sort(lr);
+
+        return getDiskBoundaries(localRanges, cfs.getPartitioner(), directories);
+    }
+
+    public static List<PartitionPosition> getDiskBoundaries(ColumnFamilyStore cfs)
+    {
+        return getDiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations());
+    }
+
+    /**
+     * Returns a list of disk boundaries; the result will differ depending on whether vnodes are enabled or not.
+     *
+     * The returned values are upper bounds for the disks, meaning everything from partitioner.minToken up to
+     * getDiskBoundaries(..).get(0) should be on the first disk, everything between boundaries 0 and 1 should be
+     * on the second disk, etc.
+     *
+     * The final entry in the returned list will always be the upper key bound of the partitioner's maximum token.
+     *
+     * @param localRanges the sorted, unwrapped ranges owned by this node
+     * @param partitioner the partitioner, which must provide a splitter
+     * @param dataDirectories the data directories to compute boundaries for
+     * @return one upper-bound position per data directory
+     */
+    public static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> localRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories)
+    {
+        assert partitioner.splitter().isPresent();
+        Splitter splitter = partitioner.splitter().get();
+        List<Token> boundaries = splitter.splitOwnedRanges(dataDirectories.length, localRanges, DatabaseDescriptor.getNumTokens() > 1);
+        List<PartitionPosition> diskBoundaries = new ArrayList<>();
+        for (int i = 0; i < boundaries.size() - 1; i++)
+            diskBoundaries.add(boundaries.get(i).maxKeyBound());
+        diskBoundaries.add(partitioner.getMaximumToken().maxKeyBound());
+        return diskBoundaries;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 70691c7..eef34c0 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -248,6 +248,7 @@ public interface StorageServiceMBean extends NotificationEmitter
      */
     public void forceKeyspaceCompaction(boolean splitOutput, String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException;
 
+    public int relocateSSTables(String keyspace, String ... cfnames) throws IOException, ExecutionException, InterruptedException;
     /**
      * Trigger a cleanup of keys on a single keyspace
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 87dcda0..61eb13f 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -35,9 +35,9 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -63,8 +63,6 @@ public class StreamReader
     protected final int sstableLevel;
     protected final SerializationHeader.Component header;
 
-    protected Descriptor desc;
-
     public StreamReader(FileMessageHeader header, StreamSession session)
     {
         this.session = session;
@@ -108,7 +106,7 @@ public class StreamReader
             {
                 writePartition(deserializer, writer);
                 // TODO move this to BytesReadTracker
-                session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
+                session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
             }
             return writer;
         }
@@ -132,10 +130,7 @@ public class StreamReader
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
 
-        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
-
-
-        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
+        return new RangeAwareSSTableWriter(cfs, estimatedKeys, repairedAt, format, sstableLevel, totalSize, session.getTransaction(cfId), header);
     }
 
     protected void drain(InputStream dis, long bytesRead) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 5355c3e..9078acc 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -584,9 +584,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         receivers.get(message.header.cfId).received(message.sstable);
     }
 
-    public void progress(Descriptor desc, ProgressInfo.Direction direction, long bytes, long total)
+    public void progress(String filename, ProgressInfo.Direction direction, long bytes, long total)
     {
-        ProgressInfo progress = new ProgressInfo(peer, index, desc.filenameFor(Component.DATA), direction, bytes, total);
+        ProgressInfo progress = new ProgressInfo(peer, index, filename, direction, bytes, total);
         streamResult.handleProgress(progress);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index 106677c..ca35c0b 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -102,7 +102,7 @@ public class StreamWriter
                     long lastBytesRead = write(file, validator, readOffset, length, bytesRead);
                     bytesRead += lastBytesRead;
                     progress += (lastBytesRead - readOffset);
-                    session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
+                    session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
                     readOffset = 0;
                 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 4d10244..c123102 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -25,6 +25,7 @@ import java.nio.channels.ReadableByteChannel;
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,7 +96,7 @@ public class CompressedStreamReader extends StreamReader
                 {
                     writePartition(deserializer, writer);
                     // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
-                    session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
+                    session.progress(writer.getFilename(), ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                 }
             }
             return writer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index adbd091..93e0f76 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -26,6 +26,7 @@ import java.util.List;
 import com.google.common.base.Function;
 
 import org.apache.cassandra.io.compress.CompressionMetadata;
+import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -75,7 +76,7 @@ public class CompressedStreamWriter extends StreamWriter
                     long lastWrite = out.applyToChannel((wbc) -> fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc));
                     bytesTransferred += lastWrite;
                     progress += lastWrite;
-                    session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize);
+                    session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 1078004..6f1c753 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -299,6 +299,11 @@ public class NodeProbe implements AutoCloseable
         ssProxy.forceKeyspaceCompaction(splitOutput, keyspaceName, tableNames);
     }
 
+    public void relocateSSTables(String keyspace, String[] cfnames) throws IOException, ExecutionException, InterruptedException
+    {
+        ssProxy.relocateSSTables(keyspace, cfnames);
+    }
+
     public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
     {
         ssProxy.forceKeyspaceFlush(keyspaceName, tableNames);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 668c075..9728356 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -134,7 +134,8 @@ public class NodeTool
                 DisableHintsForDC.class,
                 EnableHintsForDC.class,
                 FailureDetectorInfo.class,
-                RefreshSizeEstimates.class
+                RefreshSizeEstimates.class,
+                RelocateSSTables.class
         );
 
         Cli.CliBuilder<Runnable> builder = Cli.builder("nodetool");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
index b27b07a..b62512a 100644
--- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
+++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
@@ -17,18 +17,20 @@
  */
 package org.apache.cassandra.tools;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
 
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -96,7 +98,7 @@ public class SSTableOfflineRelevel
         Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
         ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
         Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
-        Set<SSTableReader> sstables = new HashSet<>();
+        SetMultimap<File, SSTableReader> sstableMultimap = HashMultimap.create();
         for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet())
         {
             if (sstable.getKey() != null)
@@ -104,7 +106,7 @@ public class SSTableOfflineRelevel
                 try
                 {
                     SSTableReader reader = SSTableReader.open(sstable.getKey());
-                    sstables.add(reader);
+                    sstableMultimap.put(reader.descriptor.directory, reader);
                 }
                 catch (Throwable t)
                 {
@@ -113,13 +115,20 @@ public class SSTableOfflineRelevel
                 }
             }
         }
-        if (sstables.isEmpty())
+        if (sstableMultimap.isEmpty())
         {
             out.println("No sstables to relevel for "+keyspace+"."+columnfamily);
             System.exit(1);
         }
-        Relevel rl = new Relevel(sstables);
-        rl.relevel(dryRun);
+        for (File directory : sstableMultimap.keySet())
+        {
+            if (!sstableMultimap.get(directory).isEmpty())
+            {
+                Relevel rl = new Relevel(sstableMultimap.get(directory));
+                out.println("For sstables in " + directory + ":");
+                rl.relevel(dryRun);
+            }
+        }
         System.exit(0);
 
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java b/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
new file mode 100644
index 0000000..8522bc4
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.airlift.command.Arguments;
+import io.airlift.command.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+
+@Command(name = "relocatesstables", description = "Relocates sstables to the correct disk")
+public class RelocateSSTables extends NodeTool.NodeToolCmd
+{
+    @Arguments(usage = "<keyspace> <table>", description = "The keyspace and table name")
+    private List<String> args = new ArrayList<>();
+
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        List<String> keyspaces = parseOptionalKeyspace(args, probe);
+        String[] cfnames = parseOptionalTables(args);
+        try
+        {
+            for (String keyspace : keyspaces)
+                probe.relocateSSTables(keyspace, cfnames);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("Got error while relocating", e);
+        }
+    }
+}

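Once registered with the Cli builder in NodeTool above, the command follows the usual optional-argument conventions (parseOptionalKeyspace/parseOptionalTables): running nodetool relocatesstables with no arguments covers every keyspace, while an invocation along the lines of nodetool relocatesstables mykeyspace mytable (placeholder names) restricts it to a single table.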
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 96ee072..0f05524 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -59,8 +59,8 @@ public class LongLeveledCompactionStrategyTest
         Keyspace keyspace = Keyspace.open(ksname);
         ColumnFamilyStore store = keyspace.getColumnFamilyStore(cfname);
         store.disableAutoCompaction();
-
-        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy)store.getCompactionStrategyManager().getStrategies().get(1);
+        CompactionStrategyManager mgr = store.getCompactionStrategyManager();
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) mgr.getStrategies().get(1).get(0);
 
         ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it easy to have multiple files
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 27b774d..824c533 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -633,14 +633,14 @@ public class ScrubTest
     {
         SerializationHeader header = new SerializationHeader(true, metadata, metadata.partitionColumns(), EncodingStats.NO_STATS);
         MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(0);
-        return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn));
+        return new TestMultiWriter(new TestWriter(descriptor, keyCount, 0, metadata, collector, header, txn), txn);
     }
 
     private static class TestMultiWriter extends SimpleSSTableMultiWriter
     {
-        TestMultiWriter(SSTableWriter writer)
+        TestMultiWriter(SSTableWriter writer, LifecycleTransaction txn)
         {
-            super(writer);
+            super(writer, txn);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 3c0098b..7fee251 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.junit.Test;
@@ -215,9 +216,9 @@ public class CompactionsCQLTest extends CQLTester
     public boolean verifyStrategies(CompactionStrategyManager manager, Class<? extends AbstractCompactionStrategy> expected)
     {
         boolean found = false;
-        for (AbstractCompactionStrategy actualStrategy : manager.getStrategies())
+        for (List<AbstractCompactionStrategy> strategies : manager.getStrategies())
         {
-            if (!actualStrategy.getClass().equals(expected))
+            if (!strategies.stream().allMatch((strategy) -> strategy.getClass().equals(expected)))
                 return false;
             found = true;
         }

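The nested loop reflects the new shape of getStrategies(): the outer list still separates repaired from unrepaired strategies, while each inner list now holds one strategy instance per data directory, which is why the leveled compaction tests index the result with get(1).get(0).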
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index f2ddb00..1676896 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -122,10 +123,11 @@ public class LeveledCompactionStrategyTest
         }
 
         waitForLeveling(cfs);
-        CompactionStrategyManager strategy =  cfs.getCompactionStrategyManager();
+        CompactionStrategyManager strategyManager = cfs.getCompactionStrategyManager();
         // Checking we're not completely bad at math
-        int l1Count = strategy.getSSTableCountPerLevel()[1];
-        int l2Count = strategy.getSSTableCountPerLevel()[2];
+
+        int l1Count = strategyManager.getSSTableCountPerLevel()[1];
+        int l2Count = strategyManager.getSSTableCountPerLevel()[2];
         if (l1Count == 0 || l2Count == 0)
         {
             logger.error("L1 or L2 has 0 sstables. Expected > 0 on both.");
@@ -177,10 +179,10 @@ public class LeveledCompactionStrategyTest
         }
 
         waitForLeveling(cfs);
-        CompactionStrategyManager strategy =  cfs.getCompactionStrategyManager();
+        CompactionStrategyManager strategyManager = cfs.getCompactionStrategyManager();
         // Checking we're not completely bad at math
-        assertTrue(strategy.getSSTableCountPerLevel()[1] > 0);
-        assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
+        assertTrue(strategyManager.getSSTableCountPerLevel()[1] > 0);
+        assertTrue(strategyManager.getSSTableCountPerLevel()[2] > 0);
 
         Range<Token> range = new Range<>(Util.token(""), Util.token(""));
         int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
@@ -196,7 +198,7 @@ public class LeveledCompactionStrategyTest
      */
     private void waitForLeveling(ColumnFamilyStore cfs) throws InterruptedException
     {
-        CompactionStrategyManager strategy =  cfs.getCompactionStrategyManager();
+        CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
         // L0 is the lowest priority, so when that's done, we know everything is done
         while (strategy.getSSTableCountPerLevel()[0] > 1)
             Thread.sleep(100);
@@ -224,7 +226,7 @@ public class LeveledCompactionStrategyTest
         }
 
         waitForLeveling(cfs);
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1);
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);
         assert strategy.getLevelSize(1) > 0;
 
         // get LeveledScanner for level 1 sstables
@@ -260,7 +262,7 @@ public class LeveledCompactionStrategyTest
             cfs.forceBlockingFlush();
         }
         cfs.forceBlockingFlush();
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ( cfs.getCompactionStrategyManager()).getStrategies().get(1);
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategyManager().getStrategies().get(1).get(0);
         cfs.forceMajorCompaction();
 
         for (SSTableReader s : cfs.getLiveSSTables())
@@ -306,14 +308,14 @@ public class LeveledCompactionStrategyTest
         while(CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
             Thread.sleep(100);
 
-        CompactionStrategyManager strategy =  cfs.getCompactionStrategyManager();
-        List<AbstractCompactionStrategy> strategies = strategy.getStrategies();
-        LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) strategies.get(0);
-        LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) strategies.get(1);
+        CompactionStrategyManager manager = cfs.getCompactionStrategyManager();
+        List<List<AbstractCompactionStrategy>> strategies = manager.getStrategies();
+        LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) strategies.get(0).get(0);
+        LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) strategies.get(1).get(0);
         assertEquals(0, repaired.manifest.getLevelCount() );
         assertEquals(2, unrepaired.manifest.getLevelCount());
-        assertTrue(strategy.getSSTableCountPerLevel()[1] > 0);
-        assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
+        assertTrue(manager.getSSTableCountPerLevel()[1] > 0);
+        assertTrue(manager.getSSTableCountPerLevel()[2] > 0);
 
         for (SSTableReader sstable : cfs.getLiveSSTables())
             assertFalse(sstable.isRepaired());
@@ -331,7 +333,7 @@ public class LeveledCompactionStrategyTest
         sstable1.reloadSSTableMetadata();
         assertTrue(sstable1.isRepaired());
 
-        strategy.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
+        manager.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
 
         int repairedSSTableCount = 0;
         for (List<SSTableReader> level : repaired.manifest.generations)
@@ -343,7 +345,7 @@ public class LeveledCompactionStrategyTest
         assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
 
         unrepaired.removeSSTable(sstable2);
-        strategy.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable2)), this);
+        manager.handleNotification(new SSTableAddedNotification(singleton(sstable2)), this);
         assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
         assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 7b9b19c..4f49389 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -297,7 +297,7 @@ public class TrackerTest
         Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
 
         SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
-        tracker.replaceFlushed(prev2, Collections.singleton(reader));
+        tracker.replaceFlushed(prev2, singleton(reader));
         Assert.assertEquals(1, tracker.getView().sstables.size());
         Assert.assertEquals(1, listener.received.size());
         Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
@@ -314,13 +314,13 @@ public class TrackerTest
         tracker.markFlushing(prev1);
         reader = MockSchema.sstable(0, 10, true, cfs);
         cfs.invalidate(false);
-        tracker.replaceFlushed(prev1, Collections.singleton(reader));
+        tracker.replaceFlushed(prev1, singleton(reader));
         Assert.assertEquals(0, tracker.getView().sstables.size());
         Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
         Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
         Assert.assertEquals(3, listener.received.size());
         Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
-        Assert.assertTrue(listener.received.get(1) instanceof  SSTableDeletingNotification);
+        Assert.assertTrue(listener.received.get(1) instanceof SSTableDeletingNotification);
         Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
         DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 523c203..e787cc4 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import static com.google.common.collect.ImmutableSet.copyOf;
 import static com.google.common.collect.ImmutableSet.of;
 import static com.google.common.collect.Iterables.concat;
+import static java.util.Collections.singleton;
 import static org.apache.cassandra.db.lifecycle.Helpers.emptySet;
 
 public class ViewTest
@@ -195,7 +196,7 @@ public class ViewTest
         Assert.assertEquals(memtable3, cur.getCurrentMemtable());
 
         SSTableReader sstable = MockSchema.sstable(1, cfs);
-        cur = View.replaceFlushed(memtable1, Collections.singleton(sstable)).apply(cur);
+        cur = View.replaceFlushed(memtable1, singleton(sstable)).apply(cur);
         Assert.assertEquals(0, cur.flushingMemtables.size());
         Assert.assertEquals(1, cur.liveMemtables.size());
         Assert.assertEquals(memtable3, cur.getCurrentMemtable());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
index 9cefbf2..e2202fe 100644
--- a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
+++ b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java
@@ -61,6 +61,12 @@ public class LengthPartitioner implements IPartitioner
         return MINIMUM;
     }
 
+    @Override
+    public Token getMaximumToken()
+    {
+        return null;
+    }
+
     public BigIntegerToken getRandomToken()
     {
         return new BigIntegerToken(BigInteger.valueOf(new Random().nextInt(15)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/test/unit/org/apache/cassandra/dht/SplitterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/SplitterTest.java b/test/unit/org/apache/cassandra/dht/SplitterTest.java
new file mode 100644
index 0000000..751a7d7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/dht/SplitterTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SplitterTest
+{
+
+    @Test
+    public void randomSplitTestNoVNodesRandomPartitioner()
+    {
+        randomSplitTestNoVNodes(new RandomPartitioner());
+    }
+
+    @Test
+    public void randomSplitTestNoVNodesMurmur3Partitioner()
+    {
+        randomSplitTestNoVNodes(new Murmur3Partitioner());
+    }
+
+    @Test
+    public void randomSplitTestVNodesRandomPartitioner()
+    {
+        randomSplitTestVNodes(new RandomPartitioner());
+    }
+    @Test
+    public void randomSplitTestVNodesMurmur3Partitioner()
+    {
+        randomSplitTestVNodes(new Murmur3Partitioner());
+    }
+
+    public void randomSplitTestNoVNodes(IPartitioner partitioner)
+    {
+        Splitter splitter = partitioner.splitter().get();
+        Random r = new Random();
+        for (int i = 0; i < 10000; i++)
+        {
+            List<Range<Token>> localRanges = generateLocalRanges(1, r.nextInt(4)+1, splitter, r, partitioner instanceof RandomPartitioner);
+            List<Token> boundaries = splitter.splitOwnedRanges(r.nextInt(9) + 1, localRanges, false);
+            assertTrue("boundaries = "+boundaries+" ranges = "+localRanges, assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, true));
+        }
+    }
+
+    public void randomSplitTestVNodes(IPartitioner partitioner)
+    {
+        Splitter splitter = partitioner.splitter().get();
+        Random r = new Random();
+        for (int i = 0; i < 10000; i++)
+        {
+            // we need many tokens to be able to split evenly over the disks
+            int numTokens = 172 + r.nextInt(128);
+            int rf = r.nextInt(4) + 2;
+            int parts = r.nextInt(5)+1;
+            List<Range<Token>> localRanges = generateLocalRanges(numTokens, rf, splitter, r, partitioner instanceof RandomPartitioner);
+            List<Token> boundaries = splitter.splitOwnedRanges(parts, localRanges, true);
+            if (!assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, false))
+                fail(String.format("Could not split %d tokens with rf=%d into %d parts (localRanges=%s, boundaries=%s)", numTokens, rf, parts, localRanges, boundaries));
+        }
+    }
+
+    private boolean assertRangeSizeEqual(List<Range<Token>> localRanges, List<Token> tokens, IPartitioner partitioner, Splitter splitter, boolean splitIndividualRanges)
+    {
+        Token start = partitioner.getMinimumToken();
+        List<BigInteger> splits = new ArrayList<>();
+
+        for (int i = 0; i < tokens.size(); i++)
+        {
+            Token end = i == tokens.size() - 1 ? partitioner.getMaximumToken() : tokens.get(i);
+            splits.add(sumOwnedBetween(localRanges, start, end, splitter, splitIndividualRanges));
+            start = end;
+        }
+        // when we don't need to keep around full ranges, the difference is small between the partitions
+        BigDecimal delta = splitIndividualRanges ? BigDecimal.valueOf(0.001) : BigDecimal.valueOf(0.2);
+        boolean allBalanced = true;
+        for (BigInteger b : splits)
+        {
+            for (BigInteger i : splits)
+            {
+                BigDecimal bdb = new BigDecimal(b);
+                BigDecimal bdi = new BigDecimal(i);
+                BigDecimal q = bdb.divide(bdi, 2, BigDecimal.ROUND_HALF_DOWN);
+                if (q.compareTo(BigDecimal.ONE.add(delta)) > 0 || q.compareTo(BigDecimal.ONE.subtract(delta)) < 0)
+                    allBalanced = false;
+            }
+        }
+        return allBalanced;
+    }
+
+    private BigInteger sumOwnedBetween(List<Range<Token>> localRanges, Token start, Token end, Splitter splitter, boolean splitIndividualRanges)
+    {
+        BigInteger sum = BigInteger.ZERO;
+        for (Range<Token> range : localRanges)
+        {
+            if (splitIndividualRanges)
+            {
+                Set<Range<Token>> intersections = new Range<>(start, end).intersectionWith(range);
+                for (Range<Token> intersection : intersections)
+                    sum = sum.add(splitter.valueForToken(intersection.right).subtract(splitter.valueForToken(intersection.left)));
+            }
+            else
+            {
+                if (new Range<>(start, end).contains(range.left))
+                    sum = sum.add(splitter.valueForToken(range.right).subtract(splitter.valueForToken(range.left)));
+            }
+        }
+        return sum;
+    }
+
+    private List<Range<Token>> generateLocalRanges(int numTokens, int rf, Splitter splitter, Random r, boolean randomPartitioner)
+    {
+        int localTokens = numTokens * rf;
+        List<Token> randomTokens = new ArrayList<>();
+
+        for (int i = 0; i < localTokens * 2; i++)
+        {
+            Token t = splitter.tokenForValue(randomPartitioner ? new BigInteger(127, r) : BigInteger.valueOf(r.nextLong()));
+            randomTokens.add(t);
+        }
+
+        Collections.sort(randomTokens);
+
+        List<Range<Token>> localRanges = new ArrayList<>(localTokens);
+        for (int i = 0; i < randomTokens.size() - 1; i++)
+        {
+            assert randomTokens.get(i).compareTo(randomTokens.get(i+1)) < 0;
+            localRanges.add(new Range<>(randomTokens.get(i), randomTokens.get(i+1)));
+            i++;
+        }
+        return localRanges;
+    }
+}

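For readers tracing the new dht.Splitter API this test exercises, here is a minimal sketch of the calls involved, limited to the signatures visible in the test (splitter(), tokenForValue(), splitOwnedRanges()); the range and the part count are illustrative:

    import java.math.BigInteger;
    import java.util.Collections;
    import java.util.List;

    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.dht.Murmur3Partitioner;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Splitter;
    import org.apache.cassandra.dht.Token;

    public class SplitterSketch
    {
        public static void main(String[] args)
        {
            IPartitioner partitioner = new Murmur3Partitioner();
            // splitter() is only present for partitioners that support splitting.
            Splitter splitter = partitioner.splitter().get();

            // A single illustrative range owned by this node.
            Token left = splitter.tokenForValue(BigInteger.valueOf(-1000000L));
            Token right = splitter.tokenForValue(BigInteger.valueOf(1000000L));
            List<Range<Token>> localRanges = Collections.singletonList(new Range<>(left, right));

            // Ask for one upper bound per target part, e.g. three data directories;
            // the boolean flag is passed as in the no-vnodes test above.
            List<Token> boundaries = splitter.splitOwnedRanges(3, localRanges, false);
            System.out.println("per-part upper bounds: " + boundaries);
        }
    }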

[2/2] cassandra git commit: Make sure the same token does not exist in several data directories

Posted by ma...@apache.org.
Make sure the same token does not exist in several data directories

Patch by marcuse; reviewed by Yuki Morishita and Carl Yeksigian for CASSANDRA-6696


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2c63418
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2c63418
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2c63418

Branch: refs/heads/cassandra-3.2
Commit: e2c6341898fa43b0e262ef031f267587050b8d0f
Parents: 1568293
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Aug 3 10:12:47 2015 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jan 5 16:48:50 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   5 +
 conf/cassandra.yaml                             |  13 +-
 .../org/apache/cassandra/config/Config.java     |   2 +-
 .../cassandra/config/DatabaseDescriptor.java    |   3 -
 .../cassandra/db/BlacklistedDirectories.java    |  10 +
 .../db/BlacklistedDirectoriesMBean.java         |   9 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  97 ++++-
 .../org/apache/cassandra/db/Directories.java    |  36 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 193 +++++-----
 .../db/compaction/CompactionManager.java        |  78 +++-
 .../compaction/CompactionStrategyManager.java   | 361 +++++++++++++------
 .../cassandra/db/compaction/CompactionTask.java |   2 +-
 .../DateTieredCompactionStrategy.java           |   3 +-
 .../db/compaction/LeveledManifest.java          |   2 +-
 .../cassandra/db/compaction/OperationType.java  |   3 +-
 .../cassandra/db/compaction/Scrubber.java       |   7 +-
 .../SizeTieredCompactionStrategy.java           |   9 +-
 .../writers/CompactionAwareWriter.java          |  91 ++++-
 .../writers/DefaultCompactionWriter.java        |  17 +-
 .../writers/MajorLeveledCompactionWriter.java   |  53 +--
 .../writers/MaxSSTableSizeWriter.java           |  22 +-
 .../SplittingSizeTieredCompactionWriter.java    |  17 +-
 .../apache/cassandra/db/lifecycle/Tracker.java  |   4 +-
 .../org/apache/cassandra/db/lifecycle/View.java |   2 +-
 .../org/apache/cassandra/dht/IPartitioner.java  |  17 +
 .../cassandra/dht/Murmur3Partitioner.java       |  23 ++
 .../apache/cassandra/dht/RandomPartitioner.java |  24 ++
 src/java/org/apache/cassandra/dht/Range.java    |  17 +
 src/java/org/apache/cassandra/dht/Splitter.java | 124 +++++++
 .../io/sstable/SimpleSSTableMultiWriter.java    |   7 +-
 .../sstable/format/RangeAwareSSTableWriter.java | 205 +++++++++++
 .../cassandra/io/util/DiskAwareRunnable.java    |  17 +-
 .../cassandra/service/StorageService.java       |  70 ++++
 .../cassandra/service/StorageServiceMBean.java  |   1 +
 .../cassandra/streaming/StreamReader.java       |  11 +-
 .../cassandra/streaming/StreamSession.java      |   4 +-
 .../cassandra/streaming/StreamWriter.java       |   2 +-
 .../compress/CompressedStreamReader.java        |   3 +-
 .../compress/CompressedStreamWriter.java        |   3 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |   5 +
 .../org/apache/cassandra/tools/NodeTool.java    |   3 +-
 .../cassandra/tools/SSTableOfflineRelevel.java  |  21 +-
 .../tools/nodetool/RelocateSSTables.java        |  49 +++
 .../LongLeveledCompactionStrategyTest.java      |   4 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   6 +-
 .../db/compaction/CompactionsCQLTest.java       |   5 +-
 .../LeveledCompactionStrategyTest.java          |  36 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |   6 +-
 .../apache/cassandra/db/lifecycle/ViewTest.java |   3 +-
 .../apache/cassandra/dht/LengthPartitioner.java |   6 +
 .../org/apache/cassandra/dht/SplitterTest.java  | 158 ++++++++
 52 files changed, 1509 insertions(+), 361 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9c3a50f..43acd43 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.2
+ * Make sure tokens don't exist in several data directories (CASSANDRA-6696)
  * Add requireAuthorization method to IAuthorizer (CASSANDRA-10852)
  * Move static JVM options to conf/jvm.options file (CASSANDRA-10494)
  * Fix CassandraVersion to accept x.y version string (CASSANDRA-10931)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 1269c98..f5f50c1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,11 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - We now make sure that a token does not exist in several data directories. This
+     means that we run one compaction strategy per data_file_directory and we use
+     one thread per directory to flush. Use nodetool relocatesstables to make sure your
+     tokens are in the correct place, or just wait and compaction will handle it. See
+     CASSANDRA-6696 for more details.
    - bound maximum in-flight commit log replay mutation bytes to 64 megabytes
      tunable via cassandra.commitlog_max_outstanding_replay_bytes
    - Support for type casting has been added to the selection clause.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 74e1d1d..a8a90ba 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -413,14 +413,13 @@ memtable_allocation_type: heap_buffers
 
 # This sets the amount of memtable flush writer threads.  These will
 # be blocked by disk io, and each one will hold a memtable in memory
-# while blocked. 
+# while blocked.
 #
-# memtable_flush_writers defaults to the smaller of (number of disks,
-# number of cores), with a minimum of 2 and a maximum of 8.
-# 
-# If your data directories are backed by SSD, you should increase this
-# to the number of cores.
-#memtable_flush_writers: 8
+# memtable_flush_writers defaults to one per data_file_directory.
+#
+# If your data directories are backed by SSD, you can increase this, but
+# avoid having memtable_flush_writers * data_file_directories > number of cores
+#memtable_flush_writers: 1
 
 # A fixed memory pool size in MB for for SSTable index summaries. If left
 # empty, this will default to 5% of the heap size. If the memory usage of

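Concretely, under the new scheme a node with three entries in data_file_directories gets three flush threads by default (one per directory); raising memtable_flush_writers to 2 would mean 2 * 3 = 6 flush threads in total, which is the product the comment warns about keeping below the core count.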
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2bace5e..54cb089 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -98,7 +98,7 @@ public class Config
     @Deprecated
     public Integer concurrent_replicates = null;
 
-    public Integer memtable_flush_writers = null;
+    public Integer memtable_flush_writers = 1;
     public Integer memtable_heap_space_in_mb;
     public Integer memtable_offheap_space_in_mb;
     public Float memtable_cleanup_threshold = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3fc0b31..c82e930 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -582,9 +582,6 @@ public class DatabaseDescriptor
         if (conf.hints_directory.equals(conf.saved_caches_directory))
             throw new ConfigurationException("saved_caches_directory must not be the same as the hints_directory", false);
 
-        if (conf.memtable_flush_writers == null)
-            conf.memtable_flush_writers = Math.min(8, Math.max(2, Math.min(FBUtilities.getAvailableProcessors(), conf.data_file_directories.length)));
-
         if (conf.memtable_flush_writers < 1)
             throw new ConfigurationException("memtable_flush_writers must be at least 1, but was " + conf.memtable_flush_writers, false);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
index f47fd57..3e6332c 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectories.java
@@ -66,6 +66,16 @@ public class BlacklistedDirectories implements BlacklistedDirectoriesMBean
         return Collections.unmodifiableSet(unwritableDirectories);
     }
 
+    public void markUnreadable(String path)
+    {
+        maybeMarkUnreadable(new File(path));
+    }
+
+    public void markUnwritable(String path)
+    {
+        maybeMarkUnwritable(new File(path));
+    }
+
     /**
      * Adds parent directory of the file (or the file itself, if it is a directory)
      * to the set of unreadable directories.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java b/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
index 3163b9a..3fb9f39 100644
--- a/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
+++ b/src/java/org/apache/cassandra/db/BlacklistedDirectoriesMBean.java
@@ -20,10 +20,13 @@ package org.apache.cassandra.db;
 import java.io.File;
 import java.util.Set;
 
-public interface BlacklistedDirectoriesMBean {
-
+public interface BlacklistedDirectoriesMBean
+{
     public Set<File> getUnreadableDirectories();
     
     public Set<File> getUnwritableDirectories();
-    
+
+    public void markUnreadable(String path);
+
+    public void markUnwritable(String path);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 738c941..6eefb5f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -118,13 +118,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(1,
                                                                                           StageManager.KEEPALIVE,
                                                                                           TimeUnit.SECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
                                                                                           new NamedThreadFactory("MemtableFlushWriter"),
                                                                                           "internal");
 
+    private static final ExecutorService [] perDiskflushExecutors = new ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length];
+    static
+    {
+        for (int i = 0; i < DatabaseDescriptor.getAllDataFileLocations().length; i++)
+        {
+            perDiskflushExecutors[i] = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+                                                                        StageManager.KEEPALIVE,
+                                                                        TimeUnit.SECONDS,
+                                                                        new LinkedBlockingQueue<Runnable>(),
+                                                                        new NamedThreadFactory("PerDiskMemtableFlushWriter_"+i),
+                                                                        "internal");
+        }
+    }
+
     // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
     private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
                                                                                               StageManager.KEEPALIVE,
@@ -458,7 +472,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public Directories getDirectories()
     {
-        return directories;
+        // todo, hack since we need to know the data directories when constructing the compaction strategy
+        if (directories != null)
+            return directories;
+        return new Directories(metadata, initialDirectories);
     }
 
     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
@@ -1033,11 +1050,74 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             for (Memtable memtable : memtables)
             {
-                // flush the memtable
-                MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
+                List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
+                long totalBytesOnDisk = 0;
+                long maxBytesOnDisk = 0;
+                long minBytesOnDisk = Long.MAX_VALUE;
+                List<SSTableReader> sstables = new ArrayList<>();
+                try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
+                {
+                    // flush the memtable
+                    List<Memtable.FlushRunnable> flushRunnables = memtable.flushRunnables(txn);
+
+                    for (int i = 0; i < flushRunnables.size(); i++)
+                        futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
+
+                    List<SSTableMultiWriter> flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
+
+                    try
+                    {
+                        Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
+                        while (writerIterator.hasNext())
+                        {
+                            SSTableMultiWriter writer = writerIterator.next();
+                            if (writer.getFilePointer() > 0)
+                            {
+                                writer.setOpenResult(true).prepareToCommit();
+                            }
+                            else
+                            {
+                                maybeFail(writer.abort(null));
+                                writerIterator.remove();
+                            }
+                        }
+                    }
+                    catch (Throwable t)
+                    {
+                        for (SSTableMultiWriter writer : flushResults)
+                            t = writer.abort(t);
+                        t = txn.abort(t);
+                        Throwables.propagate(t);
+                    }
+
+                    txn.prepareToCommit();
+
+                    Throwable accumulate = null;
+                    for (SSTableMultiWriter writer : flushResults)
+                        accumulate = writer.commit(accumulate);
+
+                    maybeFail(txn.commit(accumulate));
+
+                    for (SSTableMultiWriter writer : flushResults)
+                    {
+                        Collection<SSTableReader> flushedSSTables = writer.finished();
+                        for (SSTableReader sstable : flushedSSTables)
+                        {
+                            if (sstable != null)
+                            {
+                                sstables.add(sstable);
+                                long size = sstable.bytesOnDisk();
+                                totalBytesOnDisk += size;
+                                maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
+                                minBytesOnDisk = Math.min(minBytesOnDisk, size);
+                            }
+                        }
+                    }
+                }
+                memtable.cfs.replaceFlushed(memtable, sstables);
                 reclaim(memtable);
+                logger.debug("Flushed to {} ({} sstables, {} bytes), biggest {} bytes, smallest {} bytes", sstables, sstables.size(), totalBytesOnDisk, maxBytesOnDisk, minBytesOnDisk);
             }
-
             // signal the post-flush we've done our work
             postFlush.latch.countDown();
         }
@@ -1366,6 +1446,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion);
     }
 
+    public CompactionManager.AllSSTableOpStatus relocateSSTables() throws ExecutionException, InterruptedException
+    {
+        return CompactionManager.instance.relocateSSTables(this);
+    }
+
     public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
     {
         assert !sstables.isEmpty();
@@ -2206,7 +2291,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public int getUnleveledSSTables()
     {
-        return this.compactionStrategyManager.getUnleveledSSTables();
+        return compactionStrategyManager.getUnleveledSSTables();
     }
 
     public int[] getSSTableCountPerLevel()

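Stripped of the SSTable machinery, the concurrency shape of the new flush path above is: one task per disk, each submitted to that disk's executor, then a join on all futures before anything is committed. A generic sketch of that pattern, with illustrative names rather than the commit's API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class PerDiskFanOut
    {
        public static void main(String[] args) throws Exception
        {
            int disks = 3;
            ExecutorService[] perDisk = new ExecutorService[disks];
            for (int i = 0; i < disks; i++)
                perDisk[i] = Executors.newSingleThreadExecutor();

            // One callable per disk; in the real code each writes the memtable slice owned by that disk.
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < disks; i++)
            {
                final int disk = i;
                futures.add(perDisk[disk].submit(() -> "sstable-on-disk-" + disk));
            }

            // Join: nothing is committed until every per-disk writer has finished.
            for (Future<String> f : futures)
                System.out.println("flushed " + f.get());

            for (ExecutorService e : perDisk)
                e.shutdown();
        }
    }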
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 8744d43..a572bed 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -289,6 +290,19 @@ public class Directories
         return null;
     }
 
+    public DataDirectory getDataDirectoryForFile(File directory)
+    {
+        if (directory != null)
+        {
+            for (DataDirectory dataDirectory : dataDirectories)
+            {
+                if (directory.getAbsolutePath().startsWith(dataDirectory.location.getAbsolutePath()))
+                    return dataDirectory;
+            }
+        }
+        return null;
+    }
+
     public Descriptor find(String filename)
     {
         for (File dir : dataPaths)
@@ -403,7 +417,7 @@ public class Directories
         for (DataDirectory dataDir : paths)
         {
             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
-                  continue;
+                continue;
             DataDirectoryCandidate candidate = new DataDirectoryCandidate(dataDir);
             // exclude directory if its total writeSize does not fit to data directory
             if (candidate.availableSpace < writeSize)
@@ -413,6 +427,26 @@ public class Directories
         return totalAvailable > expectedTotalWriteSize;
     }
 
+    public DataDirectory[] getWriteableLocations()
+    {
+        List<DataDirectory> nonBlacklistedDirs = new ArrayList<>();
+        for (DataDirectory dir : dataDirectories)
+        {
+            if (!BlacklistedDirectories.isUnwritable(dir.location))
+                nonBlacklistedDirs.add(dir);
+        }
+
+        Collections.sort(nonBlacklistedDirs, new Comparator<DataDirectory>()
+        {
+            @Override
+            public int compare(DataDirectory o1, DataDirectory o2)
+            {
+                return o1.location.compareTo(o2.location);
+            }
+        });
+        return nonBlacklistedDirs.toArray(new DataDirectory[nonBlacklistedDirs.size()]);
+    }
+
     public static File getSnapshotDirectory(Descriptor desc, String snapshotName)
     {
         return getSnapshotDirectory(desc.directory, snapshotName);

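Note that getWriteableLocations returns the non-blacklisted directories sorted by path: the flush code in Memtable below pairs boundaries.get(i) with locations[i], so callers depend on the directories coming back in a stable order.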
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 96b1775..8e7a43c 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.File;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -45,14 +44,14 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
 import org.apache.cassandra.utils.memory.MemtablePool;
@@ -250,9 +249,32 @@ public class Memtable implements Comparable<Memtable>
         return partitions.size();
     }
 
-    public FlushRunnable flushRunnable()
+    public List<FlushRunnable> flushRunnables(LifecycleTransaction txn)
     {
-        return new FlushRunnable(lastReplayPosition.get());
+        List<Range<Token>> localRanges = Range.sort(StorageService.instance.getLocalRanges(cfs.keyspace.getName()));
+
+        if (!cfs.getPartitioner().splitter().isPresent() || localRanges.isEmpty())
+            return Collections.singletonList(new FlushRunnable(lastReplayPosition.get(), txn));
+
+        return createFlushRunnables(localRanges, txn);
+    }
+
+    private List<FlushRunnable> createFlushRunnables(List<Range<Token>> localRanges, LifecycleTransaction txn)
+    {
+        assert cfs.getPartitioner().splitter().isPresent();
+
+        Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+        List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+        List<FlushRunnable> runnables = new ArrayList<>(boundaries.size());
+        PartitionPosition rangeStart = cfs.getPartitioner().getMinimumToken().minKeyBound();
+        ReplayPosition context = lastReplayPosition.get();
+        for (int i = 0; i < boundaries.size(); i++)
+        {
+            PartitionPosition t = boundaries.get(i);
+            runnables.add(new FlushRunnable(context, rangeStart, t, locations[i], txn));
+            rangeStart = t;
+        }
+        return runnables;
     }
 
     public String toString()
@@ -312,23 +334,41 @@ public class Memtable implements Comparable<Memtable>
         return creationTime;
     }
 
-    class FlushRunnable extends DiskAwareRunnable
+    class FlushRunnable implements Callable<SSTableMultiWriter>
     {
-        private final ReplayPosition context;
+        public final ReplayPosition context;
         private final long estimatedSize;
+        private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
 
         private final boolean isBatchLogTable;
+        private final SSTableMultiWriter writer;
+
+        // keeping these to be able to log what we are actually flushing
+        private final PartitionPosition from;
+        private final PartitionPosition to;
 
-        FlushRunnable(ReplayPosition context)
+        FlushRunnable(ReplayPosition context, PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
         {
-            this.context = context;
+            this(context, partitions.subMap(from, to), flushLocation, from, to, txn);
+        }
+
+        FlushRunnable(ReplayPosition context, LifecycleTransaction txn)
+        {
+            this(context, partitions, null, null, null, txn);
+        }
 
+        FlushRunnable(ReplayPosition context, ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
+        {
+            this.context = context;
+            this.toFlush = toFlush;
+            this.from = from;
+            this.to = to;
             long keySize = 0;
-            for (PartitionPosition key : partitions.keySet())
+            for (PartitionPosition key : toFlush.keySet())
             {
                 //  make sure we don't write non-sensical keys
                 assert key instanceof DecoratedKey;
-                keySize += ((DecoratedKey)key).getKey().remaining();
+                keySize += ((DecoratedKey) key).getKey().remaining();
             }
             estimatedSize = (long) ((keySize // index entries
                                     + keySize // keys in data file
@@ -336,21 +376,12 @@ public class Memtable implements Comparable<Memtable>
                                     * 1.2); // bloom filter and row index overhead
 
             this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
-        }
 
-        public long getExpectedWriteSize()
-        {
-            return estimatedSize;
-        }
+            if (flushLocation == null)
+                writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(getDirectories().getWriteableLocation(estimatedSize))), columnsCollector.get(), statsCollector.get());
+            else
+                writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get());
 
-        protected void runMayThrow() throws Exception
-        {
-            long writeSize = getExpectedWriteSize();
-            Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
-            File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
-            assert sstableDirectory != null : "Flush task is not bound to any disk";
-            Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory);
-            cfs.replaceFlushed(Memtable.this, sstables);
         }
 
         protected Directories getDirectories()
@@ -358,90 +389,64 @@ public class Memtable implements Comparable<Memtable>
             return cfs.getDirectories();
         }
 
-        private Collection<SSTableReader> writeSortedContents(ReplayPosition context, File sstableDirectory)
+        private void writeSortedContents(ReplayPosition context)
         {
-            logger.debug("Writing {}", Memtable.this.toString());
+            logger.debug("Writing {}, flushed range = ({}, {}]", Memtable.this.toString(), from, to);
 
-            Collection<SSTableReader> ssTables;
-            try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+            boolean trackContention = logger.isTraceEnabled();
+            int heavilyContendedRowCount = 0;
+            // (we can't clear out the map as-we-go to free up memory,
+            //  since the memtable is being used for queries in the "pending flush" category)
+            for (AtomicBTreePartition partition : toFlush.values())
             {
-                boolean trackContention = logger.isTraceEnabled();
-                int heavilyContendedRowCount = 0;
-                // (we can't clear out the map as-we-go to free up memory,
-                //  since the memtable is being used for queries in the "pending flush" category)
-                for (AtomicBTreePartition partition : partitions.values())
+                // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+                // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+                // we don't need to preserve tombstones for repair. So if both operations are in this
+                // memtable (which will almost always be the case if there is no ongoing failure), we can
+                // just skip the entry (CASSANDRA-4667).
+                if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                    continue;
+
+                if (trackContention && partition.usePessimisticLocking())
+                    heavilyContendedRowCount++;
+
+                if (!partition.isEmpty())
                 {
-                    // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
-                    // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
-                    // we don't need to preserve tombstones for repair. So if both operation are in this
-                    // memtable (which will almost always be the case if there is no ongoing failure), we can
-                    // just skip the entry (CASSANDRA-4667).
-                    if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
-                        continue;
-
-                    if (trackContention && partition.usePessimisticLocking())
-                        heavilyContendedRowCount++;
-
-                    if (!partition.isEmpty())
+                    try (UnfilteredRowIterator iter = partition.unfilteredIterator())
                     {
-                        try (UnfilteredRowIterator iter = partition.unfilteredIterator())
-                        {
-                            writer.append(iter);
-                        }
+                        writer.append(iter);
                     }
                 }
+            }
 
-                if (writer.getFilePointer() > 0)
-                {
-                    logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                               writer.getFilename(),
-                                               FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                               context));
+            logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                       writer.getFilename(),
+                                       FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                       context));
 
-                    // sstables should contain non-repaired data.
-                    ssTables = writer.finish(true);
-                }
-                else
-                {
-                    logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
-                                writer.getFilename(), context);
-                    writer.abort();
-                    ssTables = null;
-                }
+            if (heavilyContendedRowCount > 0)
+                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, toFlush.size(), Memtable.this.toString()));
+        }
 
-                if (heavilyContendedRowCount > 0)
-                    logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+        public SSTableMultiWriter createFlushWriter(LifecycleTransaction txn,
+                                                  String filename,
+                                                  PartitionColumns columns,
+                                                  EncodingStats stats)
+        {
+            MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
+            return cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
+                                                (long)toFlush.size(),
+                                                ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                sstableMetadataCollector,
+                                                new SerializationHeader(true, cfs.metadata, columns, stats), txn);
 
-                return ssTables;
-            }
         }
 
-        @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
-        public SSTableTxnWriter createFlushWriter(String filename,
-                                               PartitionColumns columns,
-                                               EncodingStats stats)
+        @Override
+        public SSTableMultiWriter call()
         {
-            // we operate "offline" here, as we expose the resulting reader consciously when done
-            // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
-            LifecycleTransaction txn = null;
-            try
-            {
-                txn = LifecycleTransaction.offline(OperationType.FLUSH);
-                MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
-                return new SSTableTxnWriter(txn,
-                                            cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
-                                                                         (long) partitions.size(),
-                                                                         ActiveRepairService.UNREPAIRED_SSTABLE,
-                                                                         sstableMetadataCollector,
-                                                                         new SerializationHeader(true, cfs.metadata, columns, stats),
-                                                                         txn));
-            }
-            catch (Throwable t)
-            {
-                if (txn != null)
-                    txn.close();
-                throw t;
-            }
+            writeSortedContents(context);
+            return writer;
         }
     }
 

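A note on the flush path above: each data directory now gets its own flush writer over a
contiguous token range, so flushed sstables never straddle disk boundaries. Below is a
minimal, JDK-only sketch of that range slicing - plain longs stand in for tokens, and the
names (FlushRangeSketch, splitByDiskBoundaries) are illustrative, not Cassandra API:

    import java.util.*;
    import java.util.concurrent.*;

    public class FlushRangeSketch
    {
        // Split a sorted memtable (token -> partition) into one sub-map per disk,
        // using the upper boundary assigned to each data directory. Boundary
        // inclusivity is simplified here; the patch flushes (from, to] ranges.
        static List<SortedMap<Long, String>> splitByDiskBoundaries(SortedMap<Long, String> memtable,
                                                                   List<Long> diskBoundaries)
        {
            List<SortedMap<Long, String>> ranges = new ArrayList<>();
            Long from = null;
            for (Long to : diskBoundaries)
            {
                ranges.add(from == null ? memtable.headMap(to) : memtable.subMap(from, to));
                from = to;
            }
            return ranges;
        }

        public static void main(String[] args) throws Exception
        {
            SortedMap<Long, String> memtable = new TreeMap<>();
            for (long t = 0; t < 100; t += 7)
                memtable.put(t, "partition@" + t);

            // two disks: tokens below 50 on disk 0, the rest on disk 1
            List<SortedMap<Long, String>> perDisk =
                splitByDiskBoundaries(memtable, Arrays.asList(50L, Long.MAX_VALUE));

            // one flush task per disk, mirroring one flush writer per data directory
            ExecutorService flushExecutor = Executors.newFixedThreadPool(perDisk.size());
            for (int i = 0; i < perDisk.size(); i++)
            {
                int disk = i;
                SortedMap<Long, String> range = perDisk.get(i);
                flushExecutor.submit(() -> System.out.println("disk " + disk + " flushes "
                                                              + range.size() + " partitions"));
            }
            flushExecutor.shutdown();
            flushExecutor.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
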
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 571088b..1f39767 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.stream.Collectors;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.OpenDataException;
@@ -270,7 +271,7 @@ public class CompactionManager implements CompactionManagerMBean
             Iterable<SSTableReader> sstables = Lists.newArrayList(operation.filterSSTables(compacting));
             if (Iterables.isEmpty(sstables))
             {
-                logger.info("No sstables for {}.{}", cfs.keyspace.getName(), cfs.name);
+                logger.info("No sstables to {} for {}.{}", operationType.name(), cfs.keyspace.getName(), cfs.name);
                 return AllSSTableOpStatus.SUCCESSFUL;
             }
 
@@ -432,6 +433,77 @@ public class CompactionManager implements CompactionManagerMBean
         }, OperationType.CLEANUP);
     }
 
+    public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs) throws ExecutionException, InterruptedException
+    {
+        if (!cfs.getPartitioner().splitter().isPresent())
+        {
+            logger.info("Partitioner does not support splitting");
+            return AllSSTableOpStatus.ABORTED;
+        }
+        final Collection<Range<Token>> r = StorageService.instance.getLocalRanges(cfs.keyspace.getName());
+
+        if (r.isEmpty())
+        {
+            logger.info("Relocate cannot run before a node has joined the ring");
+            return AllSSTableOpStatus.ABORTED;
+        }
+
+        final List<Range<Token>> localRanges = Range.sort(r);
+        final Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+        final List<PartitionPosition> diskBoundaries = StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
+
+        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
+        {
+            @Override
+            public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
+            {
+                Set<SSTableReader> originals = Sets.newHashSet(transaction.originals());
+                Set<SSTableReader> needsRelocation = originals.stream().filter(s -> !inCorrectLocation(s)).collect(Collectors.toSet());
+                transaction.cancel(Sets.difference(originals, needsRelocation));
+
+                Map<Integer, List<SSTableReader>> groupedByDisk = needsRelocation.stream().collect(Collectors.groupingBy((s) ->
+                        CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), s)));
+
+                int maxSize = 0;
+                for (List<SSTableReader> diskSSTables : groupedByDisk.values())
+                    maxSize = Math.max(maxSize, diskSSTables.size());
+
+                List<SSTableReader> mixedSSTables = new ArrayList<>();
+
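+                // round-robin over the per-disk groups: take the i'th sstable from each group
+                // in turn (see the sketch after this file's diff)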
+                for (int i = 0; i < maxSize; i++)
+                    for (List<SSTableReader> diskSSTables : groupedByDisk.values())
+                        if (i < diskSSTables.size())
+                            mixedSSTables.add(diskSSTables.get(i));
+
+                return mixedSSTables;
+            }
+
+            private boolean inCorrectLocation(SSTableReader sstable)
+            {
+                if (!cfs.getPartitioner().splitter().isPresent())
+                    return true;
+                int directoryIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+                Directories.DataDirectory[] locations = cfs.getDirectories().getWriteableLocations();
+
+                Directories.DataDirectory location = locations[directoryIndex];
+                PartitionPosition diskLast = diskBoundaries.get(directoryIndex);
+                // the location we get from directoryIndex is based on the first key in the sstable
+                // now we need to make sure the last key is less than the boundary as well:
+                return sstable.descriptor.directory.getAbsolutePath().startsWith(location.location.getAbsolutePath()) && sstable.last.compareTo(diskLast) <= 0;
+            }
+
+            @Override
+            public void execute(LifecycleTransaction txn) throws IOException
+            {
+                logger.debug("Relocating {}", txn.originals());
+                AbstractCompactionTask task = cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, Long.MAX_VALUE);
+                task.setUserDefined(true);
+                task.setCompactionType(OperationType.RELOCATE);
+                task.execute(metrics);
+            }
+        }, OperationType.RELOCATE);
+    }
+
     public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                           final Collection<Range<Token>> ranges,
                                           final Refs<SSTableReader> sstables,
@@ -878,9 +950,7 @@ public class CompactionManager implements CompactionManagerMBean
 
         logger.info("Cleaning up {}", sstable);
 
-        File compactionFileLocation = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
-        if (compactionFileLocation == null)
-            throw new IOException("disk full");
+        File compactionFileLocation = sstable.descriptor.directory;
 
         List<SSTableReader> finished;
         int nowInSec = FBUtilities.nowInSeconds();

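The filterSSTables implementation in relocateSSTables above deliberately interleaves the
per-disk groups so consecutive relocation compactions are spread over all disks instead of
draining one disk at a time. A JDK-only sketch of that ordering (plain strings stand in for
SSTableReaders):

    import java.util.*;

    public class RelocationOrderSketch
    {
        // Take the i'th element of every per-disk group in turn, like the
        // mixedSSTables loop in relocateSSTables.
        static List<String> interleave(Map<Integer, List<String>> groupedByDisk)
        {
            int maxSize = 0;
            for (List<String> diskSSTables : groupedByDisk.values())
                maxSize = Math.max(maxSize, diskSSTables.size());

            List<String> mixed = new ArrayList<>();
            for (int i = 0; i < maxSize; i++)
                for (List<String> diskSSTables : groupedByDisk.values())
                    if (i < diskSSTables.size())
                        mixed.add(diskSSTables.get(i));
            return mixed;
        }

        public static void main(String[] args)
        {
            Map<Integer, List<String>> groupedByDisk = new LinkedHashMap<>();
            groupedByDisk.put(0, Arrays.asList("a-1", "a-2", "a-3"));
            groupedByDisk.put(1, Arrays.asList("b-1"));
            groupedByDisk.put(2, Arrays.asList("c-1", "c-2"));
            // prints [a-1, b-1, c-1, a-2, c-2, a-3]
            System.out.println(interleave(groupedByDisk));
        }
    }
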
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 7c7e86a..067a0c1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -20,9 +20,12 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
 import org.apache.cassandra.index.Index;
+import com.google.common.primitives.Ints;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +34,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.dht.Range;
@@ -43,19 +47,21 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.schema.CompactionParams;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.service.StorageService;
 
 /**
  * Manages the compaction strategies.
  *
- * Currently has two instances of actual compaction strategies - one for repaired data and one for
+ * Currently has two instances of actual compaction strategies per data directory - one for repaired data and one for
  * unrepaired data. This is done to be able to totally separate the different sets of sstables.
  */
 public class CompactionStrategyManager implements INotificationConsumer
 {
     private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class);
     private final ColumnFamilyStore cfs;
-    private volatile AbstractCompactionStrategy repaired;
-    private volatile AbstractCompactionStrategy unrepaired;
+    private volatile List<AbstractCompactionStrategy> repaired = new ArrayList<>();
+    private volatile List<AbstractCompactionStrategy> unrepaired = new ArrayList<>();
     private volatile boolean enabled = true;
     public boolean isActive = true;
     private volatile CompactionParams params;
@@ -67,6 +73,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         we will use the new compaction parameters.
      */
     private CompactionParams schemaCompactionParams;
+    private Directories.DataDirectory[] locations;
 
     public CompactionStrategyManager(ColumnFamilyStore cfs)
     {
@@ -75,6 +82,7 @@ public class CompactionStrategyManager implements INotificationConsumer
         this.cfs = cfs;
         reload(cfs.metadata);
         params = cfs.metadata.params.compaction;
+        locations = getDirectories().getWriteableLocations();
         enabled = params.isEnabled();
     }
 
@@ -91,20 +99,17 @@ public class CompactionStrategyManager implements INotificationConsumer
 
         maybeReload(cfs.metadata);
 
-        if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
-        {
-            AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
-            if (repairedTask != null)
-                return repairedTask;
-            return unrepaired.getNextBackgroundTask(gcBefore);
-        }
-        else
+        List<AbstractCompactionStrategy> strategies = new ArrayList<>(repaired.size() + unrepaired.size());
+        strategies.addAll(repaired);
+        strategies.addAll(unrepaired);
+        Collections.sort(strategies, (o1, o2) -> Ints.compare(o2.getEstimatedRemainingTasks(), o1.getEstimatedRemainingTasks()));
+        for (AbstractCompactionStrategy strategy : strategies)
         {
-            AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
-            if (unrepairedTask != null)
-                return unrepairedTask;
-            return repaired.getNextBackgroundTask(gcBefore);
+            AbstractCompactionTask task = strategy.getNextBackgroundTask(gcBefore);
+            if (task != null)
+                return task;
         }
+        return null;
     }
 
     public boolean isEnabled()
@@ -135,36 +140,78 @@ public class CompactionStrategyManager implements INotificationConsumer
             if (sstable.openReason != SSTableReader.OpenReason.EARLY)
                 getCompactionStrategyFor(sstable).addSSTable(sstable);
         }
-        repaired.startup();
-        unrepaired.startup();
+        repaired.forEach(AbstractCompactionStrategy::startup);
+        unrepaired.forEach(AbstractCompactionStrategy::startup);
     }
 
     /**
      * return the compaction strategy for the given sstable
      *
-     * returns differently based on the repaired status
+     * returns differently based on the repaired status and which data directory the sstable belongs to
      * @param sstable
      * @return
      */
     private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable)
     {
+        int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
         if (sstable.isRepaired())
-            return repaired;
+            return repaired.get(index);
         else
-            return unrepaired;
+            return unrepaired.get(index);
+    }
+
+    /**
+     * Get the index of the correct compaction strategy for the given sstable. If the sstable's first token falls
+     * within a disk's boundaries, the sstable is handled by that disk's compaction strategy.
+     *
+     * When upgrading, the first compaction strategy will get most of the files - we do not care which disk an
+     * sstable currently sits on (unless we don't know the local tokens yet). Once we start compacting we will write
+     * out sstables in the correct locations and hand them to the correct compaction strategy instance.
+     *
+     * @param cfs
+     * @param locations
+     * @param sstable
+     * @return
+     */
+    public static int getCompactionStrategyIndex(ColumnFamilyStore cfs, Directories locations, SSTableReader sstable)
+    {
+        if (!cfs.getPartitioner().splitter().isPresent())
+            return 0;
+
+        Directories.DataDirectory[] directories = locations.getWriteableLocations();
+
+        List<PartitionPosition> boundaries = StorageService.getDiskBoundaries(cfs, locations.getWriteableLocations());
+        if (boundaries == null)
+        {
+            // try to figure out location based on sstable directory:
+            for (int i = 0; i < directories.length; i++)
+            {
+                Directories.DataDirectory directory = directories[i];
+                if (sstable.descriptor.directory.getAbsolutePath().startsWith(directory.location.getAbsolutePath()))
+                    return i;
+            }
+            return 0;
+        }
+
+        int pos = Collections.binarySearch(boundaries, sstable.first);
+        assert pos < 0; // boundaries are minKeyBound/maxKeyBound positions, so an sstable's first key never compares equal to one
+        return -pos - 1;
     }
 
     public void shutdown()
     {
         isActive = false;
-        repaired.shutdown();
-        unrepaired.shutdown();
+        repaired.forEach(AbstractCompactionStrategy::shutdown);
+        unrepaired.forEach(AbstractCompactionStrategy::shutdown);
     }
 
     public synchronized void maybeReload(CFMetaData metadata)
     {
         // compare the old schema configuration to the new one, ignore any locally set changes.
-        if (metadata.params.compaction.equals(schemaCompactionParams))
+        if (metadata.params.compaction.equals(schemaCompactionParams) &&
+            Arrays.equals(locations, cfs.getDirectories().getWriteableLocations())) // any drives broken?
             return;
         reload(metadata);
     }
@@ -178,6 +225,11 @@ public class CompactionStrategyManager implements INotificationConsumer
     public synchronized void reload(CFMetaData metadata)
     {
         boolean disabledWithJMX = !enabled && shouldBeEnabled();
+        if (!metadata.params.compaction.equals(schemaCompactionParams))
+            logger.trace("Recreating compaction strategy - compaction parameters changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+        else if (!Arrays.equals(locations, cfs.getDirectories().getWriteableLocations()))
+            logger.trace("Recreating compaction strategy - writeable locations changed for {}.{}", cfs.keyspace.getName(), cfs.getTableName());
+
         setStrategy(metadata.params.compaction);
         schemaCompactionParams = metadata.params.compaction;
 
@@ -197,11 +249,13 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public int getUnleveledSSTables()
     {
-        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+        if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
         {
             int count = 0;
-            count += ((LeveledCompactionStrategy)repaired).getLevelSize(0);
-            count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
+            for (AbstractCompactionStrategy strategy : repaired)
+                count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
+            for (AbstractCompactionStrategy strategy : unrepaired)
+                count += ((LeveledCompactionStrategy)strategy).getLevelSize(0);
             return count;
         }
         return 0;
@@ -209,13 +263,19 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public synchronized int[] getSSTableCountPerLevel()
     {
-        if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy)
+        if (repaired.get(0) instanceof LeveledCompactionStrategy && unrepaired.get(0) instanceof LeveledCompactionStrategy)
         {
             int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT];
-            int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
-            res = sumArrays(res, repairedCountPerLevel);
-            int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
-            res = sumArrays(res, unrepairedCountPerLevel);
+            for (AbstractCompactionStrategy strategy : repaired)
+            {
+                int[] repairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+                res = sumArrays(res, repairedCountPerLevel);
+            }
+            for (AbstractCompactionStrategy strategy : unrepaired)
+            {
+                int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) strategy).getAllLevelSize();
+                res = sumArrays(res, unrepairedCountPerLevel);
+            }
             return res;
         }
         return null;
@@ -238,103 +298,112 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public boolean shouldDefragment()
     {
-        assert repaired.getClass().equals(unrepaired.getClass());
-        return repaired.shouldDefragment();
+        assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+        return repaired.get(0).shouldDefragment();
     }
 
     public Directories getDirectories()
     {
-        assert repaired.getClass().equals(unrepaired.getClass());
-        return repaired.getDirectories();
+        assert repaired.get(0).getClass().equals(unrepaired.get(0).getClass());
+        return repaired.get(0).getDirectories();
     }
 
     public synchronized void handleNotification(INotification notification, Object sender)
     {
+        maybeReload(cfs.metadata);
         if (notification instanceof SSTableAddedNotification)
         {
             SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
             for (SSTableReader sstable : flushedNotification.added)
-            {
-                if (sstable.isRepaired())
-                    repaired.addSSTable(sstable);
-                else
-                    unrepaired.addSSTable(sstable);
-            }
+                getCompactionStrategyFor(sstable).addSSTable(sstable);
         }
         else if (notification instanceof SSTableListChangedNotification)
         {
+            // a bit of gymnastics to be able to replace sstables in the compaction strategies:
+            // replaceSSTables() lets LCS know that a compaction finished and where to start the next one
             SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
-            Set<SSTableReader> repairedRemoved = new HashSet<>();
-            Set<SSTableReader> repairedAdded = new HashSet<>();
-            Set<SSTableReader> unrepairedRemoved = new HashSet<>();
-            Set<SSTableReader> unrepairedAdded = new HashSet<>();
+
+            Directories.DataDirectory [] locations = cfs.getDirectories().getWriteableLocations();
+            int locationSize = cfs.getPartitioner().splitter().isPresent() ? locations.length : 1;
+
+            List<Set<SSTableReader>> repairedRemoved = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> repairedAdded = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> unrepairedRemoved = new ArrayList<>(locationSize);
+            List<Set<SSTableReader>> unrepairedAdded = new ArrayList<>(locationSize);
+
+            for (int i = 0; i < locationSize; i++)
+            {
+                repairedRemoved.add(new HashSet<>());
+                repairedAdded.add(new HashSet<>());
+                unrepairedRemoved.add(new HashSet<>());
+                unrepairedAdded.add(new HashSet<>());
+            }
 
             for (SSTableReader sstable : listChangedNotification.removed)
             {
+                int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
                 if (sstable.isRepaired())
-                    repairedRemoved.add(sstable);
+                    repairedRemoved.get(i).add(sstable);
                 else
-                    unrepairedRemoved.add(sstable);
+                    unrepairedRemoved.get(i).add(sstable);
             }
             for (SSTableReader sstable : listChangedNotification.added)
             {
+                int i = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
                 if (sstable.isRepaired())
-                    repairedAdded.add(sstable);
+                    repairedAdded.get(i).add(sstable);
                 else
-                    unrepairedAdded.add(sstable);
-            }
-            if (!repairedRemoved.isEmpty())
-            {
-                repaired.replaceSSTables(repairedRemoved, repairedAdded);
-            }
-            else
-            {
-                for (SSTableReader sstable : repairedAdded)
-                    repaired.addSSTable(sstable);
+                    unrepairedAdded.get(i).add(sstable);
             }
 
-            if (!unrepairedRemoved.isEmpty())
+            for (int i = 0; i < locationSize; i++)
             {
-                unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded);
-            }
-            else
-            {
-                for (SSTableReader sstable : unrepairedAdded)
-                    unrepaired.addSSTable(sstable);
+                if (!repairedRemoved.get(i).isEmpty())
+                    repaired.get(i).replaceSSTables(repairedRemoved.get(i), repairedAdded.get(i));
+                else
+                {
+                    for (SSTableReader sstable : repairedAdded.get(i))
+                        repaired.get(i).addSSTable(sstable);
+                }
+                if (!unrepairedRemoved.get(i).isEmpty())
+                    unrepaired.get(i).replaceSSTables(unrepairedRemoved.get(i), unrepairedAdded.get(i));
+                else
+                {
+                    for (SSTableReader sstable : unrepairedAdded.get(i))
+                        unrepaired.get(i).addSSTable(sstable);
+                }
             }
         }
         else if (notification instanceof SSTableRepairStatusChanged)
         {
             for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
             {
+                int index = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
                 if (sstable.isRepaired())
                 {
-                    unrepaired.removeSSTable(sstable);
-                    repaired.addSSTable(sstable);
+                    unrepaired.get(index).removeSSTable(sstable);
+                    repaired.get(index).addSSTable(sstable);
                 }
                 else
                 {
-                    repaired.removeSSTable(sstable);
-                    unrepaired.addSSTable(sstable);
+                    repaired.get(index).removeSSTable(sstable);
+                    unrepaired.get(index).addSSTable(sstable);
                 }
             }
         }
         else if (notification instanceof SSTableDeletingNotification)
         {
-            SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
-            if (sstable.isRepaired())
-                repaired.removeSSTable(sstable);
-            else
-                unrepaired.removeSSTable(sstable);
+            SSTableReader sstable = ((SSTableDeletingNotification) notification).deleting;
+            getCompactionStrategyFor(sstable).removeSSTable(sstable);
         }
     }
 
     public void enable()
     {
         if (repaired != null)
-            repaired.enable();
+            repaired.forEach(AbstractCompactionStrategy::enable);
         if (unrepaired != null)
-            unrepaired.enable();
+            unrepaired.forEach(AbstractCompactionStrategy::enable);
         // enable this last to make sure the strategies are ready to get calls.
         enabled = true;
     }
@@ -344,47 +413,64 @@ public class CompactionStrategyManager implements INotificationConsumer
         // disable this first avoid asking disabled strategies for compaction tasks
         enabled = false;
         if (repaired != null)
-            repaired.disable();
+            repaired.forEach(AbstractCompactionStrategy::disable);
         if (unrepaired != null)
-            unrepaired.disable();
+            unrepaired.forEach(AbstractCompactionStrategy::disable);
     }
 
     /**
-     * Create ISSTableScanner from the given sstables
+     * Create ISSTableScanners from the given sstables
      *
      * Delegates the call to the compaction strategies to allow LCS to create a scanner
      * @param sstables
-     * @param range
+     * @param ranges
      * @return
      */
     @SuppressWarnings("resource")
     public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
     {
-        List<SSTableReader> repairedSSTables = new ArrayList<>();
-        List<SSTableReader> unrepairedSSTables = new ArrayList<>();
+        assert repaired.size() == unrepaired.size();
+        List<Set<SSTableReader>> repairedSSTables = new ArrayList<>();
+        List<Set<SSTableReader>> unrepairedSSTables = new ArrayList<>();
+
+        for (int i = 0; i < repaired.size(); i++)
+        {
+            repairedSSTables.add(new HashSet<>());
+            unrepairedSSTables.add(new HashSet<>());
+        }
+
         for (SSTableReader sstable : sstables)
         {
             if (sstable.isRepaired())
-                repairedSSTables.add(sstable);
+                repairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
             else
-                unrepairedSSTables.add(sstable);
+                unrepairedSSTables.get(getCompactionStrategyIndex(cfs, getDirectories(), sstable)).add(sstable);
         }
 
-        Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
+        List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
 
         for (Range<Token> range : ranges)
         {
-            AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
-            AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+            List<ISSTableScanner> repairedScanners = new ArrayList<>();
+            List<ISSTableScanner> unrepairedScanners = new ArrayList<>();
 
-            for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners))
+            for (int i = 0; i < repairedSSTables.size(); i++)
+            {
+                if (!repairedSSTables.get(i).isEmpty())
+                    repairedScanners.addAll(repaired.get(i).getScanners(repairedSSTables.get(i), range).scanners);
+            }
+            for (int i = 0; i < unrepairedSSTables.size(); i++)
+            {
+                if (!unrepairedSSTables.get(i).isEmpty())
+                    unrepairedScanners.addAll(unrepaired.get(i).getScanners(unrepairedSSTables.get(i), range).scanners);
+            }
+            for (ISSTableScanner scanner : Iterables.concat(repairedScanners, unrepairedScanners))
             {
                 if (!scanners.add(scanner))
                     scanner.close();
             }
         }
-
-        return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
+        return new AbstractCompactionStrategy.ScannerList(scanners);
     }
 
     public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
@@ -394,21 +480,44 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)
     {
-        return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup);
+        Map<Integer, List<SSTableReader>> groups = sstablesToGroup.stream().collect(Collectors.groupingBy((s) -> getCompactionStrategyIndex(cfs, getDirectories(), s)));
+        Collection<Collection<SSTableReader>> anticompactionGroups = new ArrayList<>();
+
+        for (Map.Entry<Integer, List<SSTableReader>> group : groups.entrySet())
+            anticompactionGroups.addAll(unrepaired.get(group.getKey()).groupSSTablesForAntiCompaction(group.getValue()));
+        return anticompactionGroups;
     }
 
     public long getMaxSSTableBytes()
     {
-        return unrepaired.getMaxSSTableBytes();
+        return unrepaired.get(0).getMaxSSTableBytes();
     }
 
     public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes)
     {
+        maybeReload(cfs.metadata);
+        validateForCompaction(txn.originals());
         return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes);
     }
 
+    private void validateForCompaction(Iterable<SSTableReader> input)
+    {
+        SSTableReader firstSSTable = Iterables.getFirst(input, null);
+        assert firstSSTable != null;
+        boolean repaired = firstSSTable.isRepaired();
+        int firstIndex = getCompactionStrategyIndex(cfs, getDirectories(), firstSSTable);
+        for (SSTableReader sstable : input)
+        {
+            if (sstable.isRepaired() != repaired)
+                throw new UnsupportedOperationException("You can't mix repaired and unrepaired data in a compaction");
+            if (firstIndex != getCompactionStrategyIndex(cfs, getDirectories(), sstable))
+                throw new UnsupportedOperationException("You can't mix sstables from different directories in a compaction");
+        }
+    }
+
     public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput)
     {
+        maybeReload(cfs.metadata);
         // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
         // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
         // sstables are marked the compactions are re-enabled
@@ -419,20 +528,21 @@ public class CompactionStrategyManager implements INotificationConsumer
             {
                 synchronized (CompactionStrategyManager.this)
                 {
-                    Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput);
-                    Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput);
-
-                    if (repairedTasks == null && unrepairedTasks == null)
-                        return null;
-
-                    if (repairedTasks == null)
-                        return unrepairedTasks;
-                    if (unrepairedTasks == null)
-                        return repairedTasks;
-
                     List<AbstractCompactionTask> tasks = new ArrayList<>();
-                    tasks.addAll(repairedTasks);
-                    tasks.addAll(unrepairedTasks);
+                    for (AbstractCompactionStrategy strategy : repaired)
+                    {
+                        Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
+                        if (task != null)
+                            tasks.addAll(task);
+                    }
+                    for (AbstractCompactionStrategy strategy : unrepaired)
+                    {
+                        Collection<AbstractCompactionTask> task = strategy.getMaximalTask(gcBefore, splitOutput);
+                        if (task != null)
+                            tasks.addAll(task);
+                    }
+                    if (tasks.isEmpty())
+                        return null;
                     return tasks;
                 }
             }
@@ -441,14 +551,18 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
     {
+        maybeReload(cfs.metadata);
+        validateForCompaction(sstables);
         return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore);
     }
 
     public int getEstimatedRemainingTasks()
     {
         int tasks = 0;
-        tasks += repaired.getEstimatedRemainingTasks();
-        tasks += unrepaired.getEstimatedRemainingTasks();
+        for (AbstractCompactionStrategy strategy : repaired)
+            tasks += strategy.getEstimatedRemainingTasks();
+        for (AbstractCompactionStrategy strategy : unrepaired)
+            tasks += strategy.getEstimatedRemainingTasks();
 
         return tasks;
     }
@@ -460,10 +574,10 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     public String getName()
     {
-        return unrepaired.getName();
+        return unrepaired.get(0).getName();
     }
 
-    public List<AbstractCompactionStrategy> getStrategies()
+    public List<List<AbstractCompactionStrategy>> getStrategies()
     {
         return Arrays.asList(repaired, unrepaired);
     }
@@ -481,12 +595,25 @@ public class CompactionStrategyManager implements INotificationConsumer
 
     private void setStrategy(CompactionParams params)
     {
-        if (repaired != null)
-            repaired.shutdown();
-        if (unrepaired != null)
-            unrepaired.shutdown();
-        repaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
-        unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params);
+        repaired.forEach(AbstractCompactionStrategy::shutdown);
+        unrepaired.forEach(AbstractCompactionStrategy::shutdown);
+        repaired.clear();
+        unrepaired.clear();
+
+        if (cfs.getPartitioner().splitter().isPresent())
+        {
+            locations = cfs.getDirectories().getWriteableLocations();
+            for (int i = 0; i < locations.length; i++)
+            {
+                repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+                unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+            }
+        }
+        else
+        {
+            repaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+            unrepaired.add(CFMetaData.createCompactionStrategyInstance(cfs, params));
+        }
         this.params = params;
     }
 
@@ -510,11 +637,11 @@ public class CompactionStrategyManager implements INotificationConsumer
     {
         if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
         {
-            return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+            return unrepaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
         }
         else
         {
-            return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
+            return repaired.get(0).createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
         }
     }
 }

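The index arithmetic in getCompactionStrategyIndex above leans on the
Collections.binarySearch contract: since the boundaries are min/max key bounds, the search
never finds an exact match, so it returns -(insertionPoint) - 1, and -pos - 1 recovers the
index of the first boundary greater than the sstable's first key - i.e. the owning disk.
A JDK-only illustration, with longs standing in for PartitionPositions:

    import java.util.*;

    public class DiskIndexSketch
    {
        // Upper boundary per disk, sorted ascending. An sstable whose first key is
        // below boundaries.get(i) (and above the previous boundary) belongs to disk i.
        static int diskIndexFor(List<Long> boundaries, long firstKey)
        {
            int pos = Collections.binarySearch(boundaries, firstKey);
            // pos >= 0 would mean the key equals a boundary; with min/max key
            // bounds that cannot happen, hence the assert in the patch.
            return pos < 0 ? -pos - 1 : pos;
        }

        public static void main(String[] args)
        {
            List<Long> boundaries = Arrays.asList(100L, 200L, 300L);
            System.out.println(diskIndexFor(boundaries, 42L));   // 0
            System.out.println(diskIndexFor(boundaries, 150L));  // 1
            System.out.println(diskIndexFor(boundaries, 299L));  // 2
        }
    }
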
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index be81c80..6b9fe21 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -230,7 +230,7 @@ public class CompactionTask extends AbstractCompactionTask
                                                           LifecycleTransaction transaction,
                                                           Set<SSTableReader> nonExpiredSSTables)
     {
-        return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals);
+        return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals, getLevel());
     }
 
     public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 50f9b71..2dc6ee8 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -85,7 +85,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
      */
     private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
     {
-        if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
+        if (sstables.isEmpty())
             return Collections.emptyList();
 
         Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
@@ -212,6 +212,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
     {
         sstables.remove(sstable);
     }
+
     /**
      * A target time span used for bucketing SSTables based on timestamps.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index b7bf83f..c54b751 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -355,7 +355,7 @@ public class LeveledManifest
         Collection<SSTableReader> candidates = getCandidatesFor(0);
         if (candidates.isEmpty())
             return null;
-        return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategyManager().getMaxSSTableBytes());
+        return new CompactionCandidate(candidates, getNextLevel(candidates), maxSSTableSizeInBytes);
     }
 
     private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 20e6df2..84a34c9 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -37,7 +37,8 @@ public enum OperationType
     STREAM("Stream"),
     WRITE("Write"),
     VIEW_BUILD("View build"),
-    INDEX_SUMMARY("Index summary redistribution");
+    INDEX_SUMMARY("Index summary redistribution"),
+    RELOCATE("Relocate sstables to correct disk");
 
     public final String type;
     public final String fileName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 272c2f8..838b0a1 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -104,11 +104,8 @@ public class Scrubber implements Closeable
 
         List<SSTableReader> toScrub = Collections.singletonList(sstable);
 
-        // Calculate the expected compacted filesize
-        this.destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
-        if (destination == null)
-            throw new IOException("disk full");
-
+        int locIndex = CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), sstable);
+        this.destination = cfs.getDirectories().getLocationForDisk(cfs.getDirectories().getWriteableLocations()[locIndex]);
         this.isCommutative = cfs.metadata.isCounter();
 
         boolean hasIndexFile = (new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))).exists();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index f8a8240..e36adf2 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -21,6 +21,7 @@ import java.util.*;
 import java.util.Map.Entry;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,7 +85,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
         List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), sizeTieredOptions.bucketHigh, sizeTieredOptions.bucketLow, sizeTieredOptions.minSSTableSize);
         logger.trace("Compaction buckets are {}", buckets);
-        updateEstimatedCompactionsByTasks(buckets);
+        estimatedRemainingTasks = getEstimatedCompactionsByTasks(cfs, buckets);
         List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
         if (!mostInteresting.isEmpty())
             return mostInteresting;
@@ -282,15 +283,15 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         return new ArrayList<List<T>>(buckets.values());
     }
 
-    private void updateEstimatedCompactionsByTasks(List<List<SSTableReader>> tasks)
+    public static int getEstimatedCompactionsByTasks(ColumnFamilyStore cfs, List<List<SSTableReader>> tasks)
     {
         int n = 0;
-        for (List<SSTableReader> bucket: tasks)
+        for (List<SSTableReader> bucket : tasks)
         {
             if (bucket.size() >= cfs.getMinimumCompactionThreshold())
                 n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold());
         }
-        estimatedRemainingTasks = n;
+        return n;
     }
 
     public long getMaxSSTableBytes()

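Extracting getEstimatedCompactionsByTasks as a static method keeps the arithmetic unchanged:
a bucket only counts once it reaches the minimum compaction threshold, and then contributes
ceil(bucket.size() / maxThreshold) tasks. A quick JDK-only check of that formula - the
thresholds 4 and 32 below are the defaults and are assumed here; the real values come from
the table's compaction settings:

    import java.util.*;

    public class EstimatedTasksSketch
    {
        static int estimatedTasks(List<List<String>> buckets, int minThreshold, int maxThreshold)
        {
            int n = 0;
            for (List<String> bucket : buckets)
                if (bucket.size() >= minThreshold)
                    n += Math.ceil((double) bucket.size() / maxThreshold);
            return n;
        }

        public static void main(String[] args)
        {
            List<List<String>> buckets = Arrays.asList(
                    Collections.nCopies(3, "sstable"),   // below minThreshold: 0 tasks
                    Collections.nCopies(10, "sstable"),  // ceil(10/32) = 1 task
                    Collections.nCopies(40, "sstable")); // ceil(40/32) = 2 tasks
            System.out.println(estimatedTasks(buckets, 4, 32)); // prints 3
        }
    }
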
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2c63418/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 0b3b7d0..46023ce 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -18,18 +18,26 @@
 
 package org.apache.cassandra.db.compaction.writers;
 
+import java.io.File;
 import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.CompactionTask;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.concurrent.Transactional;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.service.StorageService;
 
 
 /**
@@ -38,6 +46,8 @@ import org.apache.cassandra.utils.concurrent.Transactional;
  */
 public abstract class CompactionAwareWriter extends Transactional.AbstractTransactional implements Transactional
 {
+    protected static final Logger logger = LoggerFactory.getLogger(CompactionAwareWriter.class);
+
     protected final ColumnFamilyStore cfs;
     protected final Directories directories;
     protected final Set<SSTableReader> nonExpiredSSTables;
@@ -45,9 +55,11 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
     protected final long maxAge;
     protected final long minRepairedAt;
 
-    protected final LifecycleTransaction txn;
     protected final SSTableRewriter sstableWriter;
-    private boolean isInitialized = false;
+    protected final LifecycleTransaction txn;
+    private final Directories.DataDirectory[] locations;
+    private final List<PartitionPosition> diskBoundaries;
+    private int locationIndex;
 
     public CompactionAwareWriter(ColumnFamilyStore cfs,
                                  Directories directories,
@@ -59,12 +71,15 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         this.cfs = cfs;
         this.directories = directories;
         this.nonExpiredSSTables = nonExpiredSSTables;
-        this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
-        this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
-        this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
         this.txn = txn;
-        this.sstableWriter = SSTableRewriter.constructKeepingOriginals(txn, keepOriginals, maxAge, offline);
 
+        estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
+        maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
+        sstableWriter = SSTableRewriter.constructKeepingOriginals(txn, keepOriginals, maxAge, offline);
+        minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
+        locations = cfs.getDirectories().getWriteableLocations();
+        diskBoundaries = StorageService.getDiskBoundaries(cfs);
+        locationIndex = -1;
     }
 
     @Override
@@ -96,6 +111,8 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         return sstableWriter.finished();
     }
 
+    public abstract List<SSTableReader> finish(long repairedAt);
+
     /**
      * estimated number of keys we should write
      */
@@ -104,6 +121,11 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         return estimatedTotalKeys;
     }
 
+    /**
+     * Writes a partition in an implementation-specific way
+     * @param partition the partition to append
+     * @return true if the partition was written, false otherwise
+     */
     public final boolean append(UnfilteredRowIterator partition)
     {
         maybeSwitchWriter(partition.partitionKey());
@@ -125,9 +147,26 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
      */
     protected void maybeSwitchWriter(DecoratedKey key)
     {
-        if (!isInitialized)
-            switchCompactionLocation(getDirectories().getWriteableLocation(cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())));
-        isInitialized = true;
+        if (diskBoundaries == null)
+        {
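+            // no disk boundaries (partitioner cannot split, or local ranges are not known yet):
+            // keep a single writer in one location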
+            if (locationIndex < 0)
+            {
+                Directories.DataDirectory defaultLocation = getWriteDirectory(nonExpiredSSTables, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, OperationType.UNKNOWN));
+                switchCompactionLocation(defaultLocation);
+                locationIndex = 0;
+            }
+            return;
+        }
+
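+        // fast path: the key still falls on the current disk - keep the current writer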
+        if (locationIndex > -1 && key.compareTo(diskBoundaries.get(locationIndex)) < 0)
+            return;
+
+        int prevIdx = locationIndex;
+        while (locationIndex == -1 || key.compareTo(diskBoundaries.get(locationIndex)) > 0)
+            locationIndex++;
+        if (prevIdx >= 0)
+            logger.debug("Switching write location from {} to {}", locations[prevIdx], locations[locationIndex]);
+        switchCompactionLocation(locations[locationIndex]);
     }
 
     /**
@@ -148,13 +187,37 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
 
     /**
      * Return a directory where we can expect expectedWriteSize to fit.
+     *
+     * @param sstables the sstables being compacted; if they all live on one disk, that disk is preferred
+     * @param estimatedWriteSize the number of bytes we expect the compaction to write
+     * @return a data directory with enough space for estimatedWriteSize
      */
-    public Directories.DataDirectory getWriteDirectory(long expectedWriteSize)
+    public Directories.DataDirectory getWriteDirectory(Iterable<SSTableReader> sstables, long estimatedWriteSize)
     {
-        Directories.DataDirectory directory = getDirectories().getWriteableLocation(expectedWriteSize);
-        if (directory == null)
-            throw new RuntimeException("Insufficient disk space to write " + expectedWriteSize + " bytes");
+        File directory = null;
+        for (SSTableReader sstable : sstables)
+        {
+            if (directory == null)
+                directory = sstable.descriptor.directory;
+            if (!directory.equals(sstable.descriptor.directory))
+                logger.trace("All sstables not from the same disk - putting results in {}", directory);
+        }
+        Directories.DataDirectory d = getDirectories().getDataDirectoryForFile(directory);
+        if (d != null)
+        {
+            if (d.getAvailableSpace() < estimatedWriteSize)
+                throw new RuntimeException(String.format("Not enough space to write %d bytes to %s (%d bytes available)", estimatedWriteSize, d.location, d.getAvailableSpace()));
+            logger.trace("putting compaction results in {}", directory);
+            return d;
+        }
+        d = getDirectories().getWriteableLocation(estimatedWriteSize);
+        if (d == null)
+            throw new RuntimeException("Not enough disk space to store " + estimatedWriteSize + " bytes");
+        return d;
+    }
 
-        return directory;
+    public CompactionAwareWriter setRepairedAt(long repairedAt)
+    {
+        this.sstableWriter.setRepairedAt(repairedAt);
+        return this;
     }
 }
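
The maybeSwitchWriter change above is the crux of the fix for compaction: keys arrive in
sorted order, so the writer only ever moves forward through the disk boundaries and every
output sstable lands entirely within one disk's token range. A JDK-only sketch of that
forward-only switching - longs stand in for keys and boundaries, and openWriterFor is a
hypothetical stand-in for switchCompactionLocation:

    import java.util.*;

    public class SwitchingWriterSketch
    {
        private final List<Long> diskBoundaries; // upper bound per disk, sorted ascending
        private int locationIndex = -1;

        SwitchingWriterSketch(List<Long> diskBoundaries)
        {
            this.diskBoundaries = diskBoundaries;
        }

        void append(long key)
        {
            maybeSwitchWriter(key);
            System.out.println("disk " + locationIndex + " <- key " + key);
        }

        // Mirrors CompactionAwareWriter.maybeSwitchWriter: stay on the current disk
        // while the key is below its boundary, otherwise advance (possibly skipping
        // disks that own no keys in this compaction) and open a writer there.
        private void maybeSwitchWriter(long key)
        {
            if (locationIndex > -1 && key < diskBoundaries.get(locationIndex))
                return;
            while (locationIndex == -1 || key >= diskBoundaries.get(locationIndex))
                locationIndex++;
            openWriterFor(locationIndex);
        }

        private void openWriterFor(int disk)
        {
            System.out.println("switching compaction output to disk " + disk);
        }

        public static void main(String[] args)
        {
            SwitchingWriterSketch writer =
                new SwitchingWriterSketch(Arrays.asList(100L, 200L, Long.MAX_VALUE));
            for (long key : new long[]{ 5, 90, 150, 450 })
                writer.append(key);
        }
    }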