You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/12/06 11:21:47 UTC

[03/10] cassandra git commit: Make sure sstables only get committed when it's safe to discard commit log records

Make sure sstables only get committed when it's safe to discard commit log records

Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f90e55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f90e55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f90e55e

Branch: refs/heads/cassandra-3.X
Commit: 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537
Parents: 5f64ed7
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:10:31 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 77 +++++++++++++----
 src/java/org/apache/cassandra/db/Memtable.java  | 81 ++++++++----------
 .../miscellaneous/ColumnFamilyStoreTest.java    | 90 ++++++++++++++++++++
 4 files changed, 186 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..113e10d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -81,6 +82,7 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -124,7 +126,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+    @VisibleForTesting
+    public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                                           StageManager.KEEPALIVE,
                                                                                           TimeUnit.SECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
@@ -921,8 +924,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
-        final CountDownLatch latch = new CountDownLatch(1);
-        volatile FSWriteError flushFailure = null;
+        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
+        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
+        volatile Throwable flushFailure = null;
         final List<Memtable> memtables;
 
         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -943,15 +947,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
              * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
              * with CL as we do with memtables/CFS-backed SecondaryIndexes.
              */
-
-            if (flushSecondaryIndexes)
-                indexManager.flushAllNonCFSBackedIndexesBlocking();
+            try
+            {
+                if (flushSecondaryIndexes)
+                {
+                    indexManager.flushAllNonCFSBackedIndexesBlocking();
+                }
+            }
+            catch (Throwable e)
+            {
+                flushFailure = merge(flushFailure, e);
+            }
+            finally
+            {
+                secondaryIndexFlushLatch.countDown();
+            }
 
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
                 // on this task can rely on all prior flushes being complete
-                latch.await();
+                memtablesFlushLatch.await();
             }
             catch (InterruptedException e)
             {
@@ -970,7 +986,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                throw flushFailure;
+                Throwables.propagate(flushFailure);
 
             return commitLogUpperBound;
         }
@@ -1048,15 +1064,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             try
             {
                 for (Memtable memtable : memtables)
-                {
-                    Collection<SSTableReader> readers = Collections.emptyList();
-                    if (!memtable.isClean() && !truncate)
-                        readers = memtable.flush();
-                    memtable.cfs.replaceFlushed(memtable, readers);
-                    reclaim(memtable);
-                }
+                    flushMemtable(memtable);
             }
-            catch (FSWriteError e)
+            catch (Throwable e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1064,7 +1074,40 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.latch.countDown();
+            postFlush.memtablesFlushLatch.countDown();
+        }
+
+        public Collection<SSTableReader> flushMemtable(Memtable memtable)
+        {
+            if (memtable.isClean() || truncate)
+            {
+                memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
+                reclaim(memtable);
+                return Collections.emptyList();
+            }
+
+            Collection<SSTableReader> readers = Collections.emptyList();
+            try (SSTableTxnWriter writer = memtable.flush())
+            {
+                try
+                {
+                    postFlush.secondaryIndexFlushLatch.await();
+                }
+                catch (InterruptedException e)
+                {
+                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
+                }
+
+                if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
+                    // sstables should contain non-repaired data.
+                    readers = writer.finish(true);
+                else
+                    maybeFail(writer.abort(postFlush.flushFailure));
+            }
+
+            memtable.cfs.replaceFlushed(memtable, readers);
+            reclaim(memtable);
+            return readers;
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1a7d6cb..6404b37 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -317,7 +318,7 @@ public class Memtable implements Comparable<Memtable>
         return partitions.get(key);
     }
 
-    public Collection<SSTableReader> flush()
+    public SSTableTxnWriter flush()
     {
         long estimatedSize = estimatedSize();
         Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -357,64 +358,52 @@ public class Memtable implements Comparable<Memtable>
                        * 1.2); // bloom filter and row index overhead
     }
 
-    private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
+    private SSTableTxnWriter writeSortedContents(File sstableDirectory)
     {
         boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 
         logger.debug("Writing {}", Memtable.this.toString());
 
-        Collection<SSTableReader> ssTables;
-        try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+        SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
+        boolean trackContention = logger.isTraceEnabled();
+        int heavilyContendedRowCount = 0;
+        // (we can't clear out the map as-we-go to free up memory,
+        //  since the memtable is being used for queries in the "pending flush" category)
+        for (AtomicBTreePartition partition : partitions.values())
         {
-            boolean trackContention = logger.isTraceEnabled();
-            int heavilyContendedRowCount = 0;
-            // (we can't clear out the map as-we-go to free up memory,
-            //  since the memtable is being used for queries in the "pending flush" category)
-            for (AtomicBTreePartition partition : partitions.values())
+            // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+            // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+            // we don't need to preserve tombstones for repair. So if both operation are in this
+            // memtable (which will almost always be the case if there is no ongoing failure), we can
+            // just skip the entry (CASSANDRA-4667).
+            if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                continue;
+
+            if (trackContention && partition.usePessimisticLocking())
+                heavilyContendedRowCount++;
+
+            if (!partition.isEmpty())
             {
-                // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
-                // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
-                // we don't need to preserve tombstones for repair. So if both operation are in this
-                // memtable (which will almost always be the case if there is no ongoing failure), we can
-                // just skip the entry (CASSANDRA-4667).
-                if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
-                    continue;
-
-                if (trackContention && partition.usePessimisticLocking())
-                    heavilyContendedRowCount++;
-
-                if (!partition.isEmpty())
+                try (UnfilteredRowIterator iter = partition.unfilteredIterator())
                 {
-                    try (UnfilteredRowIterator iter = partition.unfilteredIterator())
-                    {
-                        writer.append(iter);
-                    }
+                    writer.append(iter);
                 }
             }
+        }
 
-            if (writer.getFilePointer() > 0)
-            {
-                logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                           writer.getFilename(),
-                                           FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                           commitLogUpperBound));
-
-                // sstables should contain non-repaired data.
-                ssTables = writer.finish(true);
-            }
-            else
-            {
-                logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
-                             writer.getFilename(), commitLogUpperBound);
-                writer.abort();
-                ssTables = Collections.emptyList();
-            }
+        if (writer.getFilePointer() > 0)
+            logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                       writer.getFilename(),
+                                       FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                       commitLogUpperBound));
+        else
+            logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
+                         writer.getFilename(), commitLogUpperBound);
 
-            if (heavilyContendedRowCount > 0)
-                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+        if (heavilyContendedRowCount > 0)
+            logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
 
-            return ssTables;
-        }
+        return writer;
     }
 
     @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
new file mode 100644
index 0000000..1285392
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.validation.miscellaneous;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnFamilyStoreTest extends CQLTester
+{
+    @Test
+    public void testFailing2iFlush() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+        try
+        {
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+        catch (Throwable t)
+        {
+            // ignore
+        }
+
+        // Make sure there's no flush running
+        waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
+                TimeUnit.SECONDS.toMillis(5));
+
+        // SSTables remain uncommitted.
+        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+    }
+
+    public void waitFor(Supplier<Boolean> condition, long timeout)
+    {
+        long start = System.currentTimeMillis();
+        while(true)
+        {
+            if (condition.get())
+                return;
+
+            assertTrue("Timeout ocurred while waiting for condition",
+                       System.currentTimeMillis() - start < timeout);
+        }
+    }
+
+    // Used for index creation above
+    public static class BrokenCustom2I extends StubIndex
+    {
+        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public Callable<?> getBlockingFlushTask()
+        {
+            throw new RuntimeException();
+        }
+    }
+}