You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/12/06 12:31:29 UTC

[04/12] cassandra git commit: Make sure sstables only get committed when it's safe to discard commit log records

Make sure sstables only get committed when it's safe to discard commit log records

Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ecef315
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ecef315
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ecef315

Branch: refs/heads/cassandra-3.11
Commit: 0ecef31548c287ac2d9f818413457bc947362733
Parents: d2ba715
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:10:00 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 45 +++++++++-----------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++
 3 files changed, 58 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..71e1653 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -919,34 +919,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     private final class PostFlush implements Callable<ReplayPosition>
     {
-        final boolean flushSecondaryIndexes;
-        final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
-        volatile FSWriteError flushFailure = null;
+        volatile Throwable flushFailure = null;
         final List<Memtable> memtables;
 
-        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
-                          List<Memtable> memtables)
+        private PostFlush(List<Memtable> memtables)
         {
-            this.writeBarrier = writeBarrier;
-            this.flushSecondaryIndexes = flushSecondaryIndexes;
             this.memtables = memtables;
         }
 
         public ReplayPosition call()
         {
-            writeBarrier.await();
-
-            /**
-             * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
-             * flushed memtables and CL position, which is as good as we can guarantee.
-             * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
-             * with CL as we do with memtables/CFS-backed SecondaryIndexes.
-             */
-
-            if (flushSecondaryIndexes)
-                indexManager.flushAllNonCFSBackedIndexesBlocking();
-
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@ -970,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                throw flushFailure;
+                Throwables.propagate(flushFailure);
 
             return commitLogUpperBound;
         }
@@ -1029,7 +1012,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
             // replay positions have also completed, i.e. the memtables are done and ready to flush
             writeBarrier.issue();
-            postFlush = new PostFlush(!truncate, writeBarrier, memtables);
+            postFlush = new PostFlush(memtables);
         }
 
         public void run()
@@ -1047,24 +1030,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             try
             {
+                boolean flushNonCf2i = true;
                 for (Memtable memtable : memtables)
                 {
                     Collection<SSTableReader> readers = Collections.emptyList();
                     if (!memtable.isClean() && !truncate)
+                    {
+                        // TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
+                        // with CL as we do with memtables/CFS-backed SecondaryIndexes.
+                        if (flushNonCf2i)
+                        {
+                            indexManager.flushAllNonCFSBackedIndexesBlocking();
+                            flushNonCf2i = false;
+                        }
                         readers = memtable.flush();
+                    }
                     memtable.cfs.replaceFlushed(memtable, readers);
                     reclaim(memtable);
                 }
             }
-            catch (FSWriteError e)
+            catch (Throwable e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
                 postFlush.flushFailure = e;
             }
-
-            // signal the post-flush we've done our work
-            postFlush.latch.countDown();
+            finally
+            {
+                // signal the post-flush we've done our work
+                postFlush.latch.countDown();
+            }
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index b8e4185..6930d13 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -624,6 +624,43 @@ public class CustomIndexTest extends CQLTester
         assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
     }
 
+    @Test
+    public void testFailing2iFlush() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+        try
+        {
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+            fail("Flush should have thrown an exception.");
+        }
+        catch (Throwable t)
+        {
+            assertTrue(t.getMessage().contains("Broken2I"));
+        }
+
+        // SSTables remain uncommitted.
+        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+    }
+
+    // Used for index creation above
+    public static class BrokenCustom2I extends StubIndex
+    {
+        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public Callable<?> getBlockingFlushTask()
+        {
+            throw new RuntimeException("Broken2I");
+        }
+    }
+
     private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
     {
         createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",