You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/12/19 16:56:39 UTC

[1/3] cassandra git commit: Reduce heap spent when receiving many SSTables

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 d5326d4aa -> 0e63000c3
  refs/heads/trunk 7e4737716 -> c0fd119ce


Reduce heap spent when receiving many SSTables

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10797


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e63000c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e63000c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e63000c

Branch: refs/heads/cassandra-3.0
Commit: 0e63000c3fd0029e5b620a7923ea2ac54771e8e9
Parents: d5326d4
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Dec 18 20:33:21 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Sat Dec 19 08:35:12 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamReceiveTask.java  |  19 +-
 .../io/sstable/SSTableRewriterTest.java         | 165 +--------------
 .../cassandra/io/sstable/SSTableWriterTest.java | 200 +++++++++++++++++++
 .../io/sstable/SSTableWriterTestBase.java       | 166 +++++++++++++++
 5 files changed, 374 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d0f1613..ff139c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.3
+ * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
  * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
  * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
  * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-1837)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 0230d14..92a14d1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -72,7 +72,7 @@ public class StreamReceiveTask extends StreamTask
     private boolean done = false;
 
     //  holds references to SSTables received
-    protected Collection<SSTableMultiWriter> sstables;
+    protected Collection<SSTableReader> sstables;
 
     public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
     {
@@ -97,7 +97,10 @@ public class StreamReceiveTask extends StreamTask
 
         assert cfId.equals(sstable.getCfId());
 
-        sstables.add(sstable);
+        Collection<SSTableReader> finished = sstable.finish(true);
+        txn.update(finished, false);
+        sstables.addAll(finished);
+
         if (sstables.size() == totalFiles)
         {
             done = true;
@@ -134,7 +137,6 @@ public class StreamReceiveTask extends StreamTask
                 if (kscf == null)
                 {
                     // schema was dropped during streaming
-                    task.sstables.forEach(SSTableMultiWriter::abortOrDie);
                     task.sstables.clear();
                     task.txn.abort();
                     task.session.taskCompleted(task);
@@ -143,15 +145,7 @@ public class StreamReceiveTask extends StreamTask
                 cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
                 hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
 
-                List<SSTableReader> readers = new ArrayList<>();
-                for (SSTableMultiWriter writer : task.sstables)
-                {
-                    Collection<SSTableReader> newReaders = writer.finish(true);
-                    readers.addAll(newReaders);
-                    task.txn.update(newReaders, false);
-                }
-
-                task.sstables.clear();
+                Collection<SSTableReader> readers = task.sstables;
 
                 try (Refs<SSTableReader> refs = Refs.ref(readers))
                 {
@@ -245,7 +239,6 @@ public class StreamReceiveTask extends StreamTask
             return;
 
         done = true;
-        sstables.forEach(SSTableMultiWriter::abortOrDie);
         txn.abort();
         sstables.clear();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index bfe7b08..008df06 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -69,48 +69,8 @@ import org.apache.cassandra.utils.UUIDGen;
 
 import static org.junit.Assert.*;
 
-public class SSTableRewriterTest extends SchemaLoader
+public class SSTableRewriterTest extends SSTableWriterTestBase
 {
-    private static final String KEYSPACE = "SSTableRewriterTest";
-    private static final String CF = "Standard1";
-
-    private static Config.DiskAccessMode standardMode;
-    private static Config.DiskAccessMode indexMode;
-
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        if (FBUtilities.isWindows())
-        {
-            standardMode = DatabaseDescriptor.getDiskAccessMode();
-            indexMode = DatabaseDescriptor.getIndexAccessMode();
-
-            DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
-            DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
-        }
-
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE, CF));
-    }
-
-    @AfterClass
-    public static void revertDiskAccess()
-    {
-        DatabaseDescriptor.setDiskAccessMode(standardMode);
-        DatabaseDescriptor.setIndexAccessMode(indexMode);
-    }
-
-    @After
-    public void truncateCF()
-    {
-        Keyspace keyspace = Keyspace.open(KEYSPACE);
-        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
-        store.truncateBlocking();
-        LifecycleTransaction.waitForDeletions();
-    }
-
     @Test
     public void basicTest() throws InterruptedException
     {
@@ -239,56 +199,6 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testFileRemoval() throws InterruptedException
-    {
-        Keyspace keyspace = Keyspace.open(KEYSPACE);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        truncate(cfs);
-
-        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
-        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
-        try (SSTableWriter writer = getWriter(cfs, dir, txn))
-        {
-            for (int i = 0; i < 10000; i++)
-            {
-                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
-                for (int j = 0; j < 100; j++)
-                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
-                writer.append(builder.build().unfilteredIterator());
-            }
-
-            SSTableReader s = writer.setMaxDataAge(1000).openEarly();
-            assert s != null;
-            assertFileCounts(dir.list());
-            for (int i = 10000; i < 20000; i++)
-            {
-                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
-                for (int j = 0; j < 100; j++)
-                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
-                writer.append(builder.build().unfilteredIterator());
-            }
-            SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
-            assertTrue(s.last.compareTo(s2.last) < 0);
-            assertFileCounts(dir.list());
-            s.selfRef().release();
-            s2.selfRef().release();
-            // These checks don't work on Windows because the writer has the channel still
-            // open till .abort() is called (via the builder)
-            if (!FBUtilities.isWindows())
-            {
-                LifecycleTransaction.waitForDeletions();
-                assertFileCounts(dir.list());
-            }
-            writer.abort();
-            txn.abort();
-            LifecycleTransaction.waitForDeletions();
-            int datafiles = assertFileCounts(dir.list());
-            assertEquals(datafiles, 0);
-            validateCFS(cfs);
-        }
-    }
-
-    @Test
     public void testNumberOfFilesAndSizes() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
@@ -919,16 +829,6 @@ public class SSTableRewriterTest extends SchemaLoader
         }
     }
 
-    public static void truncate(ColumnFamilyStore cfs)
-    {
-        cfs.truncateBlocking();
-        LifecycleTransaction.waitForDeletions();
-        Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
-        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
-        assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
-        validateCFS(cfs);
-    }
-
     public static SSTableReader writeFile(ColumnFamilyStore cfs, int count)
     {
         return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100, 1000), null);
@@ -959,67 +859,4 @@ public class SSTableRewriterTest extends SchemaLoader
         }
         return result;
     }
-
-    public static void validateCFS(ColumnFamilyStore cfs)
-    {
-        Set<Integer> liveDescriptors = new HashSet<>();
-        long spaceUsed = 0;
-        for (SSTableReader sstable : cfs.getLiveSSTables())
-        {
-            assertFalse(sstable.isMarkedCompacted());
-            assertEquals(1, sstable.selfRef().globalCount());
-            liveDescriptors.add(sstable.descriptor.generation);
-            spaceUsed += sstable.bytesOnDisk();
-        }
-        for (File dir : cfs.getDirectories().getCFDirectories())
-        {
-            for (File f : dir.listFiles())
-            {
-                if (f.getName().contains("Data"))
-                {
-                    Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
-                    assertTrue(d.toString(), liveDescriptors.contains(d.generation));
-                }
-            }
-        }
-        assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
-        assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
-        assertTrue(cfs.getTracker().getCompacting().isEmpty());
-    }
-
-    public static int assertFileCounts(String [] files)
-    {
-        int tmplinkcount = 0;
-        int tmpcount = 0;
-        int datacount = 0;
-        for (String f : files)
-        {
-            if (f.endsWith("-CRC.db"))
-                continue;
-            if (f.contains("tmplink-"))
-                tmplinkcount++;
-            else if (f.contains("tmp-"))
-                tmpcount++;
-            else if (f.contains("Data"))
-                datacount++;
-        }
-        assertEquals(0, tmplinkcount);
-        assertEquals(0, tmpcount);
-        return datacount;
-    }
-
-    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
-    {
-        String filename = cfs.getSSTablePath(directory);
-        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
-    }
-
-    public static ByteBuffer random(int i, int size)
-    {
-        byte[] bytes = new byte[size + 4];
-        ThreadLocalRandom.current().nextBytes(bytes);
-        ByteBuffer r = ByteBuffer.wrap(bytes);
-        r.putInt(0, i);
-        return r;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
new file mode 100644
index 0000000..a73a164
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableWriterTest extends SSTableWriterTestBase
+{
+    @Test
+    public void testAbortTxnWithOpenEarlyShouldRemoveSSTable() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
+        try (SSTableWriter writer = getWriter(cfs, dir, txn))
+        {
+            for (int i = 0; i < 10000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer.append(builder.build().unfilteredIterator());
+            }
+
+            SSTableReader s = writer.setMaxDataAge(1000).openEarly();
+            assert s != null;
+            assertFileCounts(dir.list());
+            for (int i = 10000; i < 20000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer.append(builder.build().unfilteredIterator());
+            }
+            SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
+            assertTrue(s.last.compareTo(s2.last) < 0);
+            assertFileCounts(dir.list());
+            s.selfRef().release();
+            s2.selfRef().release();
+
+            int datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 1);
+
+            // These checks don't work on Windows because the writer has the channel still
+            // open till .abort() is called (via the builder)
+            if (!FBUtilities.isWindows())
+            {
+                LifecycleTransaction.waitForDeletions();
+                assertFileCounts(dir.list());
+            }
+            writer.abort();
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+            datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 0);
+            validateCFS(cfs);
+        }
+    }
+
+
+    @Test
+    public void testAbortTxnWithClosedWriterShouldRemoveSSTable() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+        try (SSTableWriter writer = getWriter(cfs, dir, txn))
+        {
+            for (int i = 0; i < 10000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer.append(builder.build().unfilteredIterator());
+            }
+
+            assertFileCounts(dir.list());
+            for (int i = 10000; i < 20000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer.append(builder.build().unfilteredIterator());
+            }
+            SSTableReader sstable = writer.finish(true);
+            int datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 1);
+
+            sstable.selfRef().release();
+            // These checks don't work on Windows because the writer has the channel still
+            // open till .abort() is called (via the builder)
+            if (!FBUtilities.isWindows())
+            {
+                LifecycleTransaction.waitForDeletions();
+                assertFileCounts(dir.list());
+            }
+
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+            datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 0);
+            validateCFS(cfs);
+        }
+    }
+
+    @Test
+    public void testAbortTxnWithClosedAndOpenWriterShouldRemoveAllSSTables() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+
+        SSTableWriter writer1 = getWriter(cfs, dir, txn);
+        SSTableWriter writer2 = getWriter(cfs, dir, txn);
+        try
+        {
+            for (int i = 0; i < 10000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer1.append(builder.build().unfilteredIterator());
+            }
+
+            assertFileCounts(dir.list());
+            for (int i = 10000; i < 20000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer2.append(builder.build().unfilteredIterator());
+            }
+            SSTableReader sstable = writer1.finish(true);
+            txn.update(sstable, false);
+
+            assertFileCounts(dir.list());
+
+            int datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 2);
+
+            // These checks don't work on Windows because the writer has the channel still
+            // open till .abort() is called (via the builder)
+            if (!FBUtilities.isWindows())
+            {
+                LifecycleTransaction.waitForDeletions();
+                assertFileCounts(dir.list());
+            }
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+            datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 0);
+            validateCFS(cfs);
+        }
+        finally
+        {
+            writer1.close();
+            writer2.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
new file mode 100644
index 0000000..0af743d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableWriterTestBase extends SchemaLoader
+{
+
+    protected static final String KEYSPACE = "SSTableRewriterTest";
+    protected static final String CF = "Standard1";
+
+    private static Config.DiskAccessMode standardMode;
+    private static Config.DiskAccessMode indexMode;
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        if (FBUtilities.isWindows())
+        {
+            standardMode = DatabaseDescriptor.getDiskAccessMode();
+            indexMode = DatabaseDescriptor.getIndexAccessMode();
+
+            DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
+            DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
+        }
+
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF));
+    }
+
+    @AfterClass
+    public static void revertDiskAccess()
+    {
+        DatabaseDescriptor.setDiskAccessMode(standardMode);
+        DatabaseDescriptor.setIndexAccessMode(indexMode);
+    }
+
+    @After
+    public void truncateCF()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+        store.truncateBlocking();
+        LifecycleTransaction.waitForDeletions();
+    }
+
+    public static void truncate(ColumnFamilyStore cfs)
+    {
+        cfs.truncateBlocking();
+        LifecycleTransaction.waitForDeletions();
+        Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
+        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+        assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
+        validateCFS(cfs);
+    }
+
+    public static void validateCFS(ColumnFamilyStore cfs)
+    {
+        Set<Integer> liveDescriptors = new HashSet<>();
+        long spaceUsed = 0;
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+        {
+            assertFalse(sstable.isMarkedCompacted());
+            assertEquals(1, sstable.selfRef().globalCount());
+            liveDescriptors.add(sstable.descriptor.generation);
+            spaceUsed += sstable.bytesOnDisk();
+        }
+        for (File dir : cfs.getDirectories().getCFDirectories())
+        {
+            for (File f : dir.listFiles())
+            {
+                if (f.getName().contains("Data"))
+                {
+                    Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
+                    assertTrue(d.toString(), liveDescriptors.contains(d.generation));
+                }
+            }
+        }
+        assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
+        assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
+        assertTrue(cfs.getTracker().getCompacting().isEmpty());
+    }
+
+    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
+    {
+        String filename = cfs.getSSTablePath(directory);
+        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
+    }
+
+    public static ByteBuffer random(int i, int size)
+    {
+        byte[] bytes = new byte[size + 4];
+        ThreadLocalRandom.current().nextBytes(bytes);
+        ByteBuffer r = ByteBuffer.wrap(bytes);
+        r.putInt(0, i);
+        return r;
+    }
+
+    public static int assertFileCounts(String [] files)
+    {
+        int tmplinkcount = 0;
+        int tmpcount = 0;
+        int datacount = 0;
+        for (String f : files)
+        {
+            if (f.endsWith("-CRC.db"))
+                continue;
+            if (f.contains("tmplink-"))
+                tmplinkcount++;
+            else if (f.contains("tmp-"))
+                tmpcount++;
+            else if (f.contains("Data"))
+                datacount++;
+        }
+        assertEquals(0, tmplinkcount);
+        assertEquals(0, tmpcount);
+        return datacount;
+    }
+}


[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c0fd119c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c0fd119c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c0fd119c

Branch: refs/heads/trunk
Commit: c0fd119cea3240db3435575b1345d0bcefec0dc4
Parents: 7e47377 0e63000
Author: Yuki Morishita <yu...@apache.org>
Authored: Sat Dec 19 09:56:28 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Sat Dec 19 09:56:28 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamReceiveTask.java  |  19 +-
 .../io/sstable/SSTableRewriterTest.java         | 165 +--------------
 .../cassandra/io/sstable/SSTableWriterTest.java | 200 +++++++++++++++++++
 .../io/sstable/SSTableWriterTestBase.java       | 166 +++++++++++++++
 5 files changed, 374 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c0fd119c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bf2ca61,ff139c4..7bbfec6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,5 +1,28 @@@
 -3.0.3
 +3.2
 + * (cqlsh) allow setting TTL with COPY (CASSANDRA-9494)
 + * Fix EstimatedHistogram creation in nodetool tablehistograms (CASSANDRA-10859)
 + * Establish bootstrap stream sessions sequentially (CASSANDRA-6992)
 + * Sort compactionhistory output by timestamp (CASSANDRA-10464)
 + * More efficient BTree removal (CASSANDRA-9991)
 + * Make tablehistograms accept the same syntax as tablestats (CASSANDRA-10149)
 + * Group pending compactions based on table (CASSANDRA-10718)
 + * Add compressor name in sstablemetadata output (CASSANDRA-9879)
 + * Fix type casting for counter columns (CASSANDRA-10824)
 + * Prevent running Cassandra as root (CASSANDRA-8142)
 + * bound maximum in-flight commit log replay mutation bytes to 64 megabytes (CASSANDRA-8639)
 + * Normalize all scripts (CASSANDRA-10679)
 + * Make compression ratio much more accurate (CASSANDRA-10225)
 + * Optimize building of Clustering object when only one is created (CASSANDRA-10409)
 + * Make index building pluggable (CASSANDRA-10681)
 + * Add sstable flush observer (CASSANDRA-10678)
 + * Improve NTS endpoints calculation (CASSANDRA-10200)
 + * Improve performance of the folderSize function (CASSANDRA-10677)
 + * Add support for type casting in selection clause (CASSANDRA-10310)
 + * Added graphing option to cassandra-stress (CASSANDRA-7918)
 + * Abort in-progress queries that time out (CASSANDRA-7392)
 + * Add transparent data encryption core classes (CASSANDRA-9945)
 +Merged from 3.0:
+  * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
   * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
   * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
   * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-1837)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c0fd119c/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
index 0000000,0af743d..70f154d
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
@@@ -1,0 -1,166 +1,166 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.io.sstable;
+ 
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.HashSet;
+ import java.util.Set;
+ import java.util.concurrent.ThreadLocalRandom;
+ import java.util.concurrent.TimeUnit;
+ 
+ import com.google.common.util.concurrent.Uninterruptibles;
+ import org.junit.After;
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+ 
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.SerializationHeader;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.rows.EncodingStats;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.io.sstable.format.SSTableWriter;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.utils.FBUtilities;
+ 
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ 
+ public class SSTableWriterTestBase extends SchemaLoader
+ {
+ 
+     protected static final String KEYSPACE = "SSTableRewriterTest";
+     protected static final String CF = "Standard1";
+ 
+     private static Config.DiskAccessMode standardMode;
+     private static Config.DiskAccessMode indexMode;
+ 
+     @BeforeClass
+     public static void defineSchema() throws ConfigurationException
+     {
+         if (FBUtilities.isWindows())
+         {
+             standardMode = DatabaseDescriptor.getDiskAccessMode();
+             indexMode = DatabaseDescriptor.getIndexAccessMode();
+ 
+             DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
+             DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
+         }
+ 
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KEYSPACE, CF));
+     }
+ 
+     @AfterClass
+     public static void revertDiskAccess()
+     {
+         DatabaseDescriptor.setDiskAccessMode(standardMode);
+         DatabaseDescriptor.setIndexAccessMode(indexMode);
+     }
+ 
+     @After
+     public void truncateCF()
+     {
+         Keyspace keyspace = Keyspace.open(KEYSPACE);
+         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+         store.truncateBlocking();
+         LifecycleTransaction.waitForDeletions();
+     }
+ 
+     public static void truncate(ColumnFamilyStore cfs)
+     {
+         cfs.truncateBlocking();
+         LifecycleTransaction.waitForDeletions();
+         Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
+         assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+         assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
+         validateCFS(cfs);
+     }
+ 
+     public static void validateCFS(ColumnFamilyStore cfs)
+     {
+         Set<Integer> liveDescriptors = new HashSet<>();
+         long spaceUsed = 0;
+         for (SSTableReader sstable : cfs.getLiveSSTables())
+         {
+             assertFalse(sstable.isMarkedCompacted());
+             assertEquals(1, sstable.selfRef().globalCount());
+             liveDescriptors.add(sstable.descriptor.generation);
+             spaceUsed += sstable.bytesOnDisk();
+         }
+         for (File dir : cfs.getDirectories().getCFDirectories())
+         {
+             for (File f : dir.listFiles())
+             {
+                 if (f.getName().contains("Data"))
+                 {
+                     Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
+                     assertTrue(d.toString(), liveDescriptors.contains(d.generation));
+                 }
+             }
+         }
+         assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
+         assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
+         assertTrue(cfs.getTracker().getCompacting().isEmpty());
+     }
+ 
+     public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
+     {
+         String filename = cfs.getSSTablePath(directory);
 -        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
++        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn);
+     }
+ 
+     public static ByteBuffer random(int i, int size)
+     {
+         byte[] bytes = new byte[size + 4];
+         ThreadLocalRandom.current().nextBytes(bytes);
+         ByteBuffer r = ByteBuffer.wrap(bytes);
+         r.putInt(0, i);
+         return r;
+     }
+ 
+     public static int assertFileCounts(String [] files)
+     {
+         int tmplinkcount = 0;
+         int tmpcount = 0;
+         int datacount = 0;
+         for (String f : files)
+         {
+             if (f.endsWith("-CRC.db"))
+                 continue;
+             if (f.contains("tmplink-"))
+                 tmplinkcount++;
+             else if (f.contains("tmp-"))
+                 tmpcount++;
+             else if (f.contains("Data"))
+                 datacount++;
+         }
+         assertEquals(0, tmplinkcount);
+         assertEquals(0, tmpcount);
+         return datacount;
+     }
+ }


[2/3] cassandra git commit: Reduce heap spent when receiving many SSTables

Posted by yu...@apache.org.
Reduce heap spent when receiving many SSTables

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10797


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e63000c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e63000c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e63000c

Branch: refs/heads/trunk
Commit: 0e63000c3fd0029e5b620a7923ea2ac54771e8e9
Parents: d5326d4
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Dec 18 20:33:21 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Sat Dec 19 08:35:12 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/streaming/StreamReceiveTask.java  |  19 +-
 .../io/sstable/SSTableRewriterTest.java         | 165 +--------------
 .../cassandra/io/sstable/SSTableWriterTest.java | 200 +++++++++++++++++++
 .../io/sstable/SSTableWriterTestBase.java       | 166 +++++++++++++++
 5 files changed, 374 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d0f1613..ff139c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.3
+ * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
  * Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
  * Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
  * (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-1837)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 0230d14..92a14d1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -72,7 +72,7 @@ public class StreamReceiveTask extends StreamTask
     private boolean done = false;
 
     //  holds references to SSTables received
-    protected Collection<SSTableMultiWriter> sstables;
+    protected Collection<SSTableReader> sstables;
 
     public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
     {
@@ -97,7 +97,10 @@ public class StreamReceiveTask extends StreamTask
 
         assert cfId.equals(sstable.getCfId());
 
-        sstables.add(sstable);
+        Collection<SSTableReader> finished = sstable.finish(true);
+        txn.update(finished, false);
+        sstables.addAll(finished);
+
         if (sstables.size() == totalFiles)
         {
             done = true;
@@ -134,7 +137,6 @@ public class StreamReceiveTask extends StreamTask
                 if (kscf == null)
                 {
                     // schema was dropped during streaming
-                    task.sstables.forEach(SSTableMultiWriter::abortOrDie);
                     task.sstables.clear();
                     task.txn.abort();
                     task.session.taskCompleted(task);
@@ -143,15 +145,7 @@ public class StreamReceiveTask extends StreamTask
                 cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
                 hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
 
-                List<SSTableReader> readers = new ArrayList<>();
-                for (SSTableMultiWriter writer : task.sstables)
-                {
-                    Collection<SSTableReader> newReaders = writer.finish(true);
-                    readers.addAll(newReaders);
-                    task.txn.update(newReaders, false);
-                }
-
-                task.sstables.clear();
+                Collection<SSTableReader> readers = task.sstables;
 
                 try (Refs<SSTableReader> refs = Refs.ref(readers))
                 {
@@ -245,7 +239,6 @@ public class StreamReceiveTask extends StreamTask
             return;
 
         done = true;
-        sstables.forEach(SSTableMultiWriter::abortOrDie);
         txn.abort();
         sstables.clear();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index bfe7b08..008df06 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -69,48 +69,8 @@ import org.apache.cassandra.utils.UUIDGen;
 
 import static org.junit.Assert.*;
 
-public class SSTableRewriterTest extends SchemaLoader
+public class SSTableRewriterTest extends SSTableWriterTestBase
 {
-    private static final String KEYSPACE = "SSTableRewriterTest";
-    private static final String CF = "Standard1";
-
-    private static Config.DiskAccessMode standardMode;
-    private static Config.DiskAccessMode indexMode;
-
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        if (FBUtilities.isWindows())
-        {
-            standardMode = DatabaseDescriptor.getDiskAccessMode();
-            indexMode = DatabaseDescriptor.getIndexAccessMode();
-
-            DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
-            DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
-        }
-
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE, CF));
-    }
-
-    @AfterClass
-    public static void revertDiskAccess()
-    {
-        DatabaseDescriptor.setDiskAccessMode(standardMode);
-        DatabaseDescriptor.setIndexAccessMode(indexMode);
-    }
-
-    @After
-    public void truncateCF()
-    {
-        Keyspace keyspace = Keyspace.open(KEYSPACE);
-        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
-        store.truncateBlocking();
-        LifecycleTransaction.waitForDeletions();
-    }
-
     @Test
     public void basicTest() throws InterruptedException
     {
@@ -239,56 +199,6 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testFileRemoval() throws InterruptedException
-    {
-        Keyspace keyspace = Keyspace.open(KEYSPACE);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        truncate(cfs);
-
-        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
-        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
-        try (SSTableWriter writer = getWriter(cfs, dir, txn))
-        {
-            for (int i = 0; i < 10000; i++)
-            {
-                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
-                for (int j = 0; j < 100; j++)
-                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
-                writer.append(builder.build().unfilteredIterator());
-            }
-
-            SSTableReader s = writer.setMaxDataAge(1000).openEarly();
-            assert s != null;
-            assertFileCounts(dir.list());
-            for (int i = 10000; i < 20000; i++)
-            {
-                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
-                for (int j = 0; j < 100; j++)
-                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
-                writer.append(builder.build().unfilteredIterator());
-            }
-            SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
-            assertTrue(s.last.compareTo(s2.last) < 0);
-            assertFileCounts(dir.list());
-            s.selfRef().release();
-            s2.selfRef().release();
-            // These checks don't work on Windows because the writer has the channel still
-            // open till .abort() is called (via the builder)
-            if (!FBUtilities.isWindows())
-            {
-                LifecycleTransaction.waitForDeletions();
-                assertFileCounts(dir.list());
-            }
-            writer.abort();
-            txn.abort();
-            LifecycleTransaction.waitForDeletions();
-            int datafiles = assertFileCounts(dir.list());
-            assertEquals(datafiles, 0);
-            validateCFS(cfs);
-        }
-    }
-
-    @Test
     public void testNumberOfFilesAndSizes() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
@@ -919,16 +829,6 @@ public class SSTableRewriterTest extends SchemaLoader
         }
     }
 
-    public static void truncate(ColumnFamilyStore cfs)
-    {
-        cfs.truncateBlocking();
-        LifecycleTransaction.waitForDeletions();
-        Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
-        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
-        assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
-        validateCFS(cfs);
-    }
-
     public static SSTableReader writeFile(ColumnFamilyStore cfs, int count)
     {
         return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100, 1000), null);
@@ -959,67 +859,4 @@ public class SSTableRewriterTest extends SchemaLoader
         }
         return result;
     }
-
-    public static void validateCFS(ColumnFamilyStore cfs)
-    {
-        Set<Integer> liveDescriptors = new HashSet<>();
-        long spaceUsed = 0;
-        for (SSTableReader sstable : cfs.getLiveSSTables())
-        {
-            assertFalse(sstable.isMarkedCompacted());
-            assertEquals(1, sstable.selfRef().globalCount());
-            liveDescriptors.add(sstable.descriptor.generation);
-            spaceUsed += sstable.bytesOnDisk();
-        }
-        for (File dir : cfs.getDirectories().getCFDirectories())
-        {
-            for (File f : dir.listFiles())
-            {
-                if (f.getName().contains("Data"))
-                {
-                    Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
-                    assertTrue(d.toString(), liveDescriptors.contains(d.generation));
-                }
-            }
-        }
-        assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
-        assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
-        assertTrue(cfs.getTracker().getCompacting().isEmpty());
-    }
-
-    public static int assertFileCounts(String [] files)
-    {
-        int tmplinkcount = 0;
-        int tmpcount = 0;
-        int datacount = 0;
-        for (String f : files)
-        {
-            if (f.endsWith("-CRC.db"))
-                continue;
-            if (f.contains("tmplink-"))
-                tmplinkcount++;
-            else if (f.contains("tmp-"))
-                tmpcount++;
-            else if (f.contains("Data"))
-                datacount++;
-        }
-        assertEquals(0, tmplinkcount);
-        assertEquals(0, tmpcount);
-        return datacount;
-    }
-
-    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
-    {
-        String filename = cfs.getSSTablePath(directory);
-        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
-    }
-
-    public static ByteBuffer random(int i, int size)
-    {
-        byte[] bytes = new byte[size + 4];
-        ThreadLocalRandom.current().nextBytes(bytes);
-        ByteBuffer r = ByteBuffer.wrap(bytes);
-        r.putInt(0, i);
-        return r;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
new file mode 100644
index 0000000..a73a164
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableWriterTest extends SSTableWriterTestBase
+{
+    @Test
+    public void testAbortTxnWithOpenEarlyShouldRemoveSSTable() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
+        try (SSTableWriter writer = getWriter(cfs, dir, txn))
+        {
+            for (int i = 0; i < 10000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer.append(builder.build().unfilteredIterator());
+            }
+
+            SSTableReader s = writer.setMaxDataAge(1000).openEarly();
+            assert s != null;
+            assertFileCounts(dir.list());
+            for (int i = 10000; i < 20000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer.append(builder.build().unfilteredIterator());
+            }
+            SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
+            assertTrue(s.last.compareTo(s2.last) < 0);
+            assertFileCounts(dir.list());
+            s.selfRef().release();
+            s2.selfRef().release();
+
+            int datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 1);
+
+            // These checks don't work on Windows because the writer has the channel still
+            // open till .abort() is called (via the builder)
+            if (!FBUtilities.isWindows())
+            {
+                LifecycleTransaction.waitForDeletions();
+                assertFileCounts(dir.list());
+            }
+            writer.abort();
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+            datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 0);
+            validateCFS(cfs);
+        }
+    }
+
+
+    @Test
+    public void testAbortTxnWithClosedWriterShouldRemoveSSTable() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+        try (SSTableWriter writer = getWriter(cfs, dir, txn))
+        {
+            for (int i = 0; i < 10000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer.append(builder.build().unfilteredIterator());
+            }
+
+            assertFileCounts(dir.list());
+            for (int i = 10000; i < 20000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer.append(builder.build().unfilteredIterator());
+            }
+            SSTableReader sstable = writer.finish(true);
+            int datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 1);
+
+            sstable.selfRef().release();
+            // These checks don't work on Windows because the writer has the channel still
+            // open till .abort() is called (via the builder)
+            if (!FBUtilities.isWindows())
+            {
+                LifecycleTransaction.waitForDeletions();
+                assertFileCounts(dir.list());
+            }
+
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+            datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 0);
+            validateCFS(cfs);
+        }
+    }
+
+    @Test
+    public void testAbortTxnWithClosedAndOpenWriterShouldRemoveAllSSTables() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+
+        SSTableWriter writer1 = getWriter(cfs, dir, txn);
+        SSTableWriter writer2 = getWriter(cfs, dir, txn);
+        try
+        {
+            for (int i = 0; i < 10000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer1.append(builder.build().unfilteredIterator());
+            }
+
+            assertFileCounts(dir.list());
+            for (int i = 10000; i < 20000; i++)
+            {
+                UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+                for (int j = 0; j < 100; j++)
+                    builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+                writer2.append(builder.build().unfilteredIterator());
+            }
+            SSTableReader sstable = writer1.finish(true);
+            txn.update(sstable, false);
+
+            assertFileCounts(dir.list());
+
+            int datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 2);
+
+            // These checks don't work on Windows because the writer has the channel still
+            // open till .abort() is called (via the builder)
+            if (!FBUtilities.isWindows())
+            {
+                LifecycleTransaction.waitForDeletions();
+                assertFileCounts(dir.list());
+            }
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+            datafiles = assertFileCounts(dir.list());
+            assertEquals(datafiles, 0);
+            validateCFS(cfs);
+        }
+        finally
+        {
+            writer1.close();
+            writer2.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
new file mode 100644
index 0000000..0af743d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Shared fixture for SSTable writer tests ({@code SSTableWriterTest},
+ * {@code SSTableRewriterTest}): sets up the test keyspace/table, forces
+ * standard (non-mmap) disk access on Windows so temporary files can be
+ * deleted, and provides common assertion helpers.
+ */
+public class SSTableWriterTestBase extends SchemaLoader
+{
+    protected static final String KEYSPACE = "SSTableRewriterTest";
+    protected static final String CF = "Standard1";
+
+    // Saved access modes; only populated on Windows by defineSchema(), and
+    // therefore only restored on Windows by revertDiskAccess().
+    private static Config.DiskAccessMode standardMode;
+    private static Config.DiskAccessMode indexMode;
+
+    /**
+     * Prepares the embedded server and creates the test keyspace/table.
+     * On Windows, mmap'd file handles prevent deletion of SSTable files,
+     * so the disk/index access modes are switched to standard for the
+     * duration of the test class (the originals are saved for restore).
+     */
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        if (FBUtilities.isWindows())
+        {
+            standardMode = DatabaseDescriptor.getDiskAccessMode();
+            indexMode = DatabaseDescriptor.getIndexAccessMode();
+
+            DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
+            DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
+        }
+
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF));
+    }
+
+    /**
+     * Restores the disk/index access modes saved by defineSchema().
+     * Guarded by the same Windows check: on other platforms the saved
+     * modes were never captured (they are null) and writing them back
+     * would clobber DatabaseDescriptor state for subsequent test classes.
+     */
+    @AfterClass
+    public static void revertDiskAccess()
+    {
+        if (FBUtilities.isWindows())
+        {
+            DatabaseDescriptor.setDiskAccessMode(standardMode);
+            DatabaseDescriptor.setIndexAccessMode(indexMode);
+        }
+    }
+
+    /** Truncates the test table after each test and waits for file deletions to complete. */
+    @After
+    public void truncateCF()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+        store.truncateBlocking();
+        LifecycleTransaction.waitForDeletions();
+    }
+
+    /**
+     * Truncates the given table, waits for deletions, and asserts that both
+     * disk-space metrics have dropped to zero before validating the CFS.
+     * The short sleep lets asynchronous metric updates settle — TODO confirm
+     * whether 10ms is sufficient on slow CI hosts.
+     */
+    public static void truncate(ColumnFamilyStore cfs)
+    {
+        cfs.truncateBlocking();
+        LifecycleTransaction.waitForDeletions();
+        Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
+        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+        assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
+        validateCFS(cfs);
+    }
+
+    /**
+     * Asserts the table is in a consistent state: no live sstable is marked
+     * compacted or over-referenced, every on-disk Data file belongs to a live
+     * sstable generation, disk-space metrics match the live set, and no
+     * compactions are in flight.
+     */
+    public static void validateCFS(ColumnFamilyStore cfs)
+    {
+        Set<Integer> liveDescriptors = new HashSet<>();
+        long spaceUsed = 0;
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+        {
+            assertFalse(sstable.isMarkedCompacted());
+            assertEquals(1, sstable.selfRef().globalCount());
+            liveDescriptors.add(sstable.descriptor.generation);
+            spaceUsed += sstable.bytesOnDisk();
+        }
+        for (File dir : cfs.getDirectories().getCFDirectories())
+        {
+            for (File f : dir.listFiles())
+            {
+                if (f.getName().contains("Data"))
+                {
+                    Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
+                    // Every Data file on disk must correspond to a live sstable.
+                    assertTrue(d.toString(), liveDescriptors.contains(d.generation));
+                }
+            }
+        }
+        assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
+        assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
+        assertTrue(cfs.getTracker().getCompacting().isEmpty());
+    }
+
+    /**
+     * Creates a new SSTableWriter for the given table in {@code directory},
+     * registered with the supplied lifecycle transaction.
+     */
+    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
+    {
+        String filename = cfs.getSSTablePath(directory);
+        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
+    }
+
+    /**
+     * Returns a ByteBuffer of {@code size + 4} random bytes whose first four
+     * bytes are overwritten with {@code i}, giving each buffer a unique,
+     * deterministic prefix for use as a partition key.
+     */
+    public static ByteBuffer random(int i, int size)
+    {
+        byte[] bytes = new byte[size + 4];
+        ThreadLocalRandom.current().nextBytes(bytes);
+        ByteBuffer r = ByteBuffer.wrap(bytes);
+        r.putInt(0, i);
+        return r;
+    }
+
+    /**
+     * Asserts that the directory listing contains no leftover tmp or tmplink
+     * files (CRC files are ignored) and returns the number of Data files found.
+     */
+    public static int assertFileCounts(String [] files)
+    {
+        int tmplinkcount = 0;
+        int tmpcount = 0;
+        int datacount = 0;
+        for (String f : files)
+        {
+            if (f.endsWith("-CRC.db"))
+                continue;
+            if (f.contains("tmplink-"))
+                tmplinkcount++;
+            else if (f.contains("tmp-"))
+                tmpcount++;
+            else if (f.contains("Data"))
+                datacount++;
+        }
+        assertEquals(0, tmplinkcount);
+        assertEquals(0, tmpcount);
+        return datacount;
+    }
+}