You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/12/10 14:52:02 UTC

cassandra git commit: Remove tmplink files for offline compactions

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 d69728f8a -> 29259cb22


Remove tmplink files for offline compactions

Patch by marcuse; reviewed by jmckenzie for CASSANDRA-8321


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/29259cb2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/29259cb2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/29259cb2

Branch: refs/heads/cassandra-2.1
Commit: 29259cb22c2ba02d5c2beba6c6512173f8b5b3f9
Parents: d69728f
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Nov 25 11:12:20 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 10 14:46:44 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/io/sstable/SSTableRewriter.java   | 31 +++++--
 .../io/sstable/SSTableRewriterTest.java         | 91 +++++++++++---------
 3 files changed, 79 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/29259cb2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3545afc..2e74a15 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Remove tmplink files for offline compactions (CASSANDRA-8321)
  * Reduce maxHintsInProgress (CASSANDRA-8415)
  * BTree updates may call provided update function twice (CASSANDRA-8018)
  * Release sstable references after anticompaction (CASSANDRA-8386)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29259cb2/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index d187e9d..f9d2fe4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -190,9 +190,15 @@ public class SSTableRewriter
 
         for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
         {
-        // we should close the bloom filter if we have not opened an sstable reader from this
-        // writer (it will get closed when we release the sstable reference below):
+            // we should close the bloom filter if we have not opened an sstable reader from this
+            // writer (it will get closed when we release the sstable reference below):
             w.left.abort(w.right == null);
+            if (isOffline && w.right != null)
+            {
+                // the pairs get removed from finishedWriters when they are closedAndOpened in finish(), the ones left need to be removed here:
+                w.right.markObsolete();
+                w.right.releaseReference();
+            }
         }
 
         // also remove already completed SSTables
@@ -344,7 +350,15 @@ public class SSTableRewriter
                 finished.add(newReader);
 
                 if (w.right != null)
+                {
                     w.right.sharesBfWith(newReader);
+                    if (isOffline)
+                    {
+                        // remove the tmplink files if we are offline - no one is using them
+                        w.right.markObsolete();
+                        w.right.releaseReference();
+                    }
+                }
                 // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
                 toReplace.add(Pair.create(w.right, newReader));
             }
@@ -356,11 +370,10 @@ public class SSTableRewriter
             it.remove();
         }
 
-        for (Pair<SSTableReader, SSTableReader> replace : toReplace)
-            replaceEarlyOpenedFile(replace.left, replace.right);
-
         if (!isOffline)
         {
+            for (Pair<SSTableReader, SSTableReader> replace : toReplace)
+                replaceEarlyOpenedFile(replace.left, replace.right);
             dataTracker.unmarkCompacting(finished);
         }
         return finished;
@@ -382,8 +395,16 @@ public class SSTableRewriter
             {
                 SSTableReader newReader = w.left.closeAndOpenReader(maxAge);
                 finished.add(newReader);
+
                 if (w.right != null)
+                {
                     w.right.sharesBfWith(newReader);
+                    if (isOffline)
+                    {
+                        w.right.markObsolete();
+                        w.right.releaseReference();
+                    }
+                }
                 // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
                 toReplace.add(Pair.create(w.right, newReader));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/29259cb2/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 0a76b66..c0a017e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.db.compaction.ICompactionScanner;
 import org.apache.cassandra.db.compaction.LazilyCompactedRow;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.compaction.SSTableSplitter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.StorageService;
@@ -457,61 +458,59 @@ public class SSTableRewriterTest extends SchemaLoader
         validateCFS(cfs);
     }
     @Test
-    public void testAbort() throws Exception
+    public void testSSTableSplit() throws InterruptedException
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
         cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
         SSTableReader s = writeFile(cfs, 1000);
-        cfs.addSSTable(s);
-        Set<SSTableReader> compacting = Sets.newHashSet(s);
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-        SSTableWriter w = getWriter(cfs, s.descriptor.directory);
-        rewriter.switchWriter(w);
-        try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
-             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        cfs.getDataTracker().markCompacting(Arrays.asList(s));
+        SSTableSplitter splitter = new SSTableSplitter(cfs, s, 10);
+        splitter.split();
+        Thread.sleep(1000);
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        for (File f : s.descriptor.directory.listFiles())
         {
-            while (scanner.hasNext())
-            {
-                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
-                {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                }
-            }
-            try
-            {
-                rewriter.finishAndThrow(false);
-            }
-            catch (Throwable t)
-            {
-                rewriter.abort();
-            }
+            // we need to clear out the data dir, otherwise tests running after this will break
+            f.delete();
         }
-        Thread.sleep(1000);
-        int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        assertEquals(filecount, 1);
-        assertEquals(1, cfs.getSSTables().size());
-        validateCFS(cfs);
-        cfs.truncateBlocking();
-        Thread.sleep(1000);
-        filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        assertEquals(0, filecount);
+    }
 
+    @Test
+    public void testOfflineAbort() throws Exception
+    {
+        testAbortHelper(true, true);
+    }
+    @Test
+    public void testOfflineAbort2() throws Exception
+    {
+        testAbortHelper(false, true);
+    }
+
+    @Test
+    public void testAbort() throws Exception
+    {
+        testAbortHelper(false, false);
     }
 
     @Test
     public void testAbort2() throws Exception
     {
+        testAbortHelper(true, false);
+    }
+
+    private void testAbortHelper(boolean earlyException, boolean offline) throws Exception
+    {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
         cfs.truncateBlocking();
         SSTableReader s = writeFile(cfs, 1000);
-        cfs.addSSTable(s);
+        if (!offline)
+            cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline);
         SSTableWriter w = getWriter(cfs, s.descriptor.directory);
         rewriter.switchWriter(w);
         try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
@@ -527,7 +526,7 @@ public class SSTableRewriterTest extends SchemaLoader
             }
             try
             {
-                rewriter.finishAndThrow(true);
+                rewriter.finishAndThrow(earlyException);
             }
             catch (Throwable t)
             {
@@ -537,11 +536,25 @@ public class SSTableRewriterTest extends SchemaLoader
         Thread.sleep(1000);
         int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
         assertEquals(filecount, 1);
-        assertEquals(1, cfs.getSSTables().size());
-        validateCFS(cfs);
+        if (!offline)
+        {
+            assertEquals(1, cfs.getSSTables().size());
+            validateCFS(cfs);
+        }
         cfs.truncateBlocking();
         Thread.sleep(1000);
         filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        if (offline)
+        {
+            // the file is not added to the CFS, therefore it is not truncated away above
+            assertEquals(1, filecount);
+            for (File f : s.descriptor.directory.listFiles())
+            {
+                f.delete();
+            }
+            filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        }
+
         assertEquals(0, filecount);
 
     }