Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/08/20 20:53:01 UTC

[1/2] cassandra git commit: Give compaction strategies more control over sstable creation

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 0d866456a -> 9ed272773


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
new file mode 100644
index 0000000..2112656
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+
+public class SimpleSSTableMultiWriter implements SSTableMultiWriter
+{
+    private final SSTableWriter writer;
+
+    private SimpleSSTableMultiWriter(SSTableWriter writer)
+    {
+        this.writer = writer;
+    }
+
+    public boolean append(UnfilteredRowIterator partition)
+    {
+        RowIndexEntry indexEntry = writer.append(partition);
+        return indexEntry != null;
+    }
+
+    public Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult)
+    {
+        return Collections.singleton(writer.finish(repairedAt, maxDataAge, openResult));
+    }
+
+    public Collection<SSTableReader> finish(boolean openResult)
+    {
+        return Collections.singleton(writer.finish(openResult));
+    }
+
+    public Collection<SSTableReader> finished()
+    {
+        return Collections.singleton(writer.finished());
+    }
+
+    public SSTableMultiWriter setOpenResult(boolean openResult)
+    {
+        writer.setOpenResult(openResult);
+        return this;
+    }
+
+    public String getFilename()
+    {
+        return writer.getFilename();
+    }
+
+    public long getFilePointer()
+    {
+        return writer.getFilePointer();
+    }
+
+    public UUID getCfId()
+    {
+        return writer.metadata.cfId;
+    }
+
+    public Throwable commit(Throwable accumulate)
+    {
+        return writer.commit(accumulate);
+    }
+
+    public Throwable abort(Throwable accumulate)
+    {
+        return writer.abort(accumulate);
+    }
+
+    public void prepareToCommit()
+    {
+        writer.prepareToCommit();
+    }
+
+    public void close() throws Exception
+    {
+        writer.close();
+    }
+
+    public static SSTableMultiWriter create(Descriptor descriptor,
+                                            long keyCount,
+                                            long repairedAt,
+                                            CFMetaData cfm,
+                                            MetadataCollector metadataCollector,
+                                            SerializationHeader header,
+                                            LifecycleTransaction txn)
+    {
+        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, txn);
+        return new SimpleSSTableMultiWriter(writer);
+    }
+}
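
A note on the new class above: SimpleSSTableMultiWriter adapts a single SSTableWriter to
the SSTableMultiWriter interface, so call sites that may now produce several sstables per
operation can treat the one-sstable case uniformly. A minimal usage sketch, not part of
the patch; the writer and partition iterator are assumed to come from elsewhere (e.g.
SimpleSSTableMultiWriter.create(...) or cfs.createSSTableMultiWriter(...) as in the
streaming changes further down):

    import java.util.Collection;
    import java.util.Iterator;

    import org.apache.cassandra.db.rows.UnfilteredRowIterator;
    import org.apache.cassandra.io.sstable.SSTableMultiWriter;
    import org.apache.cassandra.io.sstable.format.SSTableReader;

    public class MultiWriterUsageSketch
    {
        public static Collection<SSTableReader> writeAll(SSTableMultiWriter writer,
                                                         Iterator<UnfilteredRowIterator> partitions)
        {
            try
            {
                while (partitions.hasNext())
                    writer.append(partitions.next());   // one append per partition, as in StreamReader
                return writer.finish(true);             // other implementations may return several readers
            }
            catch (Throwable t)
            {
                SSTableMultiWriter.abortOrDie(writer);  // same failure path the streaming code uses
                throw t;
            }
        }
    }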

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
index 15230ea..56d6130 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableAddedNotification.java
@@ -21,8 +21,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 
 public class SSTableAddedNotification implements INotification
 {
-    public final SSTableReader added;
-    public SSTableAddedNotification(SSTableReader added)
+    public final Iterable<SSTableReader> added;
+    public SSTableAddedNotification(Iterable<SSTableReader> added)
     {
         this.added = added;
     }
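
Since the added field is now an Iterable, subscribers loop over the notification instead of
handling a single reader. A sketch of a consumer adapted to the new shape; the class below
is illustrative only and the INotificationConsumer wiring itself is unchanged by this patch:

    import org.apache.cassandra.io.sstable.format.SSTableReader;
    import org.apache.cassandra.notifications.INotification;
    import org.apache.cassandra.notifications.INotificationConsumer;
    import org.apache.cassandra.notifications.SSTableAddedNotification;

    public class AddedSSTableLogger implements INotificationConsumer
    {
        public void handleNotification(INotification notification, Object sender)
        {
            if (notification instanceof SSTableAddedNotification)
            {
                // A single notification can now cover several sstables
                // (e.g. a flush or stream that produced more than one).
                for (SSTableReader reader : ((SSTableAddedNotification) notification).added)
                    System.out.println("sstable added: " + reader.getFilename());
            }
        }
    }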

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index a098786..d4b7283 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -36,12 +36,11 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.messages.FileMessageHeader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
@@ -85,7 +84,7 @@ public class StreamReader
      * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails.
      */
     @SuppressWarnings("resource")
-    public SSTableWriter read(ReadableByteChannel channel) throws IOException
+    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
     {
         logger.debug("reading file from {}, repairedAt = {}, level = {}", session.peer, repairedAt, sstableLevel);
         long totalSize = totalSize();
@@ -98,7 +97,7 @@ public class StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
+        SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
 
         DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
         BytesReadTracker in = new BytesReadTracker(dis);
@@ -115,7 +114,8 @@ public class StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            SSTableMultiWriter.abortOrDie(writer);
+
             drain(dis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;
@@ -124,14 +124,16 @@ public class StreamReader
         }
     }
 
-    protected SSTableWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
+    protected SSTableMultiWriter createWriter(ColumnFamilyStore cfs, long totalSize, long repairedAt, SSTableFormat.Type format) throws IOException
     {
-        Directories.DataDirectory localDir = cfs.directories.getWriteableLocation(totalSize);
+        Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize);
         if (localDir == null)
             throw new IOException("Insufficient disk space to store " + totalSize + " bytes");
-        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.directories.getLocationForDisk(localDir), format));
 
-        return SSTableWriter.create(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
+        desc = Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir), format));
+
+
+        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, sstableLevel, header.toHeader(cfs.metadata), session.getTransaction(cfId));
     }
 
     protected void drain(InputStream dis, long bytesRead) throws IOException
@@ -161,7 +163,7 @@ public class StreamReader
         return size;
     }
 
-    protected void writePartition(StreamDeserializer deserializer, SSTableWriter writer, ColumnFamilyStore cfs) throws IOException
+    protected void writePartition(StreamDeserializer deserializer, SSTableMultiWriter writer, ColumnFamilyStore cfs) throws IOException
     {
         DecoratedKey key = deserializer.newPartition();
         writer.append(deserializer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 7311069..2bcbbc1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -68,7 +69,7 @@ public class StreamReceiveTask extends StreamTask
     private boolean done = false;
 
     //  holds references to SSTables received
-    protected Collection<SSTableWriter> sstables;
+    protected Collection<SSTableMultiWriter> sstables;
 
     public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
     {
@@ -86,12 +87,12 @@ public class StreamReceiveTask extends StreamTask
      *
      * @param sstable SSTable file received.
      */
-    public synchronized void received(SSTableWriter sstable)
+    public synchronized void received(SSTableMultiWriter sstable)
     {
         if (done)
             return;
 
-        assert cfId.equals(sstable.metadata.cfId);
+        assert cfId.equals(sstable.getCfId());
 
         sstables.add(sstable);
         if (sstables.size() == totalFiles)
@@ -126,8 +127,8 @@ public class StreamReceiveTask extends StreamTask
             if (kscf == null)
             {
                 // schema was dropped during streaming
-                for (SSTableWriter writer : task.sstables)
-                    writer.abort();
+                task.sstables.forEach(SSTableMultiWriter::abortOrDie);
+
                 task.sstables.clear();
                 task.txn.abort();
                 return;
@@ -138,11 +139,11 @@ public class StreamReceiveTask extends StreamTask
             try
             {
                 List<SSTableReader> readers = new ArrayList<>();
-                for (SSTableWriter writer : task.sstables)
+                for (SSTableMultiWriter writer : task.sstables)
                 {
-                    SSTableReader reader = writer.finish(true);
-                    readers.add(reader);
-                    task.txn.update(reader, false);
+                    Collection<SSTableReader> newReaders = writer.finish(true);
+                    readers.addAll(newReaders);
+                    task.txn.update(newReaders, false);
                 }
 
                 task.sstables.clear();
@@ -211,8 +212,7 @@ public class StreamReceiveTask extends StreamTask
             return;
 
         done = true;
-        for (SSTableWriter writer : sstables)
-            writer.abort();
+        sstables.forEach(SSTableMultiWriter::abortOrDie);
         txn.abort();
         sstables.clear();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 205291b..f702e24 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@ -25,7 +25,7 @@ import java.nio.channels.ReadableByteChannel;
 
 import com.google.common.base.Throwables;
 
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,7 +62,7 @@ public class CompressedStreamReader extends StreamReader
      */
     @Override
     @SuppressWarnings("resource")
-    public SSTableWriter read(ReadableByteChannel channel) throws IOException
+    public SSTableMultiWriter read(ReadableByteChannel channel) throws IOException
     {
         logger.debug("reading file from {}, repairedAt = {}", session.peer, repairedAt);
         long totalSize = totalSize();
@@ -75,7 +75,7 @@ public class CompressedStreamReader extends StreamReader
         }
         ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
 
-        SSTableWriter writer = createWriter(cfs, totalSize, repairedAt, format);
+        SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, format);
 
         CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.compressedChecksumType());
         BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
@@ -102,7 +102,7 @@ public class CompressedStreamReader extends StreamReader
         }
         catch (Throwable e)
         {
-            writer.abort();
+            SSTableMultiWriter.abortOrDie(writer);
             drain(cis, in.getBytesRead());
             if (e instanceof IOException)
                 throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
index bce9691..19f9e12 100644
--- a/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
@@ -17,12 +17,11 @@
  */
 package org.apache.cassandra.streaming.messages;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@ -80,9 +79,9 @@ public class IncomingFileMessage extends StreamMessage
     };
 
     public FileMessageHeader header;
-    public SSTableWriter sstable;
+    public SSTableMultiWriter sstable;
 
-    public IncomingFileMessage(SSTableWriter sstable, FileMessageHeader header)
+    public IncomingFileMessage(SSTableMultiWriter sstable, FileMessageHeader header)
     {
         super(Type.FILE);
         this.header = header;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
index c8587d8..701bbc3 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExpiredBlockers.java
@@ -29,7 +29,6 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
@@ -69,7 +68,7 @@ public class SSTableExpiredBlockers
 
         Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
         ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
-        Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+        Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
         Set<SSTableReader> sstables = new HashSet<>();
         for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
index cb3cc5c..4f4e904 100644
--- a/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
+++ b/src/java/org/apache/cassandra/tools/SSTableLevelResetter.java
@@ -76,7 +76,7 @@ public class SSTableLevelResetter
             Keyspace keyspace = Keyspace.openWithoutSSTables(keyspaceName);
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnfamily);
             boolean foundSSTable = false;
-            for (Map.Entry<Descriptor, Set<Component>> sstable : cfs.directories.sstableLister(Directories.OnTxnErr.THROW).list().entrySet())
+            for (Map.Entry<Descriptor, Set<Component>> sstable : cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().entrySet())
             {
                 if (sstable.getValue().contains(Component.STATS))
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
index 95f516a..6554bd0 100644
--- a/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
+++ b/src/java/org/apache/cassandra/tools/SSTableOfflineRelevel.java
@@ -30,17 +30,14 @@ import java.util.Set;
 
 import com.google.common.base.Throwables;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * Create a decent leveling for the given keyspace/column family
@@ -95,7 +92,7 @@ public class SSTableOfflineRelevel
 
         Keyspace ks = Keyspace.openWithoutSSTables(keyspace);
         ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnfamily);
-        Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+        Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
         Set<SSTableReader> sstables = new HashSet<>();
         for (Map.Entry<Descriptor, Set<Component>> sstable : lister.list().entrySet())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index f64b8d9..f3a1a35 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -84,7 +84,7 @@ public class StandaloneScrubber
             String snapshotName = "pre-scrub-" + System.currentTimeMillis();
 
             OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
-            Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+            Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
 
             List<SSTableReader> sstables = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index 88e34b7..cf94c99 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -63,7 +63,7 @@ public class StandaloneUpgrader
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cf);
 
             OutputHandler handler = new OutputHandler.SystemOutput(false, options.debug);
-            Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW);
+            Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW);
             if (options.snapshot != null)
                 lister.onlyBackups(true).snapshots(options.snapshot);
             else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
index 0b17e39..3412329 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneVerifier.java
@@ -18,10 +18,6 @@
  */
 package org.apache.cassandra.tools;
 
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
@@ -35,7 +31,6 @@ import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.commons.cli.*;
 
-import java.io.File;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
@@ -69,7 +64,7 @@ public class StandaloneVerifier
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(options.cfName);
 
             OutputHandler handler = new OutputHandler.SystemOutput(options.verbose, options.debug);
-            Directories.SSTableLister lister = cfs.directories.sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
+            Directories.SSTableLister lister = cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).skipTemporary(true);
 
             boolean extended = options.extended;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index 7db978e..d684e11 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -114,9 +114,9 @@ public class LongCompactionsTest
                     builder.newRow(String.valueOf(i)).add("val", String.valueOf(i));
                 rows.put(key, builder.build());
             }
-            SSTableReader sstable = SSTableUtils.prepare().write(rows);
-            sstables.add(sstable);
-            store.addSSTable(sstable);
+            Collection<SSTableReader> readers = SSTableUtils.prepare().write(rows);
+            sstables.addAll(readers);
+            store.addSSTables(readers);
         }
 
         // give garbage collection a bit of time to catch up

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index 249dd8d..6b50e49 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -86,7 +86,7 @@ public class MockSchema
 
     public static SSTableReader sstable(int generation, int size, boolean keepRef, ColumnFamilyStore cfs)
     {
-        Descriptor descriptor = new Descriptor(cfs.directories.getDirectoryForNewSSTables(),
+        Descriptor descriptor = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(),
                                                cfs.keyspace.getName(),
                                                cfs.getColumnFamilyName(),
                                                generation);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 78f9e3e..6840e2b 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -329,7 +329,7 @@ public class ColumnFamilyStoreTest
         assertTrue(snapshotDetails.containsKey("ephemeralSnapshot"));
         assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot"));
 
-        ColumnFamilyStore.clearEphemeralSnapshots(cfs.directories);
+        ColumnFamilyStore.clearEphemeralSnapshots(cfs.getDirectories());
 
         snapshotDetails = cfs.getSnapshotDetails();
         assertEquals(1, snapshotDetails.size());
@@ -350,7 +350,7 @@ public class ColumnFamilyStoreTest
 
         for (int version = 1; version <= 2; ++version)
         {
-            Descriptor existing = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version);
+            Descriptor existing = new Descriptor(cfs.getDirectories().getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version);
             Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version);
             for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
                 assertTrue("Cannot find backed-up file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 8889488..52b2aa8 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -324,12 +324,12 @@ public class ScrubTest
             String filename = cfs.getSSTablePath(tempDataDir);
             Descriptor desc = Descriptor.fromFilename(filename);
 
-            try (SSTableTxnWriter writer = SSTableTxnWriter.create(desc,
-                                                             keys.size(),
-                                                             0L,
-                                                             0,
-                                                             SerializationHeader.make(cfs.metadata,
-                                                                                      Collections.emptyList())))
+            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc,
+                                                                   keys.size(),
+                                                                   0L,
+                                                                   0,
+                                                                   SerializationHeader.make(cfs.metadata,
+                                                                                            Collections.emptyList())))
             {
 
                 for (String k : keys)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 6dc5f53..a3167f9 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.UpdateBuilder;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -162,10 +163,10 @@ public class AntiCompactionTest
 
     private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
     {
-        File dir = cfs.directories.getDirectoryForNewSSTables();
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
         String filename = cfs.getSSTablePath(dir);
 
-        try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
+        try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
         {
             for (int i = 0; i < count; i++)
             {
@@ -175,7 +176,10 @@ public class AntiCompactionTest
                 writer.append(builder.build().unfilteredIterator());
 
             }
-            return writer.finish(true);
+            Collection<SSTableReader> sstables = writer.finish(true);
+            assertNotNull(sstables);
+            assertEquals(1, sstables.size());
+            return sstables.iterator().next();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
index 1b94a6b..68936f5 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
@@ -75,7 +75,7 @@ public class CompactionAwareWriterTest extends CQLTester
         populate(rowCount);
         LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION);
         long beforeSize = txn.originals().iterator().next().onDiskLength();
-        CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals());
+        CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, cfs.getDirectories(), txn, txn.originals());
         int rows = compact(cfs, txn, writer);
         assertEquals(1, cfs.getLiveSSTables().size());
         assertEquals(rowCount, rows);
@@ -94,7 +94,7 @@ public class CompactionAwareWriterTest extends CQLTester
         LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION);
         long beforeSize = txn.originals().iterator().next().onDiskLength();
         int sstableSize = (int)beforeSize/10;
-        CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0);
+        CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, cfs.getDirectories(), txn, txn.originals(), sstableSize, 0);
         int rows = compact(cfs, txn, writer);
         assertEquals(10, cfs.getLiveSSTables().size());
         assertEquals(rowCount, rows);
@@ -111,7 +111,7 @@ public class CompactionAwareWriterTest extends CQLTester
         populate(rowCount);
         LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION);
         long beforeSize = txn.originals().iterator().next().onDiskLength();
-        CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), 0);
+        CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, cfs.getDirectories(), txn, txn.originals(), 0);
         int rows = compact(cfs, txn, writer);
         long expectedSize = beforeSize / 2;
         List<SSTableReader> sortedSSTables = new ArrayList<>(cfs.getLiveSSTables());
@@ -147,7 +147,7 @@ public class CompactionAwareWriterTest extends CQLTester
         LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getLiveSSTables(), OperationType.COMPACTION);
         long beforeSize = txn.originals().iterator().next().onDiskLength();
         int sstableSize = (int)beforeSize/targetSSTableCount;
-        CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize);
+        CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, cfs.getDirectories(), txn, txn.originals(), sstableSize);
         int rows = compact(cfs, txn, writer);
         assertEquals(targetSSTableCount, cfs.getLiveSSTables().size());
         int [] levelCounts = new int[5];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 9d5e5fc..8035ac4 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -346,7 +346,7 @@ public class LeveledCompactionStrategyTest
         assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
 
         unrepaired.removeSSTable(sstable2);
-        strategy.handleNotification(new SSTableAddedNotification(sstable2), this);
+        strategy.handleNotification(new SSTableAddedNotification(Collections.singleton(sstable2)), this);
         assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
         assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 309e35a..9eff1b1 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -34,7 +34,6 @@ import junit.framework.Assert;
 import org.apache.cassandra.MockSchema;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
@@ -50,7 +49,6 @@ import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
@@ -134,11 +132,11 @@ public class RealTransactionsTest extends SchemaLoader
     {
         cfs.truncateBlocking();
 
-        String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
-        String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
+        String schema = "CREATE TABLE \"%s\".\"%s\" (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
+        String query = "INSERT INTO \"%s\".\"%s\" (key, name, val) VALUES (?, ?, ?)";
 
         try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
-                                                       .inDirectory(cfs.directories.getDirectoryForNewSSTables())
+                                                       .inDirectory(cfs.getDirectories().getDirectoryForNewSSTables())
                                                        .forTable(String.format(schema, cfs.keyspace.getName(), cfs.name))
                                                        .using(String.format(query, cfs.keyspace.getName(), cfs.name))
                                                        .build())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index 3a943c4..9bcd9e7 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -30,6 +30,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -177,7 +178,7 @@ public class TrackerTest
             Assert.assertTrue(reader.isKeyCacheSetup());
 
         Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
-        Assert.assertEquals(3, listener.senders.size());
+        Assert.assertEquals(1, listener.senders.size());
         Assert.assertEquals(tracker, listener.senders.get(0));
         Assert.assertTrue(listener.received.get(0) instanceof SSTableAddedNotification);
         DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
@@ -296,10 +297,10 @@ public class TrackerTest
         Assert.assertTrue(tracker.getView().flushingMemtables.contains(prev2));
 
         SSTableReader reader = MockSchema.sstable(0, 10, false, cfs);
-        tracker.replaceFlushed(prev2, reader);
+        tracker.replaceFlushed(prev2, Collections.singleton(reader));
         Assert.assertEquals(1, tracker.getView().sstables.size());
         Assert.assertEquals(1, listener.received.size());
-        Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
+        Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
         listener.received.clear();
         Assert.assertTrue(reader.isKeyCacheSetup());
         Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
@@ -313,12 +314,12 @@ public class TrackerTest
         tracker.markFlushing(prev1);
         reader = MockSchema.sstable(0, 10, true, cfs);
         cfs.invalidate(false);
-        tracker.replaceFlushed(prev1, reader);
+        tracker.replaceFlushed(prev1, Collections.singleton(reader));
         Assert.assertEquals(0, tracker.getView().sstables.size());
         Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
         Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
         Assert.assertEquals(3, listener.received.size());
-        Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
+        Assert.assertEquals(singleton(reader), ((SSTableAddedNotification) listener.received.get(0)).added);
         Assert.assertTrue(listener.received.get(1) instanceof  SSTableDeletingNotification);
         Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
         DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
@@ -332,8 +333,8 @@ public class TrackerTest
         Tracker tracker = new Tracker(null, false);
         MockListener listener = new MockListener(false);
         tracker.subscribe(listener);
-        tracker.notifyAdded(r1);
-        Assert.assertEquals(r1, ((SSTableAddedNotification) listener.received.get(0)).added);
+        tracker.notifyAdded(singleton(r1));
+        Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added);
         listener.received.clear();
         tracker.notifyDeleting(r1);
         Assert.assertEquals(r1, ((SSTableDeletingNotification) listener.received.get(0)).deleting);
@@ -353,8 +354,8 @@ public class TrackerTest
         MockListener failListener = new MockListener(true);
         tracker.subscribe(failListener);
         tracker.subscribe(listener);
-        Assert.assertNotNull(tracker.notifyAdded(r1, null));
-        Assert.assertEquals(r1, ((SSTableAddedNotification) listener.received.get(0)).added);
+        Assert.assertNotNull(tracker.notifyAdded(singleton(r1), null));
+        Assert.assertEquals(singleton(r1), ((SSTableAddedNotification) listener.received.get(0)).added);
         listener.received.clear();
         Assert.assertNotNull(tracker.notifySSTablesChanged(singleton(r1), singleton(r2), OperationType.COMPACTION, null));
         Assert.assertEquals(singleton(r1), ((SSTableListChangedNotification) listener.received.get(0)).removed);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 40afa54..523c203 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -195,7 +195,7 @@ public class ViewTest
         Assert.assertEquals(memtable3, cur.getCurrentMemtable());
 
         SSTableReader sstable = MockSchema.sstable(1, cfs);
-        cur = View.replaceFlushed(memtable1, sstable).apply(cur);
+        cur = View.replaceFlushed(memtable1, Collections.singleton(sstable)).apply(cur);
         Assert.assertEquals(0, cur.flushingMemtables.size());
         Assert.assertEquals(1, cur.liveMemtables.size());
         Assert.assertEquals(memtable3, cur.getCurrentMemtable());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index 856ef7c..e1ab48f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -64,12 +64,12 @@ public class BigTableWriterTest extends AbstractTransactionalTest
 
         private TestableBTW()
         {
-            this(cfs.getSSTablePath(cfs.directories.getDirectoryForNewSSTables()));
+            this(cfs.getSSTablePath(cfs.getDirectories().getDirectoryForNewSSTables()));
         }
 
         private TestableBTW(String file)
         {
-            this(file, SSTableTxnWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
+            this(file, SSTableTxnWriter.create(cfs, file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
         }
 
         private TestableBTW(String file, SSTableTxnWriter sw)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
index ceeb369..a9165f7 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.FileUtils;
 
@@ -44,6 +45,7 @@ public class CQLSSTableWriterClientTest
     public void setUp()
     {
         this.testDirectory = Files.createTempDir();
+        Keyspace.setInitialized();
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index d9516cb..1c61f51 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -246,7 +246,7 @@ public class SSTableRewriterTest extends SchemaLoader
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
         truncate(cfs);
 
-        File dir = cfs.directories.getDirectoryForNewSSTables();
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
         LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, cfs.metadata);
         try (SSTableWriter writer = getWriter(cfs, dir, txn))
         {
@@ -941,10 +941,10 @@ public class SSTableRewriterTest extends SchemaLoader
         Set<SSTableReader> result = new LinkedHashSet<>();
         for (int f = 0 ; f < fileCount ; f++)
         {
-            File dir = cfs.directories.getDirectoryForNewSSTables();
+            File dir = cfs.getDirectories().getDirectoryForNewSSTables();
             String filename = cfs.getSSTablePath(dir);
 
-            try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
+            try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
             {
                 int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
                 for ( ; i < end ; i++)
@@ -955,7 +955,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
                     writer.append(builder.build().unfilteredIterator());
                 }
-                result.add(writer.finish(true));
+                result.addAll(writer.finish(true));
             }
         }
         return result;
@@ -972,7 +972,7 @@ public class SSTableRewriterTest extends SchemaLoader
             liveDescriptors.add(sstable.descriptor.generation);
             spaceUsed += sstable.bytesOnDisk();
         }
-        for (File dir : cfs.directories.getCFDirectories())
+        for (File dir : cfs.getDirectories().getCFDirectories())
         {
             for (File f : dir.listFiles())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 6de5bb9..89c0d61 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
@@ -164,7 +165,7 @@ public class SSTableUtils
             return this;
         }
 
-        public SSTableReader write(Set<String> keys) throws IOException
+        public Collection<SSTableReader> write(Set<String> keys) throws IOException
         {
             Map<String, PartitionUpdate> map = new HashMap<>();
             for (String key : keys)
@@ -176,7 +177,7 @@ public class SSTableUtils
             return write(map);
         }
 
-        public SSTableReader write(SortedMap<DecoratedKey, PartitionUpdate> sorted) throws IOException
+        public Collection<SSTableReader> write(SortedMap<DecoratedKey, PartitionUpdate> sorted) throws IOException
         {
             final Iterator<Map.Entry<DecoratedKey, PartitionUpdate>> iter = sorted.entrySet().iterator();
             return write(sorted.size(), new Appender()
@@ -192,7 +193,7 @@ public class SSTableUtils
             });
         }
 
-        public SSTableReader write(Map<String, PartitionUpdate> entries) throws IOException
+        public Collection<SSTableReader> write(Map<String, PartitionUpdate> entries) throws IOException
         {
             SortedMap<DecoratedKey, PartitionUpdate> sorted = new TreeMap<>();
             for (Map.Entry<String, PartitionUpdate> entry : entries.entrySet())
@@ -201,18 +202,22 @@ public class SSTableUtils
             return write(sorted);
         }
 
-        public SSTableReader write(int expectedSize, Appender appender) throws IOException
+        public Collection<SSTableReader> write(int expectedSize, Appender appender) throws IOException
         {
             File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
-            SerializationHeader header = SerializationHeader.make(Schema.instance.getCFMetaData(ksname, cfname), Collections.EMPTY_LIST);
-            SSTableTxnWriter writer = SSTableTxnWriter.create(datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
+            CFMetaData cfm = Schema.instance.getCFMetaData(ksname, cfname);
+            ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.cfId);
+            SerializationHeader header = SerializationHeader.make(cfm, Collections.EMPTY_LIST);
+            SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
             while (appender.append(writer)) { /* pass */ }
-            SSTableReader reader = writer.finish(true);
+            Collection<SSTableReader> readers = writer.finish(true);
+
             // mark all components for removal
             if (cleanup)
-                for (Component component : reader.components)
-                    new File(reader.descriptor.filenameFor(component)).deleteOnExit();
-            return reader;
+                for (SSTableReader reader: readers)
+                    for (Component component : reader.components)
+                        new File(reader.descriptor.filenameFor(component)).deleteOnExit();
+            return readers;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/test/unit/org/apache/cassandra/schema/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/DefsTest.java b/test/unit/org/apache/cassandra/schema/DefsTest.java
index c10865a..c3bcf1a 100644
--- a/test/unit/org/apache/cassandra/schema/DefsTest.java
+++ b/test/unit/org/apache/cassandra/schema/DefsTest.java
@@ -204,7 +204,7 @@ public class DefsTest
         ColumnFamilyStore store = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
         assertNotNull(store);
         store.forceBlockingFlush();
-        assertTrue(store.directories.sstableLister(Directories.OnTxnErr.THROW).list().size() > 0);
+        assertTrue(store.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().size() > 0);
 
         MigrationManager.announceColumnFamilyDrop(ks.name, cfm.cfName);
 
@@ -226,7 +226,7 @@ public class DefsTest
 
         // verify that the files are gone.
         Supplier<Object> lambda = () -> {
-            for (File file : store.directories.sstableLister(Directories.OnTxnErr.THROW).listFiles())
+            for (File file : store.getDirectories().sstableLister(Directories.OnTxnErr.THROW).listFiles())
             {
                 if (file.getPath().endsWith("Data.db") && !new File(file.getPath().replace("Data.db", "Compacted")).exists())
                     return false;
@@ -275,7 +275,7 @@ public class DefsTest
         ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
         assertNotNull(cfs);
         cfs.forceBlockingFlush();
-        assertTrue(!cfs.directories.sstableLister(Directories.OnTxnErr.THROW).list().isEmpty());
+        assertTrue(!cfs.getDirectories().sstableLister(Directories.OnTxnErr.THROW).list().isEmpty());
 
         MigrationManager.announceKeyspaceDrop(ks.name);
 


[2/2] cassandra git commit: Give compaction strategies more control over sstable creation

Posted by ma...@apache.org.
Give compaction strategies more control over sstable creation

Patch by Blake Eggleston; reviewed by marcuse for CASSANDRA-8671
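
The strategy-side hook this introduces is not quoted in this message, but its shape can be
inferred from SimpleSSTableMultiWriter.create(...) and the cfs.createSSTableMultiWriter(...)
call in the StreamReader hunk above. A hedged sketch of what a strategy can now plug in;
the method name and owning class are assumed here, not taken from the diff:

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.db.SerializationHeader;
    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.SSTableMultiWriter;
    import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
    import org.apache.cassandra.io.sstable.metadata.MetadataCollector;

    public class SSTableCreationHookSketch
    {
        // A strategy that is happy with one sstable per writer just delegates;
        // one that wants to split output (per disk, per token range, ...) can
        // return its own SSTableMultiWriter implementation instead.
        public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
                                                           long keyCount,
                                                           long repairedAt,
                                                           CFMetaData cfm,
                                                           MetadataCollector metadataCollector,
                                                           SerializationHeader header,
                                                           LifecycleTransaction txn)
        {
            return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt,
                                                   cfm, metadataCollector, header, txn);
        }
    }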


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ed27277
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ed27277
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ed27277

Branch: refs/heads/cassandra-3.0
Commit: 9ed2727739c73d64086d09a86a407a77390f081a
Parents: 0d86645
Author: Blake Eggleston <bd...@gmail.com>
Authored: Thu Aug 6 10:19:55 2015 -0700
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Aug 20 20:47:40 2015 +0200

----------------------------------------------------------------------
 .../apache/cassandra/db/ColumnFamilyStore.java  |  73 +++++++++---
 .../org/apache/cassandra/db/Directories.java    |  42 +++++--
 src/java/org/apache/cassandra/db/Keyspace.java  |   5 +
 src/java/org/apache/cassandra/db/Memtable.java  |  32 +++--
 .../compaction/AbstractCompactionStrategy.java  |  24 +++-
 .../db/compaction/AbstractCompactionTask.java   |   3 +-
 .../db/compaction/CompactionManager.java        |   6 +-
 .../compaction/CompactionStrategyManager.java   |  40 +++++--
 .../cassandra/db/compaction/CompactionTask.java |  22 ++--
 .../db/compaction/LeveledCompactionTask.java    |   6 +-
 .../db/compaction/SSTableSplitter.java          |   3 +-
 .../cassandra/db/compaction/Scrubber.java       |   3 +-
 .../SizeTieredCompactionStrategy.java           |   4 +-
 .../writers/CompactionAwareWriter.java          |  53 ++++++---
 .../writers/DefaultCompactionWriter.java        |  32 ++---
 .../writers/MajorLeveledCompactionWriter.java   |  46 ++++----
 .../writers/MaxSSTableSizeWriter.java           |  45 ++++---
 .../SplittingSizeTieredCompactionWriter.java    |  52 ++++-----
 .../db/lifecycle/LifecycleTransaction.java      |   9 ++
 .../apache/cassandra/db/lifecycle/Tracker.java  |  34 +++---
 .../org/apache/cassandra/db/lifecycle/View.java |   4 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |  11 +-
 .../io/sstable/SSTableMultiWriter.java          |  54 +++++++++
 .../cassandra/io/sstable/SSTableTxnWriter.java  |  43 +++++--
 .../io/sstable/SimpleSSTableMultiWriter.java    | 116 +++++++++++++++++++
 .../notifications/SSTableAddedNotification.java |   4 +-
 .../cassandra/streaming/StreamReader.java       |  22 ++--
 .../cassandra/streaming/StreamReceiveTask.java  |  22 ++--
 .../compress/CompressedStreamReader.java        |   8 +-
 .../streaming/messages/IncomingFileMessage.java |   7 +-
 .../cassandra/tools/SSTableExpiredBlockers.java |   3 +-
 .../cassandra/tools/SSTableLevelResetter.java   |   2 +-
 .../cassandra/tools/SSTableOfflineRelevel.java  |   5 +-
 .../cassandra/tools/StandaloneScrubber.java     |   2 +-
 .../cassandra/tools/StandaloneUpgrader.java     |   2 +-
 .../cassandra/tools/StandaloneVerifier.java     |   7 +-
 .../db/compaction/LongCompactionsTest.java      |   6 +-
 test/unit/org/apache/cassandra/MockSchema.java  |   2 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |   4 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |  12 +-
 .../db/compaction/AntiCompactionTest.java       |  10 +-
 .../compaction/CompactionAwareWriterTest.java   |   8 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 .../db/lifecycle/RealTransactionsTest.java      |   8 +-
 .../cassandra/db/lifecycle/TrackerTest.java     |  19 +--
 .../apache/cassandra/db/lifecycle/ViewTest.java |   2 +-
 .../io/sstable/BigTableWriterTest.java          |   4 +-
 .../io/sstable/CQLSSTableWriterClientTest.java  |   2 +
 .../io/sstable/SSTableRewriterTest.java         |  10 +-
 .../cassandra/io/sstable/SSTableUtils.java      |  25 ++--
 .../org/apache/cassandra/schema/DefsTest.java   |   6 +-
 51 files changed, 651 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a12de0a..b199c77 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.*;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.TableMetrics.Sampler;
 import org.apache.cassandra.metrics.TableMetrics;
@@ -75,6 +76,33 @@ import static org.apache.cassandra.utils.Throwables.maybeFail;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
+    // the directories used to load sstables on cfs instantiation
+    private static volatile Directories.DataDirectory[] initialDirectories = Directories.dataDirectories;
+
+    /**
+     * a hook to add additional directories to initialDirectories.
+     * Any additional directories should be added prior to ColumnFamilyStore instantiation on startup
+     */
+    public static synchronized void addInitialDirectories(Directories.DataDirectory[] newDirectories)
+    {
+        assert newDirectories != null;
+
+        Set<Directories.DataDirectory> existing = Sets.newHashSet(initialDirectories);
+
+        List<Directories.DataDirectory> replacementList = Lists.newArrayList(initialDirectories);
+        for (Directories.DataDirectory directory: newDirectories)
+        {
+            if (!existing.contains(directory))
+            {
+                replacementList.add(directory);
+            }
+        }
+
+        Directories.DataDirectory[] replacementArray = new Directories.DataDirectory[replacementList.size()];
+        replacementList.toArray(replacementArray);
+        initialDirectories = replacementArray;
+    }
+
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
     private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
@@ -164,7 +192,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     private volatile DefaultInteger maxCompactionThreshold;
     private final CompactionStrategyManager compactionStrategyManager;
 
-    public final Directories directories;
+    private volatile Directories directories;
 
     public final TableMetrics metric;
     public volatile long sampleLatencyNanos;
@@ -189,6 +217,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 cfs.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold());
 
         compactionStrategyManager.maybeReload(metadata);
+        directories = compactionStrategyManager.getDirectories();
 
         scheduleFlush();
 
@@ -330,6 +359,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                               boolean loadSSTables,
                               boolean registerBookkeeping)
     {
+        assert directories != null;
         assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
 
         this.keyspace = keyspace;
@@ -363,6 +393,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         // compaction strategy should be created after the CFS has been prepared
         this.compactionStrategyManager = new CompactionStrategyManager(this);
+        this.directories = this.compactionStrategyManager.getDirectories();
 
         if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
         {
@@ -426,6 +457,22 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    public Directories getDirectories()
+    {
+        return directories;
+    }
+
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+    {
+        MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
+        return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+    }
+
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
+    {
+        return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, txn);
+    }
+
     /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */
     public void invalidate()
     {
@@ -499,7 +546,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                                          boolean loadSSTables)
     {
         // get the max generation number, to prevent generation conflicts
-        Directories directories = new Directories(metadata);
+        Directories directories = new Directories(metadata, initialDirectories);
         Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true);
         List<Integer> generations = new ArrayList<Integer>();
         for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
@@ -633,7 +680,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             currentDescriptors.add(sstable.descriptor);
         Set<SSTableReader> newSSTables = new HashSet<>();
 
-        Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
+        Directories.SSTableLister lister = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
         for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
         {
             Descriptor descriptor = entry.getKey();
@@ -1378,9 +1425,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         maybeFail(data.dropSSTables(Predicates.in(sstables), compactionType, null));
     }
 
-    void replaceFlushed(Memtable memtable, SSTableReader sstable)
+    void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
     {
-        compactionStrategyManager.replaceFlushed(memtable, sstable);
+        compactionStrategyManager.replaceFlushed(memtable, sstables);
     }
 
     public boolean isValid()
@@ -1580,7 +1627,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private void writeSnapshotManifest(final JSONArray filesJSONArr, final String snapshotName)
     {
-        final File manifestFile = directories.getSnapshotManifestFile(snapshotName);
+        final File manifestFile = getDirectories().getSnapshotManifestFile(snapshotName);
 
         try
         {
@@ -1602,7 +1649,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private void createEphemeralSnapshotMarkerFile(final String snapshot)
     {
-        final File ephemeralSnapshotMarker = directories.getNewEphemeralSnapshotMarkerFile(snapshot);
+        final File ephemeralSnapshotMarker = getDirectories().getNewEphemeralSnapshotMarkerFile(snapshot);
 
         try
         {
@@ -1635,7 +1682,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Map<Integer, SSTableReader> active = new HashMap<>();
         for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL))
             active.put(sstable.descriptor.generation, sstable);
-        Map<Descriptor, Set<Component>> snapshots = directories.sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list();
+        Map<Descriptor, Set<Component>> snapshots = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list();
         Refs<SSTableReader> refs = new Refs<>();
         try
         {
@@ -1692,12 +1739,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public boolean snapshotExists(String snapshotName)
     {
-        return directories.snapshotExists(snapshotName);
+        return getDirectories().snapshotExists(snapshotName);
     }
 
     public long getSnapshotCreationTime(String snapshotName)
     {
-        return directories.snapshotCreationTime(snapshotName);
+        return getDirectories().snapshotCreationTime(snapshotName);
     }
 
     /**
@@ -1708,7 +1755,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public void clearSnapshot(String snapshotName)
     {
-        List<File> snapshotDirs = directories.getCFDirectories();
+        List<File> snapshotDirs = getDirectories().getCFDirectories();
         Directories.clearSnapshot(snapshotName, snapshotDirs);
     }
     /**
@@ -1718,7 +1765,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public Map<String, Pair<Long,Long>> getSnapshotDetails()
     {
-        return directories.getSnapshotDetails();
+        return getDirectories().getSnapshotDetails();
     }
 
     /**
@@ -2251,7 +2298,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public long trueSnapshotsSize()
     {
-        return directories.trueSnapshotsSize();
+        return getDirectories().trueSnapshotsSize();
     }
 
     @VisibleForTesting
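
The ColumnFamilyStore changes above expose two new extension points: addInitialDirectories(), which lets code register extra data directories before any ColumnFamilyStore is instantiated, and createSSTableMultiWriter(), which routes sstable creation through the compaction strategy. For illustration only, a minimal sketch of the first hook (hypothetical class name and path; assumes Directories.DataDirectory has the File-based constructor used elsewhere in the codebase):

import java.io.File;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;

public class ExtraDataDirectoryHook
{
    // Hypothetical startup hook: must run before ColumnFamilyStore instantiation,
    // as the javadoc of addInitialDirectories() above requires.
    public static void register()
    {
        Directories.DataDirectory extra =
            new Directories.DataDirectory(new File("/mnt/extra-disk/cassandra/data")); // assumed File-based constructor
        ColumnFamilyStore.addInitialDirectories(new Directories.DataDirectory[]{ extra });
    }
}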

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index fa01269..90d2085 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -178,30 +178,36 @@ public class Directories
     }
 
     private final CFMetaData metadata;
+    private final DataDirectory[] paths;
     private final File[] dataPaths;
 
+    public Directories(final CFMetaData metadata)
+    {
+        this(metadata, dataDirectories);
+    }
     /**
      * Create Directories of given ColumnFamily.
      * SSTable directories are created under data_directories defined in cassandra.yaml if not exist at this time.
      *
      * @param metadata metadata of ColumnFamily
      */
-    public Directories(final CFMetaData metadata)
+    public Directories(final CFMetaData metadata, DataDirectory[] paths)
     {
         this.metadata = metadata;
+        this.paths = paths;
 
         String cfId = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(metadata.cfId));
         int idx = metadata.cfName.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
         String cfName = idx >= 0 ? metadata.cfName.substring(0, idx) : metadata.cfName;
         String indexNameWithDot = idx >= 0 ? metadata.cfName.substring(idx) : null;
 
-        this.dataPaths = new File[dataDirectories.length];
+        this.dataPaths = new File[paths.length];
         // If upgraded from version less than 2.1, use existing directories
         String oldSSTableRelativePath = join(metadata.ksName, cfName);
-        for (int i = 0; i < dataDirectories.length; ++i)
+        for (int i = 0; i < paths.length; ++i)
         {
             // check if old SSTable directory exists
-            dataPaths[i] = new File(dataDirectories[i].location, oldSSTableRelativePath);
+            dataPaths[i] = new File(paths[i].location, oldSSTableRelativePath);
         }
         boolean olderDirectoryExists = Iterables.any(Arrays.asList(dataPaths), new Predicate<File>()
         {
@@ -214,13 +220,13 @@ public class Directories
         {
             // use 2.1+ style
             String newSSTableRelativePath = join(metadata.ksName, cfName + '-' + cfId);
-            for (int i = 0; i < dataDirectories.length; ++i)
-                dataPaths[i] = new File(dataDirectories[i].location, newSSTableRelativePath);
+            for (int i = 0; i < paths.length; ++i)
+                dataPaths[i] = new File(paths[i].location, newSSTableRelativePath);
         }
         // if index, then move to its own directory
         if (indexNameWithDot != null)
         {
-            for (int i = 0; i < dataDirectories.length; ++i)
+            for (int i = 0; i < paths.length; ++i)
                 dataPaths[i] = new File(dataPaths[i], indexNameWithDot);
         }
 
@@ -327,7 +333,7 @@ public class Directories
 
         // pick directories with enough space and so that resulting sstable dirs aren't blacklisted for writes.
         boolean tooBig = false;
-        for (DataDirectory dataDir : dataDirectories)
+        for (DataDirectory dataDir : paths)
         {
             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
             {
@@ -393,7 +399,7 @@ public class Directories
         long writeSize = expectedTotalWriteSize / estimatedSSTables;
         long totalAvailable = 0L;
 
-        for (DataDirectory dataDir : dataDirectories)
+        for (DataDirectory dataDir : paths)
         {
             if (BlacklistedDirectories.isUnwritable(getLocationForDisk(dataDir)))
                   continue;
@@ -481,6 +487,24 @@ public class Directories
         {
             return location.getUsableSpace();
         }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            DataDirectory that = (DataDirectory) o;
+
+            return location.equals(that.location);
+
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return location.hashCode();
+        }
     }
 
     static final class DataDirectoryCandidate implements Comparable<DataDirectoryCandidate>
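
Directories now remembers which DataDirectory[] it was built over, and DataDirectory gains value equality on its location; that equality is what lets addInitialDirectories() above de-duplicate entries through a HashSet. A small test-style sketch of the behaviour (hypothetical class; again assumes the File-based DataDirectory constructor):

import java.io.File;

import org.apache.cassandra.db.Directories;

public class DataDirectoryEqualityCheck
{
    public static void main(String[] args)
    {
        Directories.DataDirectory a = new Directories.DataDirectory(new File("/var/lib/cassandra/data"));
        Directories.DataDirectory b = new Directories.DataDirectory(new File("/var/lib/cassandra/data"));

        // same location => equal and same hash, so a HashSet keeps only one of them
        assert a.equals(b);
        assert a.hashCode() == b.hashCode();
    }
}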

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 78b593b..c2613fe 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -210,6 +210,11 @@ public class Keyspace
         return cfs;
     }
 
+    public boolean hasColumnFamilyStore(UUID id)
+    {
+        return columnFamilyStores.containsKey(id);
+    }
+
     /**
      * Take a snapshot of the specific column family, or the entire set of column families
      * if columnFamily is null with a given timestamp

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1b30fc7..4a54666 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -47,7 +47,6 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -345,22 +344,22 @@ public class Memtable implements Comparable<Memtable>
         {
             long writeSize = getExpectedWriteSize();
             Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
-            File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
+            File sstableDirectory = cfs.getDirectories().getLocationForDisk(dataDirectory);
             assert sstableDirectory != null : "Flush task is not bound to any disk";
-            SSTableReader sstable = writeSortedContents(context, sstableDirectory);
-            cfs.replaceFlushed(Memtable.this, sstable);
+            Collection<SSTableReader> sstables = writeSortedContents(context, sstableDirectory);
+            cfs.replaceFlushed(Memtable.this, sstables);
         }
 
         protected Directories getDirectories()
         {
-            return cfs.directories;
+            return cfs.getDirectories();
         }
 
-        private SSTableReader writeSortedContents(ReplayPosition context, File sstableDirectory)
+        private Collection<SSTableReader> writeSortedContents(ReplayPosition context, File sstableDirectory)
         {
             logger.info("Writing {}", Memtable.this.toString());
 
-            SSTableReader ssTable;
+            Collection<SSTableReader> ssTables;
             try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
             {
                 boolean trackContention = logger.isDebugEnabled();
@@ -397,20 +396,20 @@ public class Memtable implements Comparable<Memtable>
                                               context));
 
                     // sstables should contain non-repaired data.
-                    ssTable = writer.finish(true);
+                    ssTables = writer.finish(true);
                 }
                 else
                 {
                     logger.info("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
                                 writer.getFilename(), context);
                     writer.abort();
-                    ssTable = null;
+                    ssTables = null;
                 }
 
                 if (heavilyContendedRowCount > 0)
                     logger.debug(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
 
-                return ssTable;
+                return ssTables;
             }
         }
 
@@ -423,13 +422,12 @@ public class Memtable implements Comparable<Memtable>
             LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH, cfs.metadata);
             MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
             return new SSTableTxnWriter(txn,
-                                        SSTableWriter.create(Descriptor.fromFilename(filename),
-                                                             (long)partitions.size(),
-                                                             ActiveRepairService.UNREPAIRED_SSTABLE,
-                                                             cfs.metadata,
-                                                             sstableMetadataCollector,
-                                                             new SerializationHeader(cfs.metadata, columns, stats),
-                                                             txn));
+                                        cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
+                                                                     (long)partitions.size(),
+                                                                     ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                                     sstableMetadataCollector,
+                                                                     new SerializationHeader(cfs.metadata, columns, stats),
+                                                                     txn));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index d9c9ea3..721fd70 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -25,7 +25,12 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.RateLimiter;
 
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +43,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
@@ -113,6 +119,11 @@ public abstract class AbstractCompactionStrategy
         }
     }
 
+    public Directories getDirectories()
+    {
+        return cfs.getDirectories();
+    }
+
     /**
      * For internal, temporary suspension of background compactions so that we can do exceptional
      * things like truncate or major compaction
@@ -222,12 +233,12 @@ public abstract class AbstractCompactionStrategy
      * Handle a flushed memtable.
      *
      * @param memtable the flushed memtable
-     * @param sstable the written sstable. can be null if the memtable was clean.
+     * @param sstables the written sstables. can be null or empty if the memtable was clean.
      */
-    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
     {
-        cfs.getTracker().replaceFlushed(memtable, sstable);
-        if (sstable != null)
+        cfs.getTracker().replaceFlushed(memtable, sstables);
+        if (sstables != null && !sstables.isEmpty())
             CompactionManager.instance.submitBackground(cfs);
     }
 
@@ -493,4 +504,9 @@ public abstract class AbstractCompactionStrategy
             groupedSSTables.add(currGroup);
         return groupedSSTables;
     }
+
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleTransaction txn)
+    {
+        return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, txn);
+    }
 }
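
The default createSSTableMultiWriter() above simply builds a SimpleSSTableMultiWriter, so existing strategies keep their current behaviour; a custom strategy can override it to hand back its own SSTableMultiWriter. A sketch of such an override (hypothetical strategy class that only delegates, but the override is where e.g. a token-range-splitting writer would be returned; assumes the usual (ColumnFamilyStore, Map<String, String>) strategy constructor):

import java.util.Map;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;

public class CustomWriterCompactionStrategy extends SizeTieredCompactionStrategy
{
    public CustomWriterCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
    {
        super(cfs, options);
    }

    @Override
    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
                                                       long keyCount,
                                                       long repairedAt,
                                                       MetadataCollector meta,
                                                       SerializationHeader header,
                                                       LifecycleTransaction txn)
    {
        // a real implementation would return a strategy-specific SSTableMultiWriter here;
        // delegating reproduces the default single-sstable behaviour
        return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, txn);
    }
}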

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index 3bf224e..155bf2f 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
 import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -63,7 +64,7 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
             transaction.close();
         }
     }
-    public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
+    public abstract CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);
 
     protected abstract int executeInternal(CompactionExecutorStatsCollector collector);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 8aa16d5..66f9ed5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -594,7 +594,7 @@ public class CompactionManager implements CompactionManagerMBean
             }
             // group by keyspace/columnfamily
             ColumnFamilyStore cfs = Keyspace.open(desc.ksname).getColumnFamilyStore(desc.cfname);
-            descriptors.put(cfs, cfs.directories.find(new File(filename.trim()).getName()));
+            descriptors.put(cfs, cfs.getDirectories().find(new File(filename.trim()).getName()));
         }
 
         List<Future<?>> futures = new ArrayList<>();
@@ -817,7 +817,7 @@ public class CompactionManager implements CompactionManagerMBean
 
         logger.info("Cleaning up {}", sstable);
 
-        File compactionFileLocation = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
+        File compactionFileLocation = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(txn.originals(), OperationType.CLEANUP));
         if (compactionFileLocation == null)
             throw new IOException("disk full");
 
@@ -1192,7 +1192,7 @@ public class CompactionManager implements CompactionManagerMBean
         logger.info("Anticompacting {}", anticompactionGroup);
         Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();
 
-        File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
+        File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
         long repairedKeyCount = 0;
         long unrepairedKeyCount = 0;
         int nowInSec = FBUtilities.nowInSeconds();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index f5097af..47c8de8 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -27,15 +27,21 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.schema.CompactionParams;
+import org.apache.cassandra.service.ActiveRepairService;
 
 /**
  * Manages the compaction strategies.
@@ -181,10 +187,10 @@ public class CompactionStrategyManager implements INotificationConsumer
         startup();
     }
 
-    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
     {
-        cfs.getTracker().replaceFlushed(memtable, sstable);
-        if (sstable != null)
+        cfs.getTracker().replaceFlushed(memtable, sstables);
+        if (sstables != null && !sstables.isEmpty())
             CompactionManager.instance.submitBackground(cfs);
     }
 
@@ -235,16 +241,24 @@ public class CompactionStrategyManager implements INotificationConsumer
         return repaired.shouldDefragment();
     }
 
+    public Directories getDirectories()
+    {
+        assert repaired.getClass().equals(unrepaired.getClass());
+        return repaired.getDirectories();
+    }
 
     public synchronized void handleNotification(INotification notification, Object sender)
     {
         if (notification instanceof SSTableAddedNotification)
         {
             SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
-            if (flushedNotification.added.isRepaired())
-                repaired.addSSTable(flushedNotification.added);
-            else
-                unrepaired.addSSTable(flushedNotification.added);
+            for (SSTableReader sstable : flushedNotification.added)
+            {
+                if (sstable.isRepaired())
+                    repaired.addSSTable(sstable);
+                else
+                    unrepaired.addSSTable(sstable);
+            }
         }
         else if (notification instanceof SSTableListChangedNotification)
         {
@@ -484,4 +498,16 @@ public class CompactionStrategyManager implements INotificationConsumer
     {
         return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
     }
+
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
+    {
+        if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+        {
+            return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+        }
+        else
+        {
+            return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+        }
+    }
 }
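
With the routing above, every sstable producer follows the same path: the caller asks the ColumnFamilyStore for a multi-writer, the CFS delegates to the CompactionStrategyManager, and the manager picks the repaired or unrepaired strategy based on repairedAt. A sketch of a call site (hypothetical helper; the descriptor, header and transaction are assumed to come from the caller's context):

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.service.ActiveRepairService;

public final class MultiWriterCallSite
{
    static SSTableMultiWriter newUnrepairedWriter(ColumnFamilyStore cfs,
                                                  Descriptor descriptor,
                                                  long keyCount,
                                                  SerializationHeader header,
                                                  LifecycleTransaction txn)
    {
        int sstableLevel = 0; // newly written, unrepaired sstables start at level 0
        // UNREPAIRED_SSTABLE routes to the unrepaired strategy inside CompactionStrategyManager
        return cfs.createSSTableMultiWriter(descriptor, keyCount,
                                            ActiveRepairService.UNREPAIRED_SSTABLE,
                                            sstableLevel, header, txn);
    }
}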

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 0bd6aae..1d96324 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -29,9 +28,9 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.DefaultCompactionWriter;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -43,7 +42,6 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager.CompactionExecutorStatsCollector;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -51,8 +49,8 @@ public class CompactionTask extends AbstractCompactionTask
 {
     protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
     protected final int gcBefore;
-    private final boolean offline;
-    private final boolean keepOriginals;
+    protected final boolean offline;
+    protected final boolean keepOriginals;
     protected static long totalBytesCompacted = 0;
     private CompactionExecutorStatsCollector collector;
 
@@ -154,7 +152,7 @@ public class CompactionTask extends AbstractCompactionTask
         {
             Set<SSTableReader> actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables());
 
-            List<SSTableReader> newSStables;
+            Collection<SSTableReader> newSStables;
 
             long[] mergedRowCounts;
 
@@ -173,7 +171,7 @@ public class CompactionTask extends AbstractCompactionTask
                 if (!controller.cfs.getCompactionStrategyManager().isActive)
                     throw new CompactionInterruptedException(ci.getCompactionInfo());
 
-                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact))
+                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
                 {
                     estimatedKeys = writer.estimatedKeys();
                     while (ci.hasNext())
@@ -228,10 +226,11 @@ public class CompactionTask extends AbstractCompactionTask
 
     @Override
     public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                          Directories directories,
                                                           LifecycleTransaction transaction,
                                                           Set<SSTableReader> nonExpiredSSTables)
     {
-        return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline, keepOriginals);
+        return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals);
     }
 
     public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)
@@ -252,6 +251,11 @@ public class CompactionTask extends AbstractCompactionTask
         return mergeSummary.toString();
     }
 
+    protected Directories getDirectories()
+    {
+        return cfs.getDirectories();
+    }
+
     public static long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
     {
         long minRepairedAt= Long.MAX_VALUE;
@@ -264,7 +268,7 @@ public class CompactionTask extends AbstractCompactionTask
 
     protected void checkAvailableDiskSpace(long estimatedSSTables, long expectedWriteSize)
     {
-        while (!cfs.directories.hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
+        while (!getDirectories().hasAvailableDiskSpace(estimatedSSTables, expectedWriteSize))
         {
             if (!reduceScopeForLimitedSpace())
                 throw new RuntimeException(String.format("Not enough space for compaction, estimated sstables = %d, expected write size = %d", estimatedSSTables, expectedWriteSize));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index 11d113d..eeb3615 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
 import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
 import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
@@ -42,12 +43,13 @@ public class LeveledCompactionTask extends CompactionTask
 
     @Override
     public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                          Directories directories,
                                                           LifecycleTransaction txn,
                                                           Set<SSTableReader> nonExpiredSSTables)
     {
         if (majorCompaction)
-            return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false, false);
-        return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, false);
+            return new MajorLeveledCompactionWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, false, false);
+        return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index 1944364..3655a37 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -75,10 +75,11 @@ public class SSTableSplitter {
 
         @Override
         public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                              Directories directories,
                                                               LifecycleTransaction txn,
                                                               Set<SSTableReader> nonExpiredSSTables)
         {
-            return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, false);
+            return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, false);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 747b956..c437832 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.UUIDGen;
@@ -106,7 +105,7 @@ public class Scrubber implements Closeable
         List<SSTableReader> toScrub = Collections.singletonList(sstable);
 
         // Calculate the expected compacted filesize
-        this.destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
+        this.destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(toScrub, OperationType.SCRUB));
         if (destination == null)
             throw new IOException("disk full");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 2353aa3..05f446c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
 import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -342,10 +343,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
         @Override
         public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                              Directories directories,
                                                               LifecycleTransaction txn,
                                                               Set<SSTableReader> nonExpiredSSTables)
         {
-            return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables);
+            return new SplittingSizeTieredCompactionWriter(cfs, directories, txn, nonExpiredSSTables);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 50e5a96..abc4107 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -18,14 +18,17 @@
 
 package org.apache.cassandra.db.compaction.writers;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.CompactionTask;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.concurrent.Transactional;
@@ -38,6 +41,7 @@ import org.apache.cassandra.utils.concurrent.Transactional;
 public abstract class CompactionAwareWriter extends Transactional.AbstractTransactional implements Transactional
 {
     protected final ColumnFamilyStore cfs;
+    protected final Directories directories;
     protected final Set<SSTableReader> nonExpiredSSTables;
     protected final long estimatedTotalKeys;
     protected final long maxAge;
@@ -45,35 +49,25 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
 
     protected final LifecycleTransaction txn;
     protected final SSTableRewriter sstableWriter;
+    private boolean isInitialized = false;
 
     public CompactionAwareWriter(ColumnFamilyStore cfs,
-                                 LifecycleTransaction txn,
-                                 Set<SSTableReader> nonExpiredSSTables)
-    {
-        this(cfs, txn, nonExpiredSSTables, false, false);
-    }
-
-    public CompactionAwareWriter(ColumnFamilyStore cfs,
+                                 Directories directories,
                                  LifecycleTransaction txn,
                                  Set<SSTableReader> nonExpiredSSTables,
                                  boolean offline,
                                  boolean keepOriginals)
     {
         this.cfs = cfs;
+        this.directories = directories;
         this.nonExpiredSSTables = nonExpiredSSTables;
         this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
         this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
         this.txn = txn;
         this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline).keepOriginals(keepOriginals);
-    }
 
-    /**
-     * Writes a partition in an implementation specific way
-     * @param partition the partition to append
-     * @return true if the partition was written, false otherwise
-     */
-    public abstract boolean append(UnfilteredRowIterator partition);
+    }
 
     @Override
     protected Throwable doAbort(Throwable accumulate)
@@ -98,7 +92,7 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
      * @return all the written sstables sstables
      */
     @Override
-    public List<SSTableReader> finish()
+    public Collection<SSTableReader> finish()
     {
         super.finish();
         return sstableWriter.finished();
@@ -112,12 +106,39 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
         return estimatedTotalKeys;
     }
 
+    public final boolean append(UnfilteredRowIterator partition)
+    {
+        maybeSwitchWriter(partition.partitionKey());
+        return realAppend(partition);
+    }
+
+    protected abstract boolean realAppend(UnfilteredRowIterator partition);
+
+    /**
+     * Guaranteed to be called before the first call to realAppend.
+     * @param key
+     */
+    protected void maybeSwitchWriter(DecoratedKey key)
+    {
+        if (!isInitialized)
+            switchCompactionLocation(getDirectories().getWriteableLocation(cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType())));
+        isInitialized = true;
+    }
+
+    /**
+     * Implementations of this method should finish the current sstable writer and start writing to this directory.
+     *
+     * Called once before starting to append and then whenever we see a need to start writing to another directory.
+     * @param directory
+     */
+    protected abstract void switchCompactionLocation(Directories.DataDirectory directory);
+
     /**
      * The directories we can write to
      */
     public Directories getDirectories()
     {
-        return cfs.directories;
+        return directories;
     }
 
     /**
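
CompactionAwareWriter now splits appending from placement: append() is final, calls maybeSwitchWriter() to pick a data directory before the first partition, and concrete writers only implement realAppend() and switchCompactionLocation(). A stripped-down, hypothetical subclass illustrating that contract (the real DefaultCompactionWriter in the next diff is the canonical version); it uses only fields and calls visible in the diffs here:

import java.util.Set;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;

public class SingleLocationCompactionWriter extends CompactionAwareWriter
{
    public SingleLocationCompactionWriter(ColumnFamilyStore cfs,
                                          Directories directories,
                                          LifecycleTransaction txn,
                                          Set<SSTableReader> nonExpiredSSTables)
    {
        super(cfs, directories, txn, nonExpiredSSTables, false, false);
    }

    @Override
    protected boolean realAppend(UnfilteredRowIterator partition)
    {
        // the base class has already ensured switchCompactionLocation() ran at least once
        return sstableWriter.append(partition) != null;
    }

    @Override
    protected void switchCompactionLocation(Directories.DataDirectory directory)
    {
        // open a writer in the chosen directory; SSTableRewriter finishes the previous one
        SSTableWriter writer =
            SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(directory))),
                                 estimatedTotalKeys,
                                 minRepairedAt,
                                 cfs.metadata,
                                 new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0),
                                 SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
                                 txn);
        sstableWriter.switchWriter(writer);
    }
}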

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index eb55d20..8b90224 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -18,13 +18,13 @@
 package org.apache.cassandra.db.compaction.writers;
 
 
-import java.io.File;
 import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -40,20 +40,28 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
 {
     protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
 
-    public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+    public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
     {
-        this(cfs, txn, nonExpiredSSTables, false, false);
+        this(cfs, directories, txn, nonExpiredSSTables, false, false);
     }
 
     @SuppressWarnings("resource")
-    public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals)
+    public DefaultCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, boolean keepOriginals)
+    {
+        super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
+    }
+
+    @Override
+    public boolean realAppend(UnfilteredRowIterator partition)
+    {
+        return sstableWriter.append(partition) != null;
+    }
+
+    @Override
+    protected void switchCompactionLocation(Directories.DataDirectory directory)
     {
-        super(cfs, txn, nonExpiredSSTables, offline, keepOriginals);
-        logger.debug("Expected bloom filter size : {}", estimatedTotalKeys);
-        long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
-        File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
         @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(directory))),
                                                     estimatedTotalKeys,
                                                     minRepairedAt,
                                                     cfs.metadata,
@@ -64,12 +72,6 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
     }
 
     @Override
-    public boolean append(UnfilteredRowIterator partition)
-    {
-        return sstableWriter.append(partition) != null;
-    }
-
-    @Override
     public long estimatedKeys()
     {
         return estimatedTotalKeys;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 73ce216..6d191f8 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -47,43 +48,32 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
     private int sstablesWritten = 0;
 
     public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
+                                        Directories directories,
                                         LifecycleTransaction txn,
                                         Set<SSTableReader> nonExpiredSSTables,
                                         long maxSSTableSize)
     {
-        this(cfs, txn, nonExpiredSSTables, maxSSTableSize, false, false);
+        this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, false, false);
     }
 
     @SuppressWarnings("resource")
     public MajorLeveledCompactionWriter(ColumnFamilyStore cfs,
+                                        Directories directories,
                                         LifecycleTransaction txn,
                                         Set<SSTableReader> nonExpiredSSTables,
                                         long maxSSTableSize,
                                         boolean offline,
                                         boolean keepOriginals)
     {
-        super(cfs, txn, nonExpiredSSTables, offline, keepOriginals);
+        super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
         this.maxSSTableSize = maxSSTableSize;
         this.allSSTables = txn.originals();
         expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()));
-        long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
-        long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
-        File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
-
-        @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
-                                                    keysPerSSTable,
-                                                    minRepairedAt,
-                                                    cfs.metadata,
-                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
-                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
-                                                    txn);
-        sstableWriter.switchWriter(writer);
     }
 
     @Override
     @SuppressWarnings("resource")
-    public boolean append(UnfilteredRowIterator partition)
+    public boolean realAppend(UnfilteredRowIterator partition)
     {
         long posBefore = sstableWriter.currentWriter().getOnDiskFilePointer();
         RowIndexEntry rie = sstableWriter.append(partition);
@@ -98,19 +88,25 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
             }
 
             averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
-            File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
-            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
-                                                        averageEstimatedKeysPerSSTable,
-                                                        minRepairedAt,
-                                                        cfs.metadata,
-                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
-                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
-                                                        txn);
-            sstableWriter.switchWriter(writer);
+            switchCompactionLocation(getWriteDirectory(expectedWriteSize));
             partitionsWritten = 0;
             sstablesWritten++;
         }
         return rie != null;
 
     }
+
+    public void switchCompactionLocation(Directories.DataDirectory directory)
+    {
+        File sstableDirectory = getDirectories().getLocationForDisk(directory);
+        @SuppressWarnings("resource")
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+                                                    averageEstimatedKeysPerSSTable,
+                                                    minRepairedAt,
+                                                    cfs.metadata,
+                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    txn);
+        sstableWriter.switchWriter(writer);
+    }
 }

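MajorLeveledCompactionWriter now re-derives the per-sstable key estimate as a running average and calls switchCompactionLocation(getWriteDirectory(expectedWriteSize)) when the current sstable is full, instead of inlining SSTableWriter.create at both call sites. The running-average update in realAppend() can be illustrated standalone (the counts below are made up):

    // Standalone illustration of the running average used when rolling to a new sstable.
    public class RunningAverageIllustration
    {
        public static void main(String[] args)
        {
            long averageEstimatedKeysPerSSTable = 0;
            int sstablesWritten = 0;
            for (long partitionsWritten : new long[]{ 1000, 1200, 800 }) // hypothetical per-sstable partition counts
            {
                averageEstimatedKeysPerSSTable = Math.round(
                    ((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
                sstablesWritten++;
            }
            System.out.println(averageEstimatedKeysPerSSTable); // 1000, then 1100, then 1000 across the three iterations
        }
    }

The latest value seeds the key estimate for the next SSTableWriter created in switchCompactionLocation().
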
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 241af0d..142fe87 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -17,10 +17,10 @@
  */
 package org.apache.cassandra.db.compaction.writers;
 
-import java.io.File;
 import java.util.Set;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -40,16 +40,18 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
     private final Set<SSTableReader> allSSTables;
 
     public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
+                                Directories directories,
                                 LifecycleTransaction txn,
                                 Set<SSTableReader> nonExpiredSSTables,
                                 long maxSSTableSize,
                                 int level)
     {
-        this(cfs, txn, nonExpiredSSTables, maxSSTableSize, level, false, false);
+        this(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level, false, false);
     }
 
     @SuppressWarnings("resource")
     public MaxSSTableSizeWriter(ColumnFamilyStore cfs,
+                                Directories directories,
                                 LifecycleTransaction txn,
                                 Set<SSTableReader> nonExpiredSSTables,
                                 long maxSSTableSize,
@@ -57,7 +59,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
                                 boolean offline,
                                 boolean keepOriginals)
     {
-        super(cfs, txn, nonExpiredSSTables, offline, keepOriginals);
+        super(cfs, directories, txn, nonExpiredSSTables, offline, keepOriginals);
         this.allSSTables = txn.originals();
         this.level = level;
         this.maxSSTableSize = maxSSTableSize;
@@ -65,37 +67,30 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
         expectedWriteSize = Math.min(maxSSTableSize, totalSize);
         estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
-        File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
+    }
+
+    @Override
+    public boolean realAppend(UnfilteredRowIterator partition)
+    {
+        RowIndexEntry rie = sstableWriter.append(partition);
+        if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
+            switchCompactionLocation(getWriteDirectory(expectedWriteSize));
+        return rie != null;
+    }
+
+    public void switchCompactionLocation(Directories.DataDirectory location)
+    {
         @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
                                                     estimatedTotalKeys / estimatedSSTables,
                                                     minRepairedAt,
                                                     cfs.metadata,
                                                     new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
                                                     SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
                                                     txn);
-        sstableWriter.switchWriter(writer);
-    }
 
-    @Override
-    public boolean append(UnfilteredRowIterator partition)
-    {
-        RowIndexEntry rie = sstableWriter.append(partition);
-        if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
-        {
-            File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
-            @SuppressWarnings("resource")
-            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
-                                                        estimatedTotalKeys / estimatedSSTables,
-                                                        minRepairedAt,
-                                                        cfs.metadata,
-                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
-                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
-                                                        txn);
+        sstableWriter.switchWriter(writer);
 
-            sstableWriter.switchWriter(writer);
-        }
-        return rie != null;
     }
 
     @Override

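MaxSSTableSizeWriter follows the same pattern: realAppend() compares the current writer's on-disk position against maxSSTableSize and, when exceeded, asks for a new write directory and switches there. A hypothetical driver loop for the refactored API (all parameters are assumed inputs; the real wiring lives in the compaction task, which is not part of this excerpt):

    import java.util.Set;
    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.Directories;
    import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
    import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
    import org.apache.cassandra.io.sstable.ISSTableScanner;
    import org.apache.cassandra.io.sstable.format.SSTableReader;

    final class MaxSizeWriterUsageSketch
    {
        static void compact(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn,
                            Set<SSTableReader> nonExpiredSSTables, ISSTableScanner scanner,
                            long maxSSTableSize, int level)
        {
            try (CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, maxSSTableSize, level))
            {
                while (scanner.hasNext())
                    writer.append(scanner.next()); // rolls to a new sstable once maxSSTableSize is crossed
                writer.finish();                   // assumed to return the finished readers, as elsewhere in this hierarchy
            }
        }
    }
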
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 65924fa..07ca3d0 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -25,6 +25,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -51,15 +52,15 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
     private long currentBytesToWrite;
     private int currentRatioIndex = 0;
 
-    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
     {
-        this(cfs, txn, nonExpiredSSTables, DEFAULT_SMALLEST_SSTABLE_BYTES);
+        this(cfs, directories, txn, nonExpiredSSTables, DEFAULT_SMALLEST_SSTABLE_BYTES);
     }
 
     @SuppressWarnings("resource")
-    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable)
+    public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long smallestSSTable)
     {
-        super(cfs, txn, nonExpiredSSTables, false, false);
+        super(cfs, directories, txn, nonExpiredSSTables, false, false);
         this.allSSTables = txn.originals();
         totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
         double[] potentialRatios = new double[20];
@@ -81,43 +82,38 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
             }
         }
         ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex);
-        File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
         long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
         currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
-        @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
-                                                    currentPartitionsToWrite,
-                                                    minRepairedAt,
-                                                    cfs.metadata,
-                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
-                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
-                                                    txn);
-
-        sstableWriter.switchWriter(writer);
+        switchCompactionLocation(getWriteDirectory(currentBytesToWrite));
         logger.debug("Ratios={}, expectedKeys = {}, totalSize = {}, currentPartitionsToWrite = {}, currentBytesToWrite = {}", ratios, estimatedTotalKeys, totalSize, currentPartitionsToWrite, currentBytesToWrite);
     }
 
     @Override
-    public boolean append(UnfilteredRowIterator partition)
+    public boolean realAppend(UnfilteredRowIterator partition)
     {
         RowIndexEntry rie = sstableWriter.append(partition);
         if (sstableWriter.currentWriter().getOnDiskFilePointer() > currentBytesToWrite && currentRatioIndex < ratios.length - 1) // if we underestimate how many keys we have, the last sstable might get more than we expect
         {
             currentRatioIndex++;
             currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
-            long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
-            File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
-            @SuppressWarnings("resource")
-            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
-                                                        currentPartitionsToWrite,
-                                                        minRepairedAt,
-                                                        cfs.metadata,
-                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
-                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
-                                                        txn);
-            sstableWriter.switchWriter(writer);
-            logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
+            switchCompactionLocation(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
         }
         return rie != null;
     }
+
+    public void switchCompactionLocation(Directories.DataDirectory location)
+    {
+        long currentPartitionsToWrite = Math.round(ratios[currentRatioIndex] * estimatedTotalKeys);
+        @SuppressWarnings("resource")
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(getDirectories().getLocationForDisk(location))),
+                                                    currentPartitionsToWrite,
+                                                    minRepairedAt,
+                                                    cfs.metadata,
+                                                    new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    txn);
+        logger.debug("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
+        sstableWriter.switchWriter(writer);
+
+    }
 }

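SplittingSizeTieredCompactionWriter keeps its behaviour of writing progressively smaller sstables (currentBytesToWrite = totalSize * ratios[i]) and now simply calls switchCompactionLocation() when the current target is reached. The ratio computation itself is outside this excerpt; assuming the usual halving scheme with a smallest-sstable floor, the byte targets look roughly like this (standalone illustration, made-up sizes):

    import java.util.ArrayList;
    import java.util.List;

    public class SplitTargetsIllustration
    {
        public static void main(String[] args)
        {
            long totalSize = 100L << 30;        // pretend the compaction writes ~100 GiB in total
            long smallestSSTable = 50L << 20;   // assumed floor, in the spirit of DEFAULT_SMALLEST_SSTABLE_BYTES
            List<Long> targets = new ArrayList<>();
            double ratio = 0.5;
            while (Math.round(totalSize * ratio) > smallestSSTable && targets.size() < 20)
            {
                targets.add(Math.round(totalSize * ratio));
                ratio /= 2;
            }
            System.out.println(targets);        // ~50 GiB, 25 GiB, 12.5 GiB, ... until the floor is reached
        }
    }

The committed computation differs in detail (a fixed 20-slot ratio array trimmed at noPointIndex via Arrays.copyOfRange, with the last sstable possibly taking more than expected), but the switching condition in realAppend() above is driven by exactly these shrinking byte targets.
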
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
index c6cb979..520b229 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java
@@ -338,6 +338,7 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
         return accumulate;
     }
 
+
     /**
      * update a reader: if !original, this is a reader that is being introduced by this transaction;
      * otherwise it must be in the originals() set, i.e. a reader guarded by this transaction
@@ -355,6 +356,14 @@ public class LifecycleTransaction extends Transactional.AbstractTransactional
             reader.setupOnline();
     }
 
+    public void update(Collection<SSTableReader> readers, boolean original)
+    {
+        for (SSTableReader reader : readers)
+        {
+            update(reader, original);
+        }
+    }
+
     /**
      * mark this reader as for obsoletion : on checkpoint() the reader will be removed from the live set
      */

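The new bulk overload on LifecycleTransaction just loops the existing single-reader update(); it exists because a flush or compaction can now introduce several sstables at once via an SSTableMultiWriter. A hypothetical caller (names are illustrative):

    import java.util.Collection;
    import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
    import org.apache.cassandra.io.sstable.format.SSTableReader;

    final class TransactionUpdateSketch
    {
        // Register a batch of freshly written readers with the transaction.
        static void registerNew(LifecycleTransaction txn, Collection<SSTableReader> newReaders)
        {
            // false: these readers are not in txn.originals(), they are being introduced by this transaction
            txn.update(newReaders, false);
        }
    }
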
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 6f6aca9..d028493 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -186,11 +186,8 @@ public class Tracker
     public void addSSTables(Iterable<SSTableReader> sstables)
     {
         addInitialSSTables(sstables);
-        for (SSTableReader sstable : sstables)
-        {
-            maybeIncrementallyBackup(sstable);
-            notifyAdded(sstable);
-        }
+        maybeIncrementallyBackup(sstables);
+        notifyAdded(sstables);
     }
 
     /** (Re)initializes the tracker, purging all references. */
@@ -330,10 +327,10 @@ public class Tracker
         apply(View.markFlushing(memtable));
     }
 
-    public void replaceFlushed(Memtable memtable, SSTableReader sstable)
+    public void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
     {
         assert !isDummy();
-        if (sstable == null)
+        if (sstables == null || sstables.isEmpty())
         {
             // sstable may be null if we flushed batchlog and nothing needed to be retained
             // if it's null, we don't care what state the cfstore is in, we just replace it and continue
@@ -341,16 +338,16 @@ public class Tracker
             return;
         }
 
-        sstable.setupOnline();
+        sstables.forEach(SSTableReader::setupOnline);
         // back up before creating a new Snapshot (which makes the new one eligible for compaction)
-        maybeIncrementallyBackup(sstable);
+        maybeIncrementallyBackup(sstables);
 
-        apply(View.replaceFlushed(memtable, sstable));
+        apply(View.replaceFlushed(memtable, sstables));
 
         Throwable fail;
-        fail = updateSizeTracking(emptySet(), singleton(sstable), null);
+        fail = updateSizeTracking(emptySet(), sstables, null);
         // TODO: if we're invalidated, should we notifyadded AND removed, or just skip both?
-        fail = notifyAdded(sstable, fail);
+        fail = notifyAdded(sstables, fail);
 
         if (!isDummy() && !cfstore.isValid())
             dropSSTables();
@@ -377,13 +374,16 @@ public class Tracker
         return view.get().getUncompacting(candidates);
     }
 
-    public void maybeIncrementallyBackup(final SSTableReader sstable)
+    public void maybeIncrementallyBackup(final Iterable<SSTableReader> sstables)
     {
         if (!DatabaseDescriptor.isIncrementalBackupsEnabled())
             return;
 
-        File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
-        sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
+        for (SSTableReader sstable : sstables)
+        {
+            File backupsDir = Directories.getBackupsDirectory(sstable.descriptor);
+            sstable.createLinks(FileUtils.getCanonicalPath(backupsDir));
+        }
     }
 
     // NOTIFICATION
@@ -405,7 +405,7 @@ public class Tracker
         return accumulate;
     }
 
-    Throwable notifyAdded(SSTableReader added, Throwable accumulate)
+    Throwable notifyAdded(Iterable<SSTableReader> added, Throwable accumulate)
     {
         INotification notification = new SSTableAddedNotification(added);
         for (INotificationConsumer subscriber : subscribers)
@@ -422,7 +422,7 @@ public class Tracker
         return accumulate;
     }
 
-    public void notifyAdded(SSTableReader added)
+    public void notifyAdded(Iterable<SSTableReader> added)
     {
         maybeFail(notifyAdded(added, null));
     }

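Tracker now deals in collections end to end: replaceFlushed(), maybeIncrementallyBackup() and notifyAdded() all accept the several sstables a single flush can produce. The SSTableAddedNotification constructor visibly takes the iterable here; the field type below is an assumption about the corresponding notification change elsewhere in this commit. A hypothetical subscriber:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.apache.cassandra.io.sstable.format.SSTableReader;
    import org.apache.cassandra.notifications.INotification;
    import org.apache.cassandra.notifications.INotificationConsumer;
    import org.apache.cassandra.notifications.SSTableAddedNotification;

    public class LoggingSubscriber implements INotificationConsumer
    {
        private static final Logger logger = LoggerFactory.getLogger(LoggingSubscriber.class);

        public void handleNotification(INotification notification, Object sender)
        {
            if (notification instanceof SSTableAddedNotification)
            {
                // assumption: 'added' is Iterable<SSTableReader> after this commit
                for (SSTableReader sstable : ((SSTableAddedNotification) notification).added)
                    logger.debug("sstable added: {}", sstable);
            }
        }
    }
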
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 7ee0fdf..b62c7e3 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -310,7 +310,7 @@ public class View
     }
 
     // called after flush: removes memtable from flushingMemtables, and inserts flushed into the live sstable set
-    static Function<View, View> replaceFlushed(final Memtable memtable, final SSTableReader flushed)
+    static Function<View, View> replaceFlushed(final Memtable memtable, final Collection<SSTableReader> flushed)
     {
         return new Function<View, View>()
         {
@@ -323,7 +323,7 @@ public class View
                     return new View(view.liveMemtables, flushingMemtables, view.sstablesMap,
                                     view.compactingMap, view.intervalTree);
 
-                Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), singleton(flushed));
+                Map<SSTableReader, SSTableReader> sstableMap = replace(view.sstablesMap, emptySet(), flushed);
                 return new View(view.liveMemtables, flushingMemtables, sstableMap, view.compactingMap,
                                 SSTableIntervalTree.build(sstableMap.keySet()));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index f4b4da8..d94b219 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -60,11 +60,12 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
 
     protected SSTableTxnWriter createWriter()
     {
-        return SSTableTxnWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
-                                    0,
-                                    ActiveRepairService.UNREPAIRED_SSTABLE,
-                                    0,
-                                    new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
+        return SSTableTxnWriter.create(metadata,
+                                       createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
+                                       0,
+                                       ActiveRepairService.UNREPAIRED_SSTABLE,
+                                       0,
+                                       new SerializationHeader(metadata, columns, EncodingStats.NO_STATS));
     }
 
     private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
new file mode 100644
index 0000000..0bb3721
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableMultiWriter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.Throwables;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+public interface SSTableMultiWriter extends Transactional
+{
+
+    /**
+     * Writes a partition in an implementation specific way
+     * @param partition the partition to append
+     * @return true if the partition was written, false otherwise
+     */
+    boolean append(UnfilteredRowIterator partition);
+
+    Collection<SSTableReader> finish(long repairedAt, long maxDataAge, boolean openResult);
+    Collection<SSTableReader> finish(boolean openResult);
+    Collection<SSTableReader> finished();
+
+    SSTableMultiWriter setOpenResult(boolean openResult);
+
+    String getFilename();
+    long getFilePointer();
+    UUID getCfId();
+
+    static void abortOrDie(SSTableMultiWriter writer)
+    {
+        Throwables.maybeFail(writer.abort(null));
+    }
+}

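SSTableMultiWriter is the new seam this commit introduces: a compaction strategy (or the default SimpleSSTableMultiWriter) decides how appended partitions are spread across one or more sstables, while callers keep a single append/finish/abort surface. A hypothetical write loop against the interface; the writer itself would come from ColumnFamilyStore.createSSTableMultiWriter(), as used by SSTableTxnWriter.create() further down:

    import java.util.Collection;
    import java.util.Iterator;
    import org.apache.cassandra.db.rows.UnfilteredRowIterator;
    import org.apache.cassandra.io.sstable.SSTableMultiWriter;
    import org.apache.cassandra.io.sstable.format.SSTableReader;

    final class MultiWriterUsageSketch
    {
        // 'writer' and 'partitions' are assumed inputs; the writer may produce any number of sstables.
        static Collection<SSTableReader> writeAll(SSTableMultiWriter writer,
                                                  Iterator<UnfilteredRowIterator> partitions) throws Throwable
        {
            try
            {
                while (partitions.hasNext())
                    writer.append(partitions.next());
                return writer.finish(true); // true: open the resulting readers for reads
            }
            catch (Throwable t)
            {
                SSTableMultiWriter.abortOrDie(writer);
                throw t;
            }
        }
    }
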
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed27277/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 42bffb1..6e1ac38 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -18,13 +18,17 @@
 
 package org.apache.cassandra.io.sstable;
 
-import org.apache.cassandra.db.RowIndexEntry;
+import java.util.Collection;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.concurrent.Transactional;
 
 /**
@@ -35,15 +39,15 @@ import org.apache.cassandra.utils.concurrent.Transactional;
 public class SSTableTxnWriter extends Transactional.AbstractTransactional implements Transactional
 {
     private final LifecycleTransaction txn;
-    private final SSTableWriter writer;
+    private final SSTableMultiWriter writer;
 
-    public SSTableTxnWriter(LifecycleTransaction txn, SSTableWriter writer)
+    public SSTableTxnWriter(LifecycleTransaction txn, SSTableMultiWriter writer)
     {
         this.txn = txn;
         this.writer = writer;
     }
 
-    public RowIndexEntry append(UnfilteredRowIterator iterator)
+    public boolean append(UnfilteredRowIterator iterator)
     {
         return writer.append(iterator);
     }
@@ -74,28 +78,43 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
         writer.prepareToCommit();
     }
 
-    public SSTableReader finish(boolean openResult)
+    public Collection<SSTableReader> finish(boolean openResult)
     {
         writer.setOpenResult(openResult);
         finish();
         return writer.finished();
     }
 
-    public static SSTableTxnWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+    public static SSTableTxnWriter create(ColumnFamilyStore cfs, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
     {
         LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory);
-        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+        SSTableMultiWriter writer = cfs.createSSTableMultiWriter(descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+        return new SSTableTxnWriter(txn, writer);
+    }
+
+    public static SSTableTxnWriter create(CFMetaData cfm, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+    {
+        if (Keyspace.open(cfm.ksName).hasColumnFamilyStore(cfm.cfId))
+        {
+            ColumnFamilyStore cfs = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId);
+            return create(cfs, descriptor, keyCount, repairedAt, sstableLevel, header);
+        }
+
+        // if the column family store does not exist, we create a new default SSTableMultiWriter to use:
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, descriptor.directory);
+        MetadataCollector collector = new MetadataCollector(cfm.comparator).sstableLevel(sstableLevel);
+        SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, txn);
         return new SSTableTxnWriter(txn, writer);
     }
 
-    public static SSTableTxnWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+    public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
     {
         Descriptor desc = Descriptor.fromFilename(filename);
-        return create(desc, keyCount, repairedAt, sstableLevel, header);
+        return create(cfs, desc, keyCount, repairedAt, sstableLevel, header);
     }
 
-    public static SSTableTxnWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header)
+    public static SSTableTxnWriter create(ColumnFamilyStore cfs, String filename, long keyCount, long repairedAt, SerializationHeader header)
     {
-        return create(filename, keyCount, repairedAt, 0, header);
+        return create(cfs, filename, keyCount, repairedAt, 0, header);
     }
 }
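
The two create() factories split responsibilities: with a live ColumnFamilyStore the table's compaction strategy picks the SSTableMultiWriter, while the CFMetaData overload (used by AbstractSSTableSimpleWriter above) looks up a live store if one is open and otherwise falls back to SimpleSSTableMultiWriter under an offline WRITE transaction, e.g. for offline bulk writing. A hypothetical helper showing the choice; every parameter is an assumed input:

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.db.ColumnFamilyStore;
    import org.apache.cassandra.db.SerializationHeader;
    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.io.sstable.SSTableTxnWriter;

    final class TxnWriterFactorySketch
    {
        static SSTableTxnWriter open(ColumnFamilyStore cfs, CFMetaData cfm, Descriptor desc,
                                     long keyCount, long repairedAt, int level, SerializationHeader header)
        {
            return cfs != null
                 ? SSTableTxnWriter.create(cfs, desc, keyCount, repairedAt, level, header)  // strategy-aware path
                 : SSTableTxnWriter.create(cfm, desc, keyCount, repairedAt, level, header); // live store if open, else SimpleSSTableMultiWriter
        }
    }

Note that SSTableTxnWriter.append() now reports success as a boolean rather than returning a RowIndexEntry, matching the SSTableMultiWriter contract.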