You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/12/19 16:56:39 UTC
[1/3] cassandra git commit: Reduce heap spent when receiving many SSTables
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 d5326d4aa -> 0e63000c3
refs/heads/trunk 7e4737716 -> c0fd119ce
Reduce heap spent when receiving many SSTables
patch by Paulo Motta; reviewed by yukim for CASSANDRA-10797
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e63000c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e63000c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e63000c
Branch: refs/heads/cassandra-3.0
Commit: 0e63000c3fd0029e5b620a7923ea2ac54771e8e9
Parents: d5326d4
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Dec 18 20:33:21 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Sat Dec 19 08:35:12 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 19 +-
.../io/sstable/SSTableRewriterTest.java | 165 +--------------
.../cassandra/io/sstable/SSTableWriterTest.java | 200 +++++++++++++++++++
.../io/sstable/SSTableWriterTestBase.java | 166 +++++++++++++++
5 files changed, 374 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d0f1613..ff139c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.3
+ * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
* Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
* Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
* (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-1837)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 0230d14..92a14d1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -72,7 +72,7 @@ public class StreamReceiveTask extends StreamTask
private boolean done = false;
// holds references to SSTables received
- protected Collection<SSTableMultiWriter> sstables;
+ protected Collection<SSTableReader> sstables;
public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
{
@@ -97,7 +97,10 @@ public class StreamReceiveTask extends StreamTask
assert cfId.equals(sstable.getCfId());
- sstables.add(sstable);
+ Collection<SSTableReader> finished = sstable.finish(true);
+ txn.update(finished, false);
+ sstables.addAll(finished);
+
if (sstables.size() == totalFiles)
{
done = true;
@@ -134,7 +137,6 @@ public class StreamReceiveTask extends StreamTask
if (kscf == null)
{
// schema was dropped during streaming
- task.sstables.forEach(SSTableMultiWriter::abortOrDie);
task.sstables.clear();
task.txn.abort();
task.session.taskCompleted(task);
@@ -143,15 +145,7 @@ public class StreamReceiveTask extends StreamTask
cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableMultiWriter writer : task.sstables)
- {
- Collection<SSTableReader> newReaders = writer.finish(true);
- readers.addAll(newReaders);
- task.txn.update(newReaders, false);
- }
-
- task.sstables.clear();
+ Collection<SSTableReader> readers = task.sstables;
try (Refs<SSTableReader> refs = Refs.ref(readers))
{
@@ -245,7 +239,6 @@ public class StreamReceiveTask extends StreamTask
return;
done = true;
- sstables.forEach(SSTableMultiWriter::abortOrDie);
txn.abort();
sstables.clear();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index bfe7b08..008df06 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -69,48 +69,8 @@ import org.apache.cassandra.utils.UUIDGen;
import static org.junit.Assert.*;
-public class SSTableRewriterTest extends SchemaLoader
+public class SSTableRewriterTest extends SSTableWriterTestBase
{
- private static final String KEYSPACE = "SSTableRewriterTest";
- private static final String CF = "Standard1";
-
- private static Config.DiskAccessMode standardMode;
- private static Config.DiskAccessMode indexMode;
-
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
- {
- if (FBUtilities.isWindows())
- {
- standardMode = DatabaseDescriptor.getDiskAccessMode();
- indexMode = DatabaseDescriptor.getIndexAccessMode();
-
- DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
- DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
- }
-
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE,
- KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KEYSPACE, CF));
- }
-
- @AfterClass
- public static void revertDiskAccess()
- {
- DatabaseDescriptor.setDiskAccessMode(standardMode);
- DatabaseDescriptor.setIndexAccessMode(indexMode);
- }
-
- @After
- public void truncateCF()
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
- store.truncateBlocking();
- LifecycleTransaction.waitForDeletions();
- }
-
@Test
public void basicTest() throws InterruptedException
{
@@ -239,56 +199,6 @@ public class SSTableRewriterTest extends SchemaLoader
}
@Test
- public void testFileRemoval() throws InterruptedException
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE);
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
- truncate(cfs);
-
- File dir = cfs.getDirectories().getDirectoryForNewSSTables();
- LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
- try (SSTableWriter writer = getWriter(cfs, dir, txn))
- {
- for (int i = 0; i < 10000; i++)
- {
- UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
- for (int j = 0; j < 100; j++)
- builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
- writer.append(builder.build().unfilteredIterator());
- }
-
- SSTableReader s = writer.setMaxDataAge(1000).openEarly();
- assert s != null;
- assertFileCounts(dir.list());
- for (int i = 10000; i < 20000; i++)
- {
- UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
- for (int j = 0; j < 100; j++)
- builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
- writer.append(builder.build().unfilteredIterator());
- }
- SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
- assertTrue(s.last.compareTo(s2.last) < 0);
- assertFileCounts(dir.list());
- s.selfRef().release();
- s2.selfRef().release();
- // These checks don't work on Windows because the writer has the channel still
- // open till .abort() is called (via the builder)
- if (!FBUtilities.isWindows())
- {
- LifecycleTransaction.waitForDeletions();
- assertFileCounts(dir.list());
- }
- writer.abort();
- txn.abort();
- LifecycleTransaction.waitForDeletions();
- int datafiles = assertFileCounts(dir.list());
- assertEquals(datafiles, 0);
- validateCFS(cfs);
- }
- }
-
- @Test
public void testNumberOfFilesAndSizes() throws Exception
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
@@ -919,16 +829,6 @@ public class SSTableRewriterTest extends SchemaLoader
}
}
- public static void truncate(ColumnFamilyStore cfs)
- {
- cfs.truncateBlocking();
- LifecycleTransaction.waitForDeletions();
- Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
- assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
- assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
- validateCFS(cfs);
- }
-
public static SSTableReader writeFile(ColumnFamilyStore cfs, int count)
{
return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100, 1000), null);
@@ -959,67 +859,4 @@ public class SSTableRewriterTest extends SchemaLoader
}
return result;
}
-
- public static void validateCFS(ColumnFamilyStore cfs)
- {
- Set<Integer> liveDescriptors = new HashSet<>();
- long spaceUsed = 0;
- for (SSTableReader sstable : cfs.getLiveSSTables())
- {
- assertFalse(sstable.isMarkedCompacted());
- assertEquals(1, sstable.selfRef().globalCount());
- liveDescriptors.add(sstable.descriptor.generation);
- spaceUsed += sstable.bytesOnDisk();
- }
- for (File dir : cfs.getDirectories().getCFDirectories())
- {
- for (File f : dir.listFiles())
- {
- if (f.getName().contains("Data"))
- {
- Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
- assertTrue(d.toString(), liveDescriptors.contains(d.generation));
- }
- }
- }
- assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
- assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
- assertTrue(cfs.getTracker().getCompacting().isEmpty());
- }
-
- public static int assertFileCounts(String [] files)
- {
- int tmplinkcount = 0;
- int tmpcount = 0;
- int datacount = 0;
- for (String f : files)
- {
- if (f.endsWith("-CRC.db"))
- continue;
- if (f.contains("tmplink-"))
- tmplinkcount++;
- else if (f.contains("tmp-"))
- tmpcount++;
- else if (f.contains("Data"))
- datacount++;
- }
- assertEquals(0, tmplinkcount);
- assertEquals(0, tmpcount);
- return datacount;
- }
-
- public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
- {
- String filename = cfs.getSSTablePath(directory);
- return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
- }
-
- public static ByteBuffer random(int i, int size)
- {
- byte[] bytes = new byte[size + 4];
- ThreadLocalRandom.current().nextBytes(bytes);
- ByteBuffer r = ByteBuffer.wrap(bytes);
- r.putInt(0, i);
- return r;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
new file mode 100644
index 0000000..a73a164
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableWriterTest extends SSTableWriterTestBase
+{
+ @Test
+ public void testAbortTxnWithOpenEarlyShouldRemoveSSTable() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ truncate(cfs);
+
+ File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+ LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
+ try (SSTableWriter writer = getWriter(cfs, dir, txn))
+ {
+ for (int i = 0; i < 10000; i++)
+ {
+ UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+ for (int j = 0; j < 100; j++)
+ builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+ writer.append(builder.build().unfilteredIterator());
+ }
+
+ SSTableReader s = writer.setMaxDataAge(1000).openEarly();
+ assert s != null;
+ assertFileCounts(dir.list());
+ for (int i = 10000; i < 20000; i++)
+ {
+ UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+ for (int j = 0; j < 100; j++)
+ builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+ writer.append(builder.build().unfilteredIterator());
+ }
+ SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
+ assertTrue(s.last.compareTo(s2.last) < 0);
+ assertFileCounts(dir.list());
+ s.selfRef().release();
+ s2.selfRef().release();
+
+ int datafiles = assertFileCounts(dir.list());
+ assertEquals(datafiles, 1);
+
+ // These checks don't work on Windows because the writer has the channel still
+ // open till .abort() is called (via the builder)
+ if (!FBUtilities.isWindows())
+ {
+ LifecycleTransaction.waitForDeletions();
+ assertFileCounts(dir.list());
+ }
+ writer.abort();
+ txn.abort();
+ LifecycleTransaction.waitForDeletions();
+ datafiles = assertFileCounts(dir.list());
+ assertEquals(datafiles, 0);
+ validateCFS(cfs);
+ }
+ }
+
+
+ @Test
+ public void testAbortTxnWithClosedWriterShouldRemoveSSTable() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ truncate(cfs);
+
+ File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+ LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+ try (SSTableWriter writer = getWriter(cfs, dir, txn))
+ {
+ for (int i = 0; i < 10000; i++)
+ {
+ UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+ for (int j = 0; j < 100; j++)
+ builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+ writer.append(builder.build().unfilteredIterator());
+ }
+
+ assertFileCounts(dir.list());
+ for (int i = 10000; i < 20000; i++)
+ {
+ UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+ for (int j = 0; j < 100; j++)
+ builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+ writer.append(builder.build().unfilteredIterator());
+ }
+ SSTableReader sstable = writer.finish(true);
+ int datafiles = assertFileCounts(dir.list());
+ assertEquals(datafiles, 1);
+
+ sstable.selfRef().release();
+ // These checks don't work on Windows because the writer has the channel still
+ // open till .abort() is called (via the builder)
+ if (!FBUtilities.isWindows())
+ {
+ LifecycleTransaction.waitForDeletions();
+ assertFileCounts(dir.list());
+ }
+
+ txn.abort();
+ LifecycleTransaction.waitForDeletions();
+ datafiles = assertFileCounts(dir.list());
+ assertEquals(datafiles, 0);
+ validateCFS(cfs);
+ }
+ }
+
+ @Test
+ public void testAbortTxnWithClosedAndOpenWriterShouldRemoveAllSSTables() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+ truncate(cfs);
+
+ File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+ LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+
+ SSTableWriter writer1 = getWriter(cfs, dir, txn);
+ SSTableWriter writer2 = getWriter(cfs, dir, txn);
+ try
+ {
+ for (int i = 0; i < 10000; i++)
+ {
+ UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+ for (int j = 0; j < 100; j++)
+ builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+ writer1.append(builder.build().unfilteredIterator());
+ }
+
+ assertFileCounts(dir.list());
+ for (int i = 10000; i < 20000; i++)
+ {
+ UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+ for (int j = 0; j < 100; j++)
+ builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+ writer2.append(builder.build().unfilteredIterator());
+ }
+ SSTableReader sstable = writer1.finish(true);
+ txn.update(sstable, false);
+
+ assertFileCounts(dir.list());
+
+ int datafiles = assertFileCounts(dir.list());
+ assertEquals(datafiles, 2);
+
+ // These checks don't work on Windows because the writer has the channel still
+ // open till .abort() is called (via the builder)
+ if (!FBUtilities.isWindows())
+ {
+ LifecycleTransaction.waitForDeletions();
+ assertFileCounts(dir.list());
+ }
+ txn.abort();
+ LifecycleTransaction.waitForDeletions();
+ datafiles = assertFileCounts(dir.list());
+ assertEquals(datafiles, 0);
+ validateCFS(cfs);
+ }
+ finally
+ {
+ writer1.close();
+ writer2.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
new file mode 100644
index 0000000..0af743d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableWriterTestBase extends SchemaLoader
+{
+
+ protected static final String KEYSPACE = "SSTableRewriterTest";
+ protected static final String CF = "Standard1";
+
+ private static Config.DiskAccessMode standardMode;
+ private static Config.DiskAccessMode indexMode;
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ if (FBUtilities.isWindows())
+ {
+ standardMode = DatabaseDescriptor.getDiskAccessMode();
+ indexMode = DatabaseDescriptor.getIndexAccessMode();
+
+ DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
+ }
+
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF));
+ }
+
+ @AfterClass
+ public static void revertDiskAccess()
+ {
+ DatabaseDescriptor.setDiskAccessMode(standardMode);
+ DatabaseDescriptor.setIndexAccessMode(indexMode);
+ }
+
+ @After
+ public void truncateCF()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.truncateBlocking();
+ LifecycleTransaction.waitForDeletions();
+ }
+
+ public static void truncate(ColumnFamilyStore cfs)
+ {
+ cfs.truncateBlocking();
+ LifecycleTransaction.waitForDeletions();
+ Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
+ assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+ assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
+ validateCFS(cfs);
+ }
+
+ public static void validateCFS(ColumnFamilyStore cfs)
+ {
+ Set<Integer> liveDescriptors = new HashSet<>();
+ long spaceUsed = 0;
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ {
+ assertFalse(sstable.isMarkedCompacted());
+ assertEquals(1, sstable.selfRef().globalCount());
+ liveDescriptors.add(sstable.descriptor.generation);
+ spaceUsed += sstable.bytesOnDisk();
+ }
+ for (File dir : cfs.getDirectories().getCFDirectories())
+ {
+ for (File f : dir.listFiles())
+ {
+ if (f.getName().contains("Data"))
+ {
+ Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
+ assertTrue(d.toString(), liveDescriptors.contains(d.generation));
+ }
+ }
+ }
+ assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
+ assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
+ assertTrue(cfs.getTracker().getCompacting().isEmpty());
+ }
+
+ public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
+ {
+ String filename = cfs.getSSTablePath(directory);
+ return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
+ }
+
+ public static ByteBuffer random(int i, int size)
+ {
+ byte[] bytes = new byte[size + 4];
+ ThreadLocalRandom.current().nextBytes(bytes);
+ ByteBuffer r = ByteBuffer.wrap(bytes);
+ r.putInt(0, i);
+ return r;
+ }
+
+ public static int assertFileCounts(String [] files)
+ {
+ int tmplinkcount = 0;
+ int tmpcount = 0;
+ int datacount = 0;
+ for (String f : files)
+ {
+ if (f.endsWith("-CRC.db"))
+ continue;
+ if (f.contains("tmplink-"))
+ tmplinkcount++;
+ else if (f.contains("tmp-"))
+ tmpcount++;
+ else if (f.contains("Data"))
+ datacount++;
+ }
+ assertEquals(0, tmplinkcount);
+ assertEquals(0, tmpcount);
+ return datacount;
+ }
+}
[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c0fd119c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c0fd119c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c0fd119c
Branch: refs/heads/trunk
Commit: c0fd119cea3240db3435575b1345d0bcefec0dc4
Parents: 7e47377 0e63000
Author: Yuki Morishita <yu...@apache.org>
Authored: Sat Dec 19 09:56:28 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Sat Dec 19 09:56:28 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 19 +-
.../io/sstable/SSTableRewriterTest.java | 165 +--------------
.../cassandra/io/sstable/SSTableWriterTest.java | 200 +++++++++++++++++++
.../io/sstable/SSTableWriterTestBase.java | 166 +++++++++++++++
5 files changed, 374 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c0fd119c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bf2ca61,ff139c4..7bbfec6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,5 +1,28 @@@
-3.0.3
+3.2
+ * (cqlsh) allow setting TTL with COPY (CASSANDRA-9494)
+ * Fix EstimatedHistogram creation in nodetool tablehistograms (CASSANDRA-10859)
+ * Establish bootstrap stream sessions sequentially (CASSANDRA-6992)
+ * Sort compactionhistory output by timestamp (CASSANDRA-10464)
+ * More efficient BTree removal (CASSANDRA-9991)
+ * Make tablehistograms accept the same syntax as tablestats (CASSANDRA-10149)
+ * Group pending compactions based on table (CASSANDRA-10718)
+ * Add compressor name in sstablemetadata output (CASSANDRA-9879)
+ * Fix type casting for counter columns (CASSANDRA-10824)
+ * Prevent running Cassandra as root (CASSANDRA-8142)
+ * bound maximum in-flight commit log replay mutation bytes to 64 megabytes (CASSANDRA-8639)
+ * Normalize all scripts (CASSANDRA-10679)
+ * Make compression ratio much more accurate (CASSANDRA-10225)
+ * Optimize building of Clustering object when only one is created (CASSANDRA-10409)
+ * Make index building pluggable (CASSANDRA-10681)
+ * Add sstable flush observer (CASSANDRA-10678)
+ * Improve NTS endpoints calculation (CASSANDRA-10200)
+ * Improve performance of the folderSize function (CASSANDRA-10677)
+ * Add support for type casting in selection clause (CASSANDRA-10310)
+ * Added graphing option to cassandra-stress (CASSANDRA-7918)
+ * Abort in-progress queries that time out (CASSANDRA-7392)
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+Merged from 3.0:
+ * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
* Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
* Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
* (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-1837)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c0fd119c/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
index 0000000,0af743d..70f154d
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
@@@ -1,0 -1,166 +1,166 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.cassandra.io.sstable;
+
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.HashSet;
+ import java.util.Set;
+ import java.util.concurrent.ThreadLocalRandom;
+ import java.util.concurrent.TimeUnit;
+
+ import com.google.common.util.concurrent.Uninterruptibles;
+ import org.junit.After;
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.SerializationHeader;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.rows.EncodingStats;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.io.sstable.format.SSTableWriter;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.utils.FBUtilities;
+
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+
+ public class SSTableWriterTestBase extends SchemaLoader
+ {
+
+ protected static final String KEYSPACE = "SSTableRewriterTest";
+ protected static final String CF = "Standard1";
+
+ private static Config.DiskAccessMode standardMode;
+ private static Config.DiskAccessMode indexMode;
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ if (FBUtilities.isWindows())
+ {
+ standardMode = DatabaseDescriptor.getDiskAccessMode();
+ indexMode = DatabaseDescriptor.getIndexAccessMode();
+
+ DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
+ DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
+ }
+
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE, CF));
+ }
+
+ @AfterClass
+ public static void revertDiskAccess()
+ {
+ DatabaseDescriptor.setDiskAccessMode(standardMode);
+ DatabaseDescriptor.setIndexAccessMode(indexMode);
+ }
+
+ @After
+ public void truncateCF()
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+ store.truncateBlocking();
+ LifecycleTransaction.waitForDeletions();
+ }
+
+ public static void truncate(ColumnFamilyStore cfs)
+ {
+ cfs.truncateBlocking();
+ LifecycleTransaction.waitForDeletions();
+ Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
+ assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+ assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
+ validateCFS(cfs);
+ }
+
+ public static void validateCFS(ColumnFamilyStore cfs)
+ {
+ Set<Integer> liveDescriptors = new HashSet<>();
+ long spaceUsed = 0;
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ {
+ assertFalse(sstable.isMarkedCompacted());
+ assertEquals(1, sstable.selfRef().globalCount());
+ liveDescriptors.add(sstable.descriptor.generation);
+ spaceUsed += sstable.bytesOnDisk();
+ }
+ for (File dir : cfs.getDirectories().getCFDirectories())
+ {
+ for (File f : dir.listFiles())
+ {
+ if (f.getName().contains("Data"))
+ {
+ Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
+ assertTrue(d.toString(), liveDescriptors.contains(d.generation));
+ }
+ }
+ }
+ assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
+ assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
+ assertTrue(cfs.getTracker().getCompacting().isEmpty());
+ }
+
+ public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
+ {
+ String filename = cfs.getSSTablePath(directory);
- return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
++ return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn);
+ }
+
+ public static ByteBuffer random(int i, int size)
+ {
+ byte[] bytes = new byte[size + 4];
+ ThreadLocalRandom.current().nextBytes(bytes);
+ ByteBuffer r = ByteBuffer.wrap(bytes);
+ r.putInt(0, i);
+ return r;
+ }
+
+ public static int assertFileCounts(String [] files)
+ {
+ int tmplinkcount = 0;
+ int tmpcount = 0;
+ int datacount = 0;
+ for (String f : files)
+ {
+ if (f.endsWith("-CRC.db"))
+ continue;
+ if (f.contains("tmplink-"))
+ tmplinkcount++;
+ else if (f.contains("tmp-"))
+ tmpcount++;
+ else if (f.contains("Data"))
+ datacount++;
+ }
+ assertEquals(0, tmplinkcount);
+ assertEquals(0, tmpcount);
+ return datacount;
+ }
+ }
[2/3] cassandra git commit: Reduce heap spent when receiving many SSTables
Posted by yu...@apache.org.
Reduce heap spent when receiving many SSTables
patch by Paulo Motta; reviewed by yukim for CASSANDRA-10797
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e63000c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e63000c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e63000c
Branch: refs/heads/trunk
Commit: 0e63000c3fd0029e5b620a7923ea2ac54771e8e9
Parents: d5326d4
Author: Paulo Motta <pa...@gmail.com>
Authored: Fri Dec 18 20:33:21 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Sat Dec 19 08:35:12 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 19 +-
.../io/sstable/SSTableRewriterTest.java | 165 +--------------
.../cassandra/io/sstable/SSTableWriterTest.java | 200 +++++++++++++++++++
.../io/sstable/SSTableWriterTestBase.java | 166 +++++++++++++++
5 files changed, 374 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d0f1613..ff139c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.3
+ * Reduce heap spent when receiving many SSTables (CASSANDRA-10797)
* Add back support for 3rd party auth providers to bulk loader (CASSANDRA-10873)
* Eliminate the dependency on jgrapht for UDT resolution (CASSANDRA-10653)
* (Hadoop) Close Clusters and Sessions in Hadoop Input/Output classes (CASSANDRA-1837)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 0230d14..92a14d1 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -72,7 +72,7 @@ public class StreamReceiveTask extends StreamTask
private boolean done = false;
// holds references to SSTables received
- protected Collection<SSTableMultiWriter> sstables;
+ protected Collection<SSTableReader> sstables;
public StreamReceiveTask(StreamSession session, UUID cfId, int totalFiles, long totalSize)
{
@@ -97,7 +97,10 @@ public class StreamReceiveTask extends StreamTask
assert cfId.equals(sstable.getCfId());
- sstables.add(sstable);
+ Collection<SSTableReader> finished = sstable.finish(true);
+ txn.update(finished, false);
+ sstables.addAll(finished);
+
if (sstables.size() == totalFiles)
{
done = true;
@@ -134,7 +137,6 @@ public class StreamReceiveTask extends StreamTask
if (kscf == null)
{
// schema was dropped during streaming
- task.sstables.forEach(SSTableMultiWriter::abortOrDie);
task.sstables.clear();
task.txn.abort();
task.session.taskCompleted(task);
@@ -143,15 +145,7 @@ public class StreamReceiveTask extends StreamTask
cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
hasViews = !Iterables.isEmpty(View.findAll(kscf.left, kscf.right));
- List<SSTableReader> readers = new ArrayList<>();
- for (SSTableMultiWriter writer : task.sstables)
- {
- Collection<SSTableReader> newReaders = writer.finish(true);
- readers.addAll(newReaders);
- task.txn.update(newReaders, false);
- }
-
- task.sstables.clear();
+ Collection<SSTableReader> readers = task.sstables;
try (Refs<SSTableReader> refs = Refs.ref(readers))
{
@@ -245,7 +239,6 @@ public class StreamReceiveTask extends StreamTask
return;
done = true;
- sstables.forEach(SSTableMultiWriter::abortOrDie);
txn.abort();
sstables.clear();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index bfe7b08..008df06 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -69,48 +69,8 @@ import org.apache.cassandra.utils.UUIDGen;
import static org.junit.Assert.*;
-public class SSTableRewriterTest extends SchemaLoader
+public class SSTableRewriterTest extends SSTableWriterTestBase
{
- private static final String KEYSPACE = "SSTableRewriterTest";
- private static final String CF = "Standard1";
-
- private static Config.DiskAccessMode standardMode;
- private static Config.DiskAccessMode indexMode;
-
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
- {
- if (FBUtilities.isWindows())
- {
- standardMode = DatabaseDescriptor.getDiskAccessMode();
- indexMode = DatabaseDescriptor.getIndexAccessMode();
-
- DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
- DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
- }
-
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE,
- KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KEYSPACE, CF));
- }
-
- @AfterClass
- public static void revertDiskAccess()
- {
- DatabaseDescriptor.setDiskAccessMode(standardMode);
- DatabaseDescriptor.setIndexAccessMode(indexMode);
- }
-
- @After
- public void truncateCF()
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE);
- ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
- store.truncateBlocking();
- LifecycleTransaction.waitForDeletions();
- }
-
@Test
public void basicTest() throws InterruptedException
{
@@ -239,56 +199,6 @@ public class SSTableRewriterTest extends SchemaLoader
}
@Test
- public void testFileRemoval() throws InterruptedException
- {
- Keyspace keyspace = Keyspace.open(KEYSPACE);
- ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
- truncate(cfs);
-
- File dir = cfs.getDirectories().getDirectoryForNewSSTables();
- LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
- try (SSTableWriter writer = getWriter(cfs, dir, txn))
- {
- for (int i = 0; i < 10000; i++)
- {
- UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
- for (int j = 0; j < 100; j++)
- builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
- writer.append(builder.build().unfilteredIterator());
- }
-
- SSTableReader s = writer.setMaxDataAge(1000).openEarly();
- assert s != null;
- assertFileCounts(dir.list());
- for (int i = 10000; i < 20000; i++)
- {
- UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
- for (int j = 0; j < 100; j++)
- builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
- writer.append(builder.build().unfilteredIterator());
- }
- SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
- assertTrue(s.last.compareTo(s2.last) < 0);
- assertFileCounts(dir.list());
- s.selfRef().release();
- s2.selfRef().release();
- // These checks don't work on Windows because the writer has the channel still
- // open till .abort() is called (via the builder)
- if (!FBUtilities.isWindows())
- {
- LifecycleTransaction.waitForDeletions();
- assertFileCounts(dir.list());
- }
- writer.abort();
- txn.abort();
- LifecycleTransaction.waitForDeletions();
- int datafiles = assertFileCounts(dir.list());
- assertEquals(datafiles, 0);
- validateCFS(cfs);
- }
- }
-
- @Test
public void testNumberOfFilesAndSizes() throws Exception
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
@@ -919,16 +829,6 @@ public class SSTableRewriterTest extends SchemaLoader
}
}
- public static void truncate(ColumnFamilyStore cfs)
- {
- cfs.truncateBlocking();
- LifecycleTransaction.waitForDeletions();
- Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
- assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
- assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
- validateCFS(cfs);
- }
-
public static SSTableReader writeFile(ColumnFamilyStore cfs, int count)
{
return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100, 1000), null);
@@ -959,67 +859,4 @@ public class SSTableRewriterTest extends SchemaLoader
}
return result;
}
-
- public static void validateCFS(ColumnFamilyStore cfs)
- {
- Set<Integer> liveDescriptors = new HashSet<>();
- long spaceUsed = 0;
- for (SSTableReader sstable : cfs.getLiveSSTables())
- {
- assertFalse(sstable.isMarkedCompacted());
- assertEquals(1, sstable.selfRef().globalCount());
- liveDescriptors.add(sstable.descriptor.generation);
- spaceUsed += sstable.bytesOnDisk();
- }
- for (File dir : cfs.getDirectories().getCFDirectories())
- {
- for (File f : dir.listFiles())
- {
- if (f.getName().contains("Data"))
- {
- Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
- assertTrue(d.toString(), liveDescriptors.contains(d.generation));
- }
- }
- }
- assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
- assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
- assertTrue(cfs.getTracker().getCompacting().isEmpty());
- }
-
- public static int assertFileCounts(String [] files)
- {
- int tmplinkcount = 0;
- int tmpcount = 0;
- int datacount = 0;
- for (String f : files)
- {
- if (f.endsWith("-CRC.db"))
- continue;
- if (f.contains("tmplink-"))
- tmplinkcount++;
- else if (f.contains("tmp-"))
- tmpcount++;
- else if (f.contains("Data"))
- datacount++;
- }
- assertEquals(0, tmplinkcount);
- assertEquals(0, tmpcount);
- return datacount;
- }
-
- public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
- {
- String filename = cfs.getSSTablePath(directory);
- return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
- }
-
- public static ByteBuffer random(int i, int size)
- {
- byte[] bytes = new byte[size + 4];
- ThreadLocalRandom.current().nextBytes(bytes);
- ByteBuffer r = ByteBuffer.wrap(bytes);
- r.putInt(0, i);
- return r;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
new file mode 100644
index 0000000..a73a164
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Checks that aborting a {@link LifecycleTransaction} removes the files of every {@link SSTableWriter}
+ * registered with it. Three cases are covered:
+ * <ul>
+ * <li>the writer is still open after opening early readers;</li>
+ * <li>the writer has already been finished;</li>
+ * <li>one writer was finished and its reader added to the transaction, while another writer is still open.</li>
+ * </ul>
+ */
+public class SSTableWriterTest extends SSTableWriterTestBase
+{
+    /** An open writer with early-opened readers leaves no Data file behind once the txn is aborted. */
+    @Test
+    public void testAbortTxnWithOpenEarlyShouldRemoveSSTable() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
+        try (SSTableWriter writer = getWriter(cfs, dir, txn))
+        {
+            appendPartitions(cfs, writer, 0, 10000);
+
+            SSTableReader s = writer.setMaxDataAge(1000).openEarly();
+            assertNotNull(s);
+            assertFileCounts(dir.list());
+
+            appendPartitions(cfs, writer, 10000, 20000);
+            SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
+            assertTrue(s.last.compareTo(s2.last) < 0);
+            assertFileCounts(dir.list());
+            s.selfRef().release();
+            s2.selfRef().release();
+
+            // the single, still-open writer accounts for exactly one Data file
+            int datafiles = assertFileCounts(dir.list());
+            assertEquals(1, datafiles);
+
+            // These checks don't work on Windows because the writer has the channel still
+            // open till .abort() is called (via the builder)
+            if (!FBUtilities.isWindows())
+            {
+                LifecycleTransaction.waitForDeletions();
+                assertFileCounts(dir.list());
+            }
+            writer.abort();
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+            datafiles = assertFileCounts(dir.list());
+            assertEquals(0, datafiles);
+            validateCFS(cfs);
+        }
+    }
+
+    /** A writer that was already finished still has its files removed when the txn is aborted. */
+    @Test
+    public void testAbortTxnWithClosedWriterShouldRemoveSSTable() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+        try (SSTableWriter writer = getWriter(cfs, dir, txn))
+        {
+            appendPartitions(cfs, writer, 0, 10000);
+            assertFileCounts(dir.list());
+            appendPartitions(cfs, writer, 10000, 20000);
+
+            SSTableReader sstable = writer.finish(true);
+            int datafiles = assertFileCounts(dir.list());
+            assertEquals(1, datafiles);
+
+            sstable.selfRef().release();
+            // Skipped on Windows, like the equivalent check in the open-writer test.
+            // NOTE(review): that test blames the writer's still-open channel, but this writer has
+            // already been finished -- confirm what keeps files open on Windows at this point.
+            if (!FBUtilities.isWindows())
+            {
+                LifecycleTransaction.waitForDeletions();
+                assertFileCounts(dir.list());
+            }
+
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+            datafiles = assertFileCounts(dir.list());
+            assertEquals(0, datafiles);
+            validateCFS(cfs);
+        }
+    }
+
+    /**
+     * Aborting the txn removes both a finished writer whose reader was added via
+     * {@code txn.update(...)} and a writer that is still open.
+     */
+    @Test
+    public void testAbortTxnWithClosedAndOpenWriterShouldRemoveAllSSTables() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        truncate(cfs);
+
+        File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+
+        // try-with-resources closes both writers even if creating writer2 or closing one of them
+        // throws; the previous try/finally leaked writer1 if getWriter() failed for writer2, and
+        // skipped writer2.close() if writer1.close() threw
+        try (SSTableWriter writer1 = getWriter(cfs, dir, txn);
+             SSTableWriter writer2 = getWriter(cfs, dir, txn))
+        {
+            appendPartitions(cfs, writer1, 0, 10000);
+            assertFileCounts(dir.list());
+            appendPartitions(cfs, writer2, 10000, 20000);
+
+            // finish writer1 and register its reader with the txn; writer2 deliberately stays open
+            SSTableReader sstable = writer1.finish(true);
+            txn.update(sstable, false);
+
+            int datafiles = assertFileCounts(dir.list());
+            assertEquals(2, datafiles);
+
+            // Skipped on Windows, like the equivalent check in the open-writer test.
+            // NOTE(review): confirm which open handle prevents the check there.
+            if (!FBUtilities.isWindows())
+            {
+                LifecycleTransaction.waitForDeletions();
+                assertFileCounts(dir.list());
+            }
+            txn.abort();
+            LifecycleTransaction.waitForDeletions();
+            datafiles = assertFileCounts(dir.list());
+            assertEquals(0, datafiles);
+            validateCFS(cfs);
+        }
+    }
+
+    /**
+     * Appends one partition to {@code writer} for every key index in {@code [from, to)}.
+     * Each partition has 100 rows; each row's "val" cell holds 1000 zeroed bytes and has timestamp 1.
+     */
+    private static void appendPartitions(ColumnFamilyStore cfs, SSTableWriter writer, int from, int to)
+    {
+        for (int i = from; i < to; i++)
+        {
+            UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
+            for (int j = 0; j < 100; j++)
+                builder.newRow("" + j).add("val", ByteBuffer.allocate(1000));
+            writer.append(builder.build().unfilteredIterator());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e63000c/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
new file mode 100644
index 0000000..0af743d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Shared fixture for sstable writer tests. It creates the {@link #KEYSPACE}/{@link #CF} schema once
+ * per test class and truncates the table after every test. It also provides helpers to create
+ * writers, generate keys, and check the on-disk state of a table.
+ */
+public class SSTableWriterTestBase extends SchemaLoader
+{
+    // NOTE(review): the keyspace keeps its historical "SSTableRewriterTest" name even though this
+    // fixture is now shared by several test classes.
+    protected static final String KEYSPACE = "SSTableRewriterTest";
+    protected static final String CF = "Standard1";
+
+    // access modes in effect before defineSchema() overrode them; only saved (non-null) on Windows
+    private static Config.DiskAccessMode standardMode;
+    private static Config.DiskAccessMode indexMode;
+
+    /** Switches Windows to the "standard" disk access mode, then starts the server and creates the schema. */
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        if (FBUtilities.isWindows())
+        {
+            standardMode = DatabaseDescriptor.getDiskAccessMode();
+            indexMode = DatabaseDescriptor.getIndexAccessMode();
+
+            DatabaseDescriptor.setDiskAccessMode(Config.DiskAccessMode.standard);
+            DatabaseDescriptor.setIndexAccessMode(Config.DiskAccessMode.standard);
+        }
+
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF));
+    }
+
+    /** Restores the disk access modes saved by {@link #defineSchema()}, if any were saved. */
+    @AfterClass
+    public static void revertDiskAccess()
+    {
+        // The modes are only captured on Windows. Restoring unconditionally would overwrite the
+        // configured modes with null on every other platform.
+        if (standardMode != null)
+            DatabaseDescriptor.setDiskAccessMode(standardMode);
+        if (indexMode != null)
+            DatabaseDescriptor.setIndexAccessMode(indexMode);
+    }
+
+    /** Truncates the test table after each test and waits for the resulting file deletions. */
+    @After
+    public void truncateCF()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+        store.truncateBlocking();
+        LifecycleTransaction.waitForDeletions();
+    }
+
+    /**
+     * Truncates {@code cfs}, waits for deletions, and asserts that the table is empty, both in its
+     * disk-space metrics and according to {@link #validateCFS(ColumnFamilyStore)}.
+     */
+    public static void truncate(ColumnFamilyStore cfs)
+    {
+        cfs.truncateBlocking();
+        LifecycleTransaction.waitForDeletions();
+        // NOTE(review): short pause, presumably to let the disk-space metrics settle -- confirm
+        Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
+        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+        assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
+        validateCFS(cfs);
+    }
+
+    /**
+     * Asserts that the on-disk state of {@code cfs} matches its live sstables. Specifically:
+     * <ul>
+     * <li>no live sstable is marked compacted;</li>
+     * <li>each live sstable has exactly one global self-reference;</li>
+     * <li>every Data file in the table directories belongs to a live sstable;</li>
+     * <li>both disk-space metrics equal the live sstables' total size;</li>
+     * <li>nothing is compacting.</li>
+     * </ul>
+     */
+    public static void validateCFS(ColumnFamilyStore cfs)
+    {
+        Set<Integer> liveDescriptors = new HashSet<>();
+        long spaceUsed = 0;
+        for (SSTableReader sstable : cfs.getLiveSSTables())
+        {
+            assertFalse(sstable.isMarkedCompacted());
+            assertEquals(1, sstable.selfRef().globalCount());
+            liveDescriptors.add(sstable.descriptor.generation);
+            spaceUsed += sstable.bytesOnDisk();
+        }
+        for (File dir : cfs.getDirectories().getCFDirectories())
+        {
+            File[] files = dir.listFiles();
+            // listFiles() returns null if the directory cannot be read
+            assertNotNull("could not list " + dir, files);
+            for (File f : files)
+            {
+                if (f.getName().contains("Data"))
+                {
+                    Descriptor d = Descriptor.fromFilename(f.getAbsolutePath());
+                    assertTrue(d.toString(), liveDescriptors.contains(d.generation));
+                }
+            }
+        }
+        assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
+        assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
+        assertTrue(cfs.getTracker().getCompacting().isEmpty());
+    }
+
+    /**
+     * Creates a writer for a new sstable of {@code cfs} in {@code directory}, registered with
+     * {@code txn}. The header carries no encoding stats.
+     * NOTE(review): the two {@code 0} arguments are presumably the estimated key count and
+     * repairedAt -- confirm against {@code SSTableWriter.create}.
+     */
+    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
+    {
+        String filename = cfs.getSSTablePath(directory);
+        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
+    }
+
+    /**
+     * Returns a buffer of {@code size + 4} random bytes. The first four bytes are overwritten with
+     * {@code i} (big-endian, the ByteBuffer default), so distinct {@code i} values yield distinct keys.
+     */
+    public static ByteBuffer random(int i, int size)
+    {
+        byte[] bytes = new byte[size + 4];
+        ThreadLocalRandom.current().nextBytes(bytes);
+        ByteBuffer r = ByteBuffer.wrap(bytes);
+        r.putInt(0, i);
+        return r;
+    }
+
+    /**
+     * Counts the Data files in a directory listing and asserts that no temporary files are present.
+     *
+     * @param files entry names as returned by {@code File.list()}; a {@code null} listing fails the check
+     * @return the number of entries whose name contains "Data" and that are neither "-CRC.db" files
+     *         nor "tmplink-"/"tmp-" temporary files
+     * @throws AssertionError if the listing is {@code null} or contains "tmplink-" or "tmp-" entries
+     */
+    public static int assertFileCounts(String[] files)
+    {
+        // File.list() returns null when the directory cannot be read; fail with a clear message
+        // instead of an anonymous NullPointerException in the loop below
+        if (files == null)
+            throw new AssertionError("directory listing failed (File.list() returned null)");
+
+        int tmplinkcount = 0;
+        int tmpcount = 0;
+        int datacount = 0;
+        for (String f : files)
+        {
+            if (f.endsWith("-CRC.db"))
+                continue;
+            if (f.contains("tmplink-"))
+                tmplinkcount++;
+            else if (f.contains("tmp-"))
+                tmpcount++;
+            else if (f.contains("Data"))
+                datacount++;
+        }
+        // include the listing so a failure shows which leftover files were found
+        String listing = String.join(", ", files);
+        assertEquals("unexpected tmplink files: " + listing, 0, tmplinkcount);
+        assertEquals("unexpected tmp files: " + listing, 0, tmpcount);
+        return datacount;
+    }
+}