You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/12/10 14:52:02 UTC
cassandra git commit: Remove tmplink files for offline compactions
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 d69728f8a -> 29259cb22
Remove tmplink files for offline compactions
Patch by marcuse; reviewed by jmckenzie for CASSANDRA-8321
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29259cb2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29259cb2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29259cb2
Branch: refs/heads/cassandra-2.1
Commit: 29259cb22c2ba02d5c2beba6c6512173f8b5b3f9
Parents: d69728f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Nov 25 11:12:20 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 10 14:46:44 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/io/sstable/SSTableRewriter.java | 31 +++++--
.../io/sstable/SSTableRewriterTest.java | 91 +++++++++++---------
3 files changed, 79 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29259cb2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3545afc..2e74a15 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.3
+ * Remove tmplink files for offline compactions (CASSANDRA-8321)
* Reduce maxHintsInProgress (CASSANDRA-8415)
* BTree updates may call provided update function twice (CASSANDRA-8018)
* Release sstable references after anticompaction (CASSANDRA-8386)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29259cb2/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index d187e9d..f9d2fe4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -190,9 +190,15 @@ public class SSTableRewriter
for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
{
- // we should close the bloom filter if we have not opened an sstable reader from this
- // writer (it will get closed when we release the sstable reference below):
+ // we should close the bloom filter if we have not opened an sstable reader from this
+ // writer (it will get closed when we release the sstable reference below):
w.left.abort(w.right == null);
+ if (isOffline && w.right != null)
+ {
+ // the pairs get removed from finishedWriters when they are closedAndOpened in finish(), the ones left need to be removed here:
+ w.right.markObsolete();
+ w.right.releaseReference();
+ }
}
// also remove already completed SSTables
@@ -344,7 +350,15 @@ public class SSTableRewriter
finished.add(newReader);
if (w.right != null)
+ {
w.right.sharesBfWith(newReader);
+ if (isOffline)
+ {
+ // remove the tmplink files if we are offline - no one is using them
+ w.right.markObsolete();
+ w.right.releaseReference();
+ }
+ }
// w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
toReplace.add(Pair.create(w.right, newReader));
}
@@ -356,11 +370,10 @@ public class SSTableRewriter
it.remove();
}
- for (Pair<SSTableReader, SSTableReader> replace : toReplace)
- replaceEarlyOpenedFile(replace.left, replace.right);
-
if (!isOffline)
{
+ for (Pair<SSTableReader, SSTableReader> replace : toReplace)
+ replaceEarlyOpenedFile(replace.left, replace.right);
dataTracker.unmarkCompacting(finished);
}
return finished;
@@ -382,8 +395,16 @@ public class SSTableRewriter
{
SSTableReader newReader = w.left.closeAndOpenReader(maxAge);
finished.add(newReader);
+
if (w.right != null)
+ {
w.right.sharesBfWith(newReader);
+ if (isOffline)
+ {
+ w.right.markObsolete();
+ w.right.releaseReference();
+ }
+ }
// w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
toReplace.add(Pair.create(w.right, newReader));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/29259cb2/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 0a76b66..c0a017e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.db.compaction.CompactionController;
import org.apache.cassandra.db.compaction.ICompactionScanner;
import org.apache.cassandra.db.compaction.LazilyCompactedRow;
import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.compaction.SSTableSplitter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.metrics.StorageMetrics;
import org.apache.cassandra.service.StorageService;
@@ -457,61 +458,59 @@ public class SSTableRewriterTest extends SchemaLoader
validateCFS(cfs);
}
@Test
- public void testAbort() throws Exception
+ public void testSSTableSplit() throws InterruptedException
{
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
SSTableReader s = writeFile(cfs, 1000);
- cfs.addSSTable(s);
- Set<SSTableReader> compacting = Sets.newHashSet(s);
- SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
- SSTableWriter w = getWriter(cfs, s.descriptor.directory);
- rewriter.switchWriter(w);
- try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
- CompactionController controller = new CompactionController(cfs, compacting, 0))
+ cfs.getDataTracker().markCompacting(Arrays.asList(s));
+ SSTableSplitter splitter = new SSTableSplitter(cfs, s, 10);
+ splitter.split();
+ Thread.sleep(1000);
+ assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ for (File f : s.descriptor.directory.listFiles())
{
- while (scanner.hasNext())
- {
- rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
- if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
- {
- rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
- }
- }
- try
- {
- rewriter.finishAndThrow(false);
- }
- catch (Throwable t)
- {
- rewriter.abort();
- }
+ // we need to clear out the data dir, otherwise tests running after this break
+ f.delete();
}
- Thread.sleep(1000);
- int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
- assertEquals(filecount, 1);
- assertEquals(1, cfs.getSSTables().size());
- validateCFS(cfs);
- cfs.truncateBlocking();
- Thread.sleep(1000);
- filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
- assertEquals(0, filecount);
+ }
+ @Test
+ public void testOfflineAbort() throws Exception
+ {
+ testAbortHelper(true, true);
+ }
+ @Test
+ public void testOfflineAbort2() throws Exception
+ {
+ testAbortHelper(false, true);
+ }
+
+ @Test
+ public void testAbort() throws Exception
+ {
+ testAbortHelper(false, false);
}
@Test
public void testAbort2() throws Exception
{
+ testAbortHelper(true, false);
+ }
+
+ private void testAbortHelper(boolean earlyException, boolean offline) throws Exception
+ {
Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
cfs.truncateBlocking();
SSTableReader s = writeFile(cfs, 1000);
- cfs.addSSTable(s);
+ if (!offline)
+ cfs.addSSTable(s);
Set<SSTableReader> compacting = Sets.newHashSet(s);
SSTableRewriter.overrideOpenInterval(10000000);
- SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+ SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline);
SSTableWriter w = getWriter(cfs, s.descriptor.directory);
rewriter.switchWriter(w);
try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
@@ -527,7 +526,7 @@ public class SSTableRewriterTest extends SchemaLoader
}
try
{
- rewriter.finishAndThrow(true);
+ rewriter.finishAndThrow(earlyException);
}
catch (Throwable t)
{
@@ -537,11 +536,25 @@ public class SSTableRewriterTest extends SchemaLoader
Thread.sleep(1000);
int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
assertEquals(filecount, 1);
- assertEquals(1, cfs.getSSTables().size());
- validateCFS(cfs);
+ if (!offline)
+ {
+ assertEquals(1, cfs.getSSTables().size());
+ validateCFS(cfs);
+ }
cfs.truncateBlocking();
Thread.sleep(1000);
filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ if (offline)
+ {
+ // the file is not added to the CFS, therefore not truncated away above
+ assertEquals(1, filecount);
+ for (File f : s.descriptor.directory.listFiles())
+ {
+ f.delete();
+ }
+ filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+ }
+
assertEquals(0, filecount);
}