Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/11/26 16:22:08 UTC

cassandra git commit: Handle abort() properly in SSTableRewriter

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 3faff8b15 -> b10629291


Handle abort() properly in SSTableRewriter

Patch by marcuse; reviewed by jmckenzie for CASSANDRA-8320


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1062929
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1062929
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1062929

Branch: refs/heads/cassandra-2.1
Commit: b1062929185690567e4567e0e657b361c5105482
Parents: 3faff8b
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Nov 18 07:07:30 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Nov 26 16:00:51 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/io/sstable/SSTableReader.java     |  22 +++
 .../cassandra/io/sstable/SSTableRewriter.java   |  74 ++++++--
 .../io/sstable/SSTableRewriterTest.java         | 180 +++++++++++++++----
 4 files changed, 226 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
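
Summary of the change: abort() used to return immediately when no writer
was active, which could leak readers that finish() had already opened
before failing. It now always reverts the moved start keys, marks both
the early-opened and the finished readers obsolete, and releases their
references. To support this, finish() records the readers it produces in
a new instance field (finished) and drains finishedWriters as it goes,
and SSTableReader gains sharesBfWith() so that a tmplink reader and the
final reader sharing a bloom filter are not closed twice. The caller-side
contract, modeled on the new testAbort() below, looks roughly like this
(a sketch; it assumes the same imports and helpers, such as writeFile()
and getWriter(), as SSTableRewriterTest):

    SSTableReader s = writeFile(cfs, 1000);
    cfs.addSSTable(s);
    Set<SSTableReader> compacting = Sets.newHashSet(s);
    SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
    try (ICompactionScanner scanner = s.getScanner();
         CompactionController controller = new CompactionController(cfs, compacting, 0))
    {
        while (scanner.hasNext())
            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
        rewriter.finish(); // may fail after some sstables were already opened
    }
    catch (Throwable t)
    {
        // before this patch abort() returned early when writer == null and
        // leaked the already-finished readers; now it marks them obsolete
        // and releases their references
        rewriter.abort();
    }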


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1062929/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f022b19..e5f7c28 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Handle abort() in SSTableRewriter properly (CASSANDRA-8320)
  * Fix high size calculations for prepared statements (CASSANDRA-8231)
  * Centralize shared executors (CASSANDRA-8055)
  * Fix filtering for CONTAINS (KEY) relations on frozen collection

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1062929/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index a3e3cf5..1fe4330 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -202,6 +202,7 @@ public class SSTableReader extends SSTable
     private Object replaceLock = new Object();
     private SSTableReader replacedBy;
     private SSTableReader replaces;
+    private SSTableReader sharesBfWith;
     private SSTableDeletingTask deletingTask;
     private Runnable runOnClose;
 
@@ -594,6 +595,14 @@ public class SSTableReader extends SSTable
                 deleteFiles &= !dfile.path.equals(replaces.dfile.path);
             }
 
+            if (sharesBfWith != null)
+            {
+                closeBf &= sharesBfWith.bf != bf;
+                closeSummary &= sharesBfWith.indexSummary != indexSummary;
+                closeFiles &= sharesBfWith.dfile != dfile;
+                deleteFiles &= !dfile.path.equals(sharesBfWith.dfile.path);
+            }
+
             boolean deleteAll = false;
             if (release && isCompacted.get())
             {
@@ -928,6 +937,19 @@ public class SSTableReader extends SSTable
         }
     }
 
+    /**
+     * This is used to avoid closing the bloom filter multiple times when finishing an SSTableRewriter.
+     *
+     * Note that the reason we don't use replacedBy is that this reader has not actually been replaced yet.
+     *
+     * @param newReader the finished reader that shares components with this early-opened one
+     */
+    public void sharesBfWith(SSTableReader newReader)
+    {
+        assert openReason.equals(OpenReason.EARLY);
+        this.sharesBfWith = newReader;
+    }
+
     public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
     {
         synchronized (replaceLock)
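
The sharesBfWith() guard added above follows one rule: before tearing a
component down, check by reference identity whether the reader we share
with owns the very same instance, and skip the close if it does. The same
idea in isolation (a sketch with illustrative Resource/Reader names, not
Cassandra API):

    interface Resource { void close(); }

    final class Reader
    {
        private final Resource bf;   // bloom filter, possibly shared
        private Reader sharesBfWith; // used instead of replacedBy, since
                                     // this reader is not yet replaced

        Reader(Resource bf) { this.bf = bf; }

        void sharesBfWith(Reader newReader) { this.sharesBfWith = newReader; }

        void close()
        {
            boolean closeBf = true;
            if (sharesBfWith != null)
                closeBf &= sharesBfWith.bf != bf; // identity check, not equals()
            if (closeBf)
                bf.close();
        }
    }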

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1062929/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 4d5a06f..d187e9d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -75,6 +76,7 @@ public class SSTableRewriter
     private final ColumnFamilyStore cfs;
 
     private final long maxAge;
+    private final List<SSTableReader> finished = new ArrayList<>();
     private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
     private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting
     private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting
@@ -180,16 +182,11 @@ public class SSTableRewriter
 
     public void abort()
     {
-        if (writer == null)
-            return;
-
         switchWriter(null);
 
         moveStarts(null, Functions.forMap(originalStarts), true);
 
         List<SSTableReader> close = Lists.newArrayList(finishedOpenedEarly);
-        if (currentlyOpenedEarly != null)
-            close.add(currentlyOpenedEarly);
 
         for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
         {
@@ -202,6 +199,12 @@ public class SSTableRewriter
         for (SSTableReader sstable : close)
             sstable.markObsolete();
 
+        for (SSTableReader sstable : finished)
+        {
+            sstable.markObsolete();
+            sstable.releaseReference();
+        }
+
         // releases reference in replaceReaders
         if (!isOffline)
         {
@@ -210,6 +213,7 @@ public class SSTableRewriter
         }
     }
 
+
     /**
      * Replace the readers we are rewriting with cloneWithNewStart, reclaiming any page cache that is no longer
      * needed, and transferring any key cache entries over to the new reader, expiring them from the old. if reset
@@ -327,38 +331,70 @@ public class SSTableRewriter
      */
     public List<SSTableReader> finish(long repairedAt)
     {
-        List<SSTableReader> finished = new ArrayList<>();
-        if (writer.getFilePointer() > 0)
-        {
-            SSTableReader reader = repairedAt < 0 ? writer.closeAndOpenReader(maxAge) : writer.closeAndOpenReader(maxAge, repairedAt);
-            finished.add(reader);
-            replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
-            moveStarts(reader, Functions.constant(reader.last), false);
-        }
-        else
-        {
-            writer.abort(true);
-        }
+        List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>();
+        switchWriter(null);
         // make real sstables of the written ones:
-        for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
+        Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator();
+        while(it.hasNext())
         {
+            Pair<SSTableWriter, SSTableReader> w = it.next();
             if (w.left.getFilePointer() > 0)
             {
                 SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
                 finished.add(newReader);
+
+                if (w.right != null)
+                    w.right.sharesBfWith(newReader);
                 // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
-                replaceEarlyOpenedFile(w.right, newReader);
+                toReplace.add(Pair.create(w.right, newReader));
             }
             else
             {
                 assert w.right == null;
                 w.left.abort(true);
             }
+            it.remove();
         }
+
+        for (Pair<SSTableReader, SSTableReader> replace : toReplace)
+            replaceEarlyOpenedFile(replace.left, replace.right);
+
         if (!isOffline)
         {
             dataTracker.unmarkCompacting(finished);
         }
         return finished;
     }
+
+    @VisibleForTesting
+    void finishAndThrow(boolean early)
+    {
+        List<Pair<SSTableReader, SSTableReader>> toReplace = new ArrayList<>();
+        switchWriter(null);
+        if (early)
+            throw new RuntimeException("exception thrown early in finish");
+        // make real sstables of the written ones:
+        Iterator<Pair<SSTableWriter, SSTableReader>> it = finishedWriters.iterator();
+        while(it.hasNext())
+        {
+            Pair<SSTableWriter, SSTableReader> w = it.next();
+            if (w.left.getFilePointer() > 0)
+            {
+                SSTableReader newReader = w.left.closeAndOpenReader(maxAge);
+                finished.add(newReader);
+                if (w.right != null)
+                    w.right.sharesBfWith(newReader);
+                // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
+                toReplace.add(Pair.create(w.right, newReader));
+            }
+            else
+            {
+                assert w.right == null;
+                w.left.abort(true);
+            }
+            it.remove();
+        }
+
+        throw new RuntimeException("exception thrown after all sstables finished");
+    }
 }
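
Two details in the rewritten finish() carry most of the fix. First,
writers are drained from finishedWriters through an explicit Iterator
with it.remove(), so each writer is handled exactly once: if
closeAndOpenReader() throws midway, the writers already turned into
readers are gone from the list, and a later abort() only aborts the ones
never finished (the opened readers remain reachable through the finished
field and are released there). Second, replaceEarlyOpenedFile() is
deferred until every writer has been closed. The drain pattern in
isolation (a sketch; Job, Drain and complete() are hypothetical names):

    import java.util.Iterator;
    import java.util.List;

    interface Job { void complete(); }

    final class Drain
    {
        // remove each element as soon as it has been processed, so a
        // cleanup pass after a mid-loop failure sees only unprocessed
        // elements (a failing element stays in the list)
        static void drainAll(List<Job> pending)
        {
            Iterator<Job> it = pending.iterator();
            while (it.hasNext())
            {
                Job job = it.next();
                job.complete(); // may throw; it.remove() is then skipped
                it.remove();
            }
        }
    }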

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1062929/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 8a494a6..0a76b66 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -29,9 +30,11 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ArrayBackedSortedColumns;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
@@ -80,12 +83,45 @@ public class SSTableRewriterTest extends SchemaLoader
                 writer.append(row);
             }
         }
-        cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, writer.finish(), OperationType.COMPACTION);
-
+        Collection<SSTableReader> newsstables = writer.finish();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
+        Thread.sleep(100);
         validateCFS(cfs);
+        int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
+        assertEquals(1, filecounts);
 
     }
+    @Test
+    public void basicTest2() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
 
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+        assertEquals(1, sstables.size());
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+        {
+            ICompactionScanner scanner = scanners.scanners.get(0);
+            CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+            while (scanner.hasNext())
+            {
+                AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+                writer.append(row);
+            }
+        }
+        Collection<SSTableReader> newsstables = writer.finish();
+        cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
+        Thread.sleep(100);
+        validateCFS(cfs);
+        int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
+        assertEquals(1, filecounts);
+    }
 
     @Test
     public void testFileRemoval() throws InterruptedException
@@ -114,37 +150,11 @@ public class SSTableRewriterTest extends SchemaLoader
         assertFileCounts(dir.list(), 0, 3);
         writer.abort(false);
         Thread.sleep(1000);
-        assertFileCounts(dir.list(), 0, 0);
-        validateCFS(cfs);
-    }
-
-    @Test
-    public void testFileRemovalNoAbort() throws InterruptedException
-    {
-        Keyspace keyspace = Keyspace.open(KEYSPACE);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
-        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
-        for (int i = 0; i < 1000; i++)
-            cf.addColumn(Util.column(String.valueOf(i), "a", 1));
-        File dir = cfs.directories.getDirectoryForNewSSTables();
-        SSTableWriter writer = getWriter(cfs, dir);
-
-        for (int i = 0; i < 500; i++)
-            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
-        SSTableReader s = writer.openEarly(1000);
-        //assertFileCounts(dir.list(), 2, 3);
-        for (int i = 500; i < 1000; i++)
-            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
-        writer.closeAndOpenReader();
-        s.markObsolete();
-        s.releaseReference();
-        Thread.sleep(1000);
-        assertFileCounts(dir.list(), 0, 0);
+        int datafiles = assertFileCounts(dir.list(), 0, 0);
+        assertEquals(datafiles, 0);
         validateCFS(cfs);
     }
 
-
     @Test
     public void testNumberOfFilesAndSizes() throws Exception
     {
@@ -446,6 +456,95 @@ public class SSTableRewriterTest extends SchemaLoader
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
         validateCFS(cfs);
     }
+    @Test
+    public void testAbort() throws Exception
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+        SSTableWriter w = getWriter(cfs, s.descriptor.directory);
+        rewriter.switchWriter(w);
+        try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        {
+            while (scanner.hasNext())
+            {
+                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                }
+            }
+            try
+            {
+                rewriter.finishAndThrow(false);
+            }
+            catch (Throwable t)
+            {
+                rewriter.abort();
+            }
+        }
+        Thread.sleep(1000);
+        int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        assertEquals(filecount, 1);
+        assertEquals(1, cfs.getSSTables().size());
+        validateCFS(cfs);
+        cfs.truncateBlocking();
+        Thread.sleep(1000);
+        filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        assertEquals(0, filecount);
+
+    }
+
+    @Test
+    public void testAbort2() throws Exception
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
+        SSTableWriter w = getWriter(cfs, s.descriptor.directory);
+        rewriter.switchWriter(w);
+        try (ICompactionScanner scanner = compacting.iterator().next().getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        {
+            while (scanner.hasNext())
+            {
+                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                {
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                }
+            }
+            try
+            {
+                rewriter.finishAndThrow(true);
+            }
+            catch (Throwable t)
+            {
+                rewriter.abort();
+            }
+        }
+        Thread.sleep(1000);
+        int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        assertEquals(filecount, 1);
+        assertEquals(1, cfs.getSSTables().size());
+        validateCFS(cfs);
+        cfs.truncateBlocking();
+        Thread.sleep(1000);
+        filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        assertEquals(0, filecount);
+
+    }
 
     private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
     {
@@ -469,28 +568,45 @@ public class SSTableRewriterTest extends SchemaLoader
 
     private void validateCFS(ColumnFamilyStore cfs)
     {
+        Set<Integer> liveDescriptors = new HashSet<>();
         for (SSTableReader sstable : cfs.getSSTables())
         {
             assertFalse(sstable.isMarkedCompacted());
             assertEquals(1, sstable.referenceCount());
+            liveDescriptors.add(sstable.descriptor.generation);
+        }
+        for (File dir : cfs.directories.getCFDirectories())
+        {
+            for (String f : dir.list())
+            {
+                if (f.contains("Data"))
+                {
+                    Descriptor d = Descriptor.fromFilename(f);
+                    assertTrue(d.toString(), liveDescriptors.contains(d.generation));
+                }
+            }
         }
         assertTrue(cfs.getDataTracker().getCompacting().isEmpty());
     }
 
 
-    private void assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
+    private int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
     {
         int tmplinkcount = 0;
         int tmpcount = 0;
+        int datacount = 0;
         for (String f : files)
         {
             if (f.contains("-tmplink-"))
                 tmplinkcount++;
-            if (f.contains("-tmp-"))
+            else if (f.contains("-tmp-"))
                 tmpcount++;
+            else if (f.contains("Data"))
+                datacount++;
         }
         assertEquals(expectedtmplinkCount, tmplinkcount);
         assertEquals(expectedtmpCount, tmpcount);
+        return datacount;
     }
 
     private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)