You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text format.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/12/06 12:31:26 UTC

[01/12] cassandra git commit: Revert "Make sure sstables only get committed when it's safe to discard commit log records"

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 2f268eda3 -> bed3def9a
  refs/heads/cassandra-3.X 5439d94c5 -> f1423806e
  refs/heads/trunk 9a7baa145 -> 48591489d


Revert "Make sure sstables only get committed when it's safe to discard commit log records"

This reverts commit 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537 as it was using a wrong version of the patch.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d2ba715f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d2ba715f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d2ba715f

Branch: refs/heads/cassandra-3.11
Commit: d2ba715f2456e1aa821c01941f90b6a58f54e6c4
Parents: 6f90e55
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:06:48 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:06:48 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../apache/cassandra/db/ColumnFamilyStore.java  | 77 ++++-------------
 src/java/org/apache/cassandra/db/Memtable.java  | 81 ++++++++++--------
 .../miscellaneous/ColumnFamilyStoreTest.java    | 90 --------------------
 4 files changed, 63 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5242adf..5cacdd0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,4 @@
 3.0.11
- * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 113e10d..d2a51a9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,7 +63,6 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -82,7 +81,6 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
-import static org.apache.cassandra.utils.Throwables.merge;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -126,8 +124,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    @VisibleForTesting
-    public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                                           StageManager.KEEPALIVE,
                                                                                           TimeUnit.SECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
@@ -924,9 +921,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
-        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
-        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
-        volatile Throwable flushFailure = null;
+        final CountDownLatch latch = new CountDownLatch(1);
+        volatile FSWriteError flushFailure = null;
         final List<Memtable> memtables;
 
         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -947,27 +943,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
              * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
              * with CL as we do with memtables/CFS-backed SecondaryIndexes.
              */
-            try
-            {
-                if (flushSecondaryIndexes)
-                {
-                    indexManager.flushAllNonCFSBackedIndexesBlocking();
-                }
-            }
-            catch (Throwable e)
-            {
-                flushFailure = merge(flushFailure, e);
-            }
-            finally
-            {
-                secondaryIndexFlushLatch.countDown();
-            }
+
+            if (flushSecondaryIndexes)
+                indexManager.flushAllNonCFSBackedIndexesBlocking();
 
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
                 // on this task can rely on all prior flushes being complete
-                memtablesFlushLatch.await();
+                latch.await();
             }
             catch (InterruptedException e)
             {
@@ -986,7 +970,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                Throwables.propagate(flushFailure);
+                throw flushFailure;
 
             return commitLogUpperBound;
         }
@@ -1064,9 +1048,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             try
             {
                 for (Memtable memtable : memtables)
-                    flushMemtable(memtable);
+                {
+                    Collection<SSTableReader> readers = Collections.emptyList();
+                    if (!memtable.isClean() && !truncate)
+                        readers = memtable.flush();
+                    memtable.cfs.replaceFlushed(memtable, readers);
+                    reclaim(memtable);
+                }
             }
-            catch (Throwable e)
+            catch (FSWriteError e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1074,40 +1064,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.memtablesFlushLatch.countDown();
-        }
-
-        public Collection<SSTableReader> flushMemtable(Memtable memtable)
-        {
-            if (memtable.isClean() || truncate)
-            {
-                memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
-                reclaim(memtable);
-                return Collections.emptyList();
-            }
-
-            Collection<SSTableReader> readers = Collections.emptyList();
-            try (SSTableTxnWriter writer = memtable.flush())
-            {
-                try
-                {
-                    postFlush.secondaryIndexFlushLatch.await();
-                }
-                catch (InterruptedException e)
-                {
-                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
-                }
-
-                if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
-                    // sstables should contain non-repaired data.
-                    readers = writer.finish(true);
-                else
-                    maybeFail(writer.abort(postFlush.flushFailure));
-            }
-
-            memtable.cfs.replaceFlushed(memtable, readers);
-            reclaim(memtable);
-            return readers;
+            postFlush.latch.countDown();
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 6404b37..1a7d6cb 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -318,7 +317,7 @@ public class Memtable implements Comparable<Memtable>
         return partitions.get(key);
     }
 
-    public SSTableTxnWriter flush()
+    public Collection<SSTableReader> flush()
     {
         long estimatedSize = estimatedSize();
         Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -358,52 +357,64 @@ public class Memtable implements Comparable<Memtable>
                        * 1.2); // bloom filter and row index overhead
     }
 
-    private SSTableTxnWriter writeSortedContents(File sstableDirectory)
+    private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
     {
         boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 
         logger.debug("Writing {}", Memtable.this.toString());
 
-        SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
-        boolean trackContention = logger.isTraceEnabled();
-        int heavilyContendedRowCount = 0;
-        // (we can't clear out the map as-we-go to free up memory,
-        //  since the memtable is being used for queries in the "pending flush" category)
-        for (AtomicBTreePartition partition : partitions.values())
+        Collection<SSTableReader> ssTables;
+        try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
         {
-            // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
-            // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
-            // we don't need to preserve tombstones for repair. So if both operation are in this
-            // memtable (which will almost always be the case if there is no ongoing failure), we can
-            // just skip the entry (CASSANDRA-4667).
-            if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
-                continue;
-
-            if (trackContention && partition.usePessimisticLocking())
-                heavilyContendedRowCount++;
-
-            if (!partition.isEmpty())
+            boolean trackContention = logger.isTraceEnabled();
+            int heavilyContendedRowCount = 0;
+            // (we can't clear out the map as-we-go to free up memory,
+            //  since the memtable is being used for queries in the "pending flush" category)
+            for (AtomicBTreePartition partition : partitions.values())
             {
-                try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+                // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+                // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+                // we don't need to preserve tombstones for repair. So if both operation are in this
+                // memtable (which will almost always be the case if there is no ongoing failure), we can
+                // just skip the entry (CASSANDRA-4667).
+                if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                    continue;
+
+                if (trackContention && partition.usePessimisticLocking())
+                    heavilyContendedRowCount++;
+
+                if (!partition.isEmpty())
                 {
-                    writer.append(iter);
+                    try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+                    {
+                        writer.append(iter);
+                    }
                 }
             }
-        }
 
-        if (writer.getFilePointer() > 0)
-            logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                       writer.getFilename(),
-                                       FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                       commitLogUpperBound));
-        else
-            logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
-                         writer.getFilename(), commitLogUpperBound);
+            if (writer.getFilePointer() > 0)
+            {
+                logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                           writer.getFilename(),
+                                           FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                           commitLogUpperBound));
 
-        if (heavilyContendedRowCount > 0)
-            logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+                // sstables should contain non-repaired data.
+                ssTables = writer.finish(true);
+            }
+            else
+            {
+                logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
+                             writer.getFilename(), commitLogUpperBound);
+                writer.abort();
+                ssTables = Collections.emptyList();
+            }
 
-        return writer;
+            if (heavilyContendedRowCount > 0)
+                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+
+            return ssTables;
+        }
     }
 
     @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
deleted file mode 100644
index 1285392..0000000
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.cql3.validation.miscellaneous;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import org.junit.Test;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.index.StubIndex;
-import org.apache.cassandra.schema.IndexMetadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class ColumnFamilyStoreTest extends CQLTester
-{
-    @Test
-    public void testFailing2iFlush() throws Throwable
-    {
-        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
-        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
-
-        for (int i = 0; i < 10; i++)
-            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
-
-        try
-        {
-            getCurrentColumnFamilyStore().forceBlockingFlush();
-        }
-        catch (Throwable t)
-        {
-            // ignore
-        }
-
-        // Make sure there's no flush running
-        waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
-                TimeUnit.SECONDS.toMillis(5));
-
-        // SSTables remain uncommitted.
-        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
-    }
-
-    public void waitFor(Supplier<Boolean> condition, long timeout)
-    {
-        long start = System.currentTimeMillis();
-        while(true)
-        {
-            if (condition.get())
-                return;
-
-            assertTrue("Timeout ocurred while waiting for condition",
-                       System.currentTimeMillis() - start < timeout);
-        }
-    }
-
-    // Used for index creation above
-    public static class BrokenCustom2I extends StubIndex
-    {
-        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
-        {
-            super(baseCfs, metadata);
-        }
-
-        public Callable<?> getBlockingFlushTask()
-        {
-            throw new RuntimeException();
-        }
-    }
-}


[04/12] cassandra git commit: Make sure sstables only get committed when it's safe to discard commit log records

Posted by bl...@apache.org.
Make sure sstables only get committed when it's safe to discard commit log records

Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ecef315
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ecef315
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ecef315

Branch: refs/heads/cassandra-3.11
Commit: 0ecef31548c287ac2d9f818413457bc947362733
Parents: d2ba715
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:10:00 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 45 +++++++++-----------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++
 3 files changed, 58 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..71e1653 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -919,34 +919,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     private final class PostFlush implements Callable<ReplayPosition>
     {
-        final boolean flushSecondaryIndexes;
-        final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
-        volatile FSWriteError flushFailure = null;
+        volatile Throwable flushFailure = null;
         final List<Memtable> memtables;
 
-        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
-                          List<Memtable> memtables)
+        private PostFlush(List<Memtable> memtables)
         {
-            this.writeBarrier = writeBarrier;
-            this.flushSecondaryIndexes = flushSecondaryIndexes;
             this.memtables = memtables;
         }
 
         public ReplayPosition call()
         {
-            writeBarrier.await();
-
-            /**
-             * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
-             * flushed memtables and CL position, which is as good as we can guarantee.
-             * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
-             * with CL as we do with memtables/CFS-backed SecondaryIndexes.
-             */
-
-            if (flushSecondaryIndexes)
-                indexManager.flushAllNonCFSBackedIndexesBlocking();
-
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@ -970,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                throw flushFailure;
+                Throwables.propagate(flushFailure);
 
             return commitLogUpperBound;
         }
@@ -1029,7 +1012,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
             // replay positions have also completed, i.e. the memtables are done and ready to flush
             writeBarrier.issue();
-            postFlush = new PostFlush(!truncate, writeBarrier, memtables);
+            postFlush = new PostFlush(memtables);
         }
 
         public void run()
@@ -1047,24 +1030,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             try
             {
+                boolean flushNonCf2i = true;
                 for (Memtable memtable : memtables)
                 {
                     Collection<SSTableReader> readers = Collections.emptyList();
                     if (!memtable.isClean() && !truncate)
+                    {
+                        // TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
+                        // with CL as we do with memtables/CFS-backed SecondaryIndexes.
+                        if (flushNonCf2i)
+                        {
+                            indexManager.flushAllNonCFSBackedIndexesBlocking();
+                            flushNonCf2i = false;
+                        }
                         readers = memtable.flush();
+                    }
                     memtable.cfs.replaceFlushed(memtable, readers);
                     reclaim(memtable);
                 }
             }
-            catch (FSWriteError e)
+            catch (Throwable e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
                 postFlush.flushFailure = e;
             }
-
-            // signal the post-flush we've done our work
-            postFlush.latch.countDown();
+            finally
+            {
+                // signal the post-flush we've done our work
+                postFlush.latch.countDown();
+            }
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index b8e4185..6930d13 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -624,6 +624,43 @@ public class CustomIndexTest extends CQLTester
         assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
     }
 
+    @Test
+    public void testFailing2iFlush() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+        try
+        {
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+            fail("Flush should have thrown an exception.");
+        }
+        catch (Throwable t)
+        {
+            assertTrue(t.getMessage().contains("Broken2I"));
+        }
+
+        // SSTables remain uncommitted.
+        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+    }
+
+    // Used for index creation above
+    public static class BrokenCustom2I extends StubIndex
+    {
+        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public Callable<?> getBlockingFlushTask()
+        {
+            throw new RuntimeException("Broken2I");
+        }
+    }
+
     private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
     {
         createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",


[10/12] cassandra git commit: Merge branch 'cassandra-3.11' into cassandra-3.X

Posted by bl...@apache.org.
Merge branch 'cassandra-3.11' into cassandra-3.X


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1423806
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1423806
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1423806

Branch: refs/heads/trunk
Commit: f1423806e7263cbb7cb357f728b5b5181362d892
Parents: 5439d94 bed3def
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:28:27 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:28:27 2016 +0200

----------------------------------------------------------------------

----------------------------------------------------------------------



[12/12] cassandra git commit: Merge branch 'cassandra-3.X' into trunk

Posted by bl...@apache.org.
Merge branch 'cassandra-3.X' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48591489
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48591489
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48591489

Branch: refs/heads/trunk
Commit: 48591489dd214d5b4df8d1c9e8c5ce1ff1abff93
Parents: 9a7baa1 f142380
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:29:32 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:29:32 2016 +0200

----------------------------------------------------------------------

----------------------------------------------------------------------



[07/12] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by bl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bed3def9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bed3def9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bed3def9

Branch: refs/heads/cassandra-3.X
Commit: bed3def9a0188daad4b3306d5aea28b416be85c2
Parents: 2f268ed 0ecef31
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:27:46 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:27:46 2016 +0200

----------------------------------------------------------------------

----------------------------------------------------------------------



[05/12] cassandra git commit: Make sure sstables only get committed when it's safe to discard commit log records

Posted by bl...@apache.org.
Make sure sstables only get committed when it's safe to discard commit log records

Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ecef315
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ecef315
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ecef315

Branch: refs/heads/cassandra-3.X
Commit: 0ecef31548c287ac2d9f818413457bc947362733
Parents: d2ba715
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:10:00 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 45 +++++++++-----------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++
 3 files changed, 58 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..71e1653 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -919,34 +919,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     private final class PostFlush implements Callable<ReplayPosition>
     {
-        final boolean flushSecondaryIndexes;
-        final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
-        volatile FSWriteError flushFailure = null;
+        volatile Throwable flushFailure = null;
         final List<Memtable> memtables;
 
-        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
-                          List<Memtable> memtables)
+        private PostFlush(List<Memtable> memtables)
         {
-            this.writeBarrier = writeBarrier;
-            this.flushSecondaryIndexes = flushSecondaryIndexes;
             this.memtables = memtables;
         }
 
         public ReplayPosition call()
         {
-            writeBarrier.await();
-
-            /**
-             * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
-             * flushed memtables and CL position, which is as good as we can guarantee.
-             * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
-             * with CL as we do with memtables/CFS-backed SecondaryIndexes.
-             */
-
-            if (flushSecondaryIndexes)
-                indexManager.flushAllNonCFSBackedIndexesBlocking();
-
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@ -970,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                throw flushFailure;
+                Throwables.propagate(flushFailure);
 
             return commitLogUpperBound;
         }
@@ -1029,7 +1012,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
             // replay positions have also completed, i.e. the memtables are done and ready to flush
             writeBarrier.issue();
-            postFlush = new PostFlush(!truncate, writeBarrier, memtables);
+            postFlush = new PostFlush(memtables);
         }
 
         public void run()
@@ -1047,24 +1030,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             try
             {
+                boolean flushNonCf2i = true;
                 for (Memtable memtable : memtables)
                 {
                     Collection<SSTableReader> readers = Collections.emptyList();
                     if (!memtable.isClean() && !truncate)
+                    {
+                        // TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
+                        // with CL as we do with memtables/CFS-backed SecondaryIndexes.
+                        if (flushNonCf2i)
+                        {
+                            indexManager.flushAllNonCFSBackedIndexesBlocking();
+                            flushNonCf2i = false;
+                        }
                         readers = memtable.flush();
+                    }
                     memtable.cfs.replaceFlushed(memtable, readers);
                     reclaim(memtable);
                 }
             }
-            catch (FSWriteError e)
+            catch (Throwable e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
                 postFlush.flushFailure = e;
             }
-
-            // signal the post-flush we've done our work
-            postFlush.latch.countDown();
+            finally
+            {
+                // signal the post-flush we've done our work
+                postFlush.latch.countDown();
+            }
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index b8e4185..6930d13 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -624,6 +624,43 @@ public class CustomIndexTest extends CQLTester
         assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
     }
 
+    @Test
+    public void testFailing2iFlush() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+        try
+        {
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+            fail("Flush should have thrown an exception.");
+        }
+        catch (Throwable t)
+        {
+            assertTrue(t.getMessage().contains("Broken2I"));
+        }
+
+        // SSTables remain uncommitted.
+        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+    }
+
+    // Used for index creation above
+    public static class BrokenCustom2I extends StubIndex
+    {
+        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public Callable<?> getBlockingFlushTask()
+        {
+            throw new RuntimeException("Broken2I");
+        }
+    }
+
     private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
     {
         createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",


[11/12] cassandra git commit: Merge branch 'cassandra-3.11' into cassandra-3.X

Posted by bl...@apache.org.
Merge branch 'cassandra-3.11' into cassandra-3.X


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1423806
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1423806
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1423806

Branch: refs/heads/cassandra-3.X
Commit: f1423806e7263cbb7cb357f728b5b5181362d892
Parents: 5439d94 bed3def
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:28:27 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:28:27 2016 +0200

----------------------------------------------------------------------

----------------------------------------------------------------------



[03/12] cassandra git commit: Revert "Make sure sstables only get committed when it's safe to discard commit log records"

Posted by bl...@apache.org.
Revert "Make sure sstables only get committed when it's safe to discard commit log records"

This reverts commit 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537 as it was using a wrong version of the patch.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d2ba715f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d2ba715f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d2ba715f

Branch: refs/heads/trunk
Commit: d2ba715f2456e1aa821c01941f90b6a58f54e6c4
Parents: 6f90e55
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:06:48 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:06:48 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../apache/cassandra/db/ColumnFamilyStore.java  | 77 ++++-------------
 src/java/org/apache/cassandra/db/Memtable.java  | 81 ++++++++++--------
 .../miscellaneous/ColumnFamilyStoreTest.java    | 90 --------------------
 4 files changed, 63 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5242adf..5cacdd0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,4 @@
 3.0.11
- * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 113e10d..d2a51a9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,7 +63,6 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -82,7 +81,6 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
-import static org.apache.cassandra.utils.Throwables.merge;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -126,8 +124,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    @VisibleForTesting
-    public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                                           StageManager.KEEPALIVE,
                                                                                           TimeUnit.SECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
@@ -924,9 +921,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
-        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
-        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
-        volatile Throwable flushFailure = null;
+        final CountDownLatch latch = new CountDownLatch(1);
+        volatile FSWriteError flushFailure = null;
         final List<Memtable> memtables;
 
         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -947,27 +943,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
              * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
              * with CL as we do with memtables/CFS-backed SecondaryIndexes.
              */
-            try
-            {
-                if (flushSecondaryIndexes)
-                {
-                    indexManager.flushAllNonCFSBackedIndexesBlocking();
-                }
-            }
-            catch (Throwable e)
-            {
-                flushFailure = merge(flushFailure, e);
-            }
-            finally
-            {
-                secondaryIndexFlushLatch.countDown();
-            }
+
+            if (flushSecondaryIndexes)
+                indexManager.flushAllNonCFSBackedIndexesBlocking();
 
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
                 // on this task can rely on all prior flushes being complete
-                memtablesFlushLatch.await();
+                latch.await();
             }
             catch (InterruptedException e)
             {
@@ -986,7 +970,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                Throwables.propagate(flushFailure);
+                throw flushFailure;
 
             return commitLogUpperBound;
         }
@@ -1064,9 +1048,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             try
             {
                 for (Memtable memtable : memtables)
-                    flushMemtable(memtable);
+                {
+                    Collection<SSTableReader> readers = Collections.emptyList();
+                    if (!memtable.isClean() && !truncate)
+                        readers = memtable.flush();
+                    memtable.cfs.replaceFlushed(memtable, readers);
+                    reclaim(memtable);
+                }
             }
-            catch (Throwable e)
+            catch (FSWriteError e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1074,40 +1064,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.memtablesFlushLatch.countDown();
-        }
-
-        public Collection<SSTableReader> flushMemtable(Memtable memtable)
-        {
-            if (memtable.isClean() || truncate)
-            {
-                memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
-                reclaim(memtable);
-                return Collections.emptyList();
-            }
-
-            Collection<SSTableReader> readers = Collections.emptyList();
-            try (SSTableTxnWriter writer = memtable.flush())
-            {
-                try
-                {
-                    postFlush.secondaryIndexFlushLatch.await();
-                }
-                catch (InterruptedException e)
-                {
-                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
-                }
-
-                if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
-                    // sstables should contain non-repaired data.
-                    readers = writer.finish(true);
-                else
-                    maybeFail(writer.abort(postFlush.flushFailure));
-            }
-
-            memtable.cfs.replaceFlushed(memtable, readers);
-            reclaim(memtable);
-            return readers;
+            postFlush.latch.countDown();
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 6404b37..1a7d6cb 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -318,7 +317,7 @@ public class Memtable implements Comparable<Memtable>
         return partitions.get(key);
     }
 
-    public SSTableTxnWriter flush()
+    public Collection<SSTableReader> flush()
     {
         long estimatedSize = estimatedSize();
         Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -358,52 +357,64 @@ public class Memtable implements Comparable<Memtable>
                        * 1.2); // bloom filter and row index overhead
     }
 
-    private SSTableTxnWriter writeSortedContents(File sstableDirectory)
+    private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
     {
         boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 
         logger.debug("Writing {}", Memtable.this.toString());
 
-        SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
-        boolean trackContention = logger.isTraceEnabled();
-        int heavilyContendedRowCount = 0;
-        // (we can't clear out the map as-we-go to free up memory,
-        //  since the memtable is being used for queries in the "pending flush" category)
-        for (AtomicBTreePartition partition : partitions.values())
+        Collection<SSTableReader> ssTables;
+        try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
         {
-            // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
-            // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
-            // we don't need to preserve tombstones for repair. So if both operation are in this
-            // memtable (which will almost always be the case if there is no ongoing failure), we can
-            // just skip the entry (CASSANDRA-4667).
-            if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
-                continue;
-
-            if (trackContention && partition.usePessimisticLocking())
-                heavilyContendedRowCount++;
-
-            if (!partition.isEmpty())
+            boolean trackContention = logger.isTraceEnabled();
+            int heavilyContendedRowCount = 0;
+            // (we can't clear out the map as-we-go to free up memory,
+            //  since the memtable is being used for queries in the "pending flush" category)
+            for (AtomicBTreePartition partition : partitions.values())
             {
-                try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+                // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+                // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+                // we don't need to preserve tombstones for repair. So if both operation are in this
+                // memtable (which will almost always be the case if there is no ongoing failure), we can
+                // just skip the entry (CASSANDRA-4667).
+                if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                    continue;
+
+                if (trackContention && partition.usePessimisticLocking())
+                    heavilyContendedRowCount++;
+
+                if (!partition.isEmpty())
                 {
-                    writer.append(iter);
+                    try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+                    {
+                        writer.append(iter);
+                    }
                 }
             }
-        }
 
-        if (writer.getFilePointer() > 0)
-            logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                       writer.getFilename(),
-                                       FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                       commitLogUpperBound));
-        else
-            logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
-                         writer.getFilename(), commitLogUpperBound);
+            if (writer.getFilePointer() > 0)
+            {
+                logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                           writer.getFilename(),
+                                           FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                           commitLogUpperBound));
 
-        if (heavilyContendedRowCount > 0)
-            logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+                // sstables should contain non-repaired data.
+                ssTables = writer.finish(true);
+            }
+            else
+            {
+                logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
+                             writer.getFilename(), commitLogUpperBound);
+                writer.abort();
+                ssTables = Collections.emptyList();
+            }
 
-        return writer;
+            if (heavilyContendedRowCount > 0)
+                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+
+            return ssTables;
+        }
     }
 
     @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
deleted file mode 100644
index 1285392..0000000
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.cql3.validation.miscellaneous;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import org.junit.Test;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.index.StubIndex;
-import org.apache.cassandra.schema.IndexMetadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class ColumnFamilyStoreTest extends CQLTester
-{
-    @Test
-    public void testFailing2iFlush() throws Throwable
-    {
-        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
-        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
-
-        for (int i = 0; i < 10; i++)
-            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
-
-        try
-        {
-            getCurrentColumnFamilyStore().forceBlockingFlush();
-        }
-        catch (Throwable t)
-        {
-            // ignore
-        }
-
-        // Make sure there's no flush running
-        waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
-                TimeUnit.SECONDS.toMillis(5));
-
-        // SSTables remain uncommitted.
-        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
-    }
-
-    public void waitFor(Supplier<Boolean> condition, long timeout)
-    {
-        long start = System.currentTimeMillis();
-        while(true)
-        {
-            if (condition.get())
-                return;
-
-            assertTrue("Timeout ocurred while waiting for condition",
-                       System.currentTimeMillis() - start < timeout);
-        }
-    }
-
-    // Used for index creation above
-    public static class BrokenCustom2I extends StubIndex
-    {
-        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
-        {
-            super(baseCfs, metadata);
-        }
-
-        public Callable<?> getBlockingFlushTask()
-        {
-            throw new RuntimeException();
-        }
-    }
-}


[08/12] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by bl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bed3def9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bed3def9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bed3def9

Branch: refs/heads/trunk
Commit: bed3def9a0188daad4b3306d5aea28b416be85c2
Parents: 2f268ed 0ecef31
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:27:46 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:27:46 2016 +0200

----------------------------------------------------------------------

----------------------------------------------------------------------



[09/12] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by bl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bed3def9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bed3def9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bed3def9

Branch: refs/heads/cassandra-3.11
Commit: bed3def9a0188daad4b3306d5aea28b416be85c2
Parents: 2f268ed 0ecef31
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:27:46 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:27:46 2016 +0200

----------------------------------------------------------------------

----------------------------------------------------------------------



[02/12] cassandra git commit: Revert "Make sure sstables only get committed when it's safe to discard commit log records"

Posted by bl...@apache.org.
Revert "Make sure sstables only get committed when it's safe to discard commit log records"

This reverts commit 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537 as it was using a wrong version of the patch.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d2ba715f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d2ba715f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d2ba715f

Branch: refs/heads/cassandra-3.X
Commit: d2ba715f2456e1aa821c01941f90b6a58f54e6c4
Parents: 6f90e55
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:06:48 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:06:48 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 -
 .../apache/cassandra/db/ColumnFamilyStore.java  | 77 ++++-------------
 src/java/org/apache/cassandra/db/Memtable.java  | 81 ++++++++++--------
 .../miscellaneous/ColumnFamilyStoreTest.java    | 90 --------------------
 4 files changed, 63 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5242adf..5cacdd0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,4 @@
 3.0.11
- * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 113e10d..d2a51a9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,7 +63,6 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -82,7 +81,6 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
-import static org.apache.cassandra.utils.Throwables.merge;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -126,8 +124,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    @VisibleForTesting
-    public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                                           StageManager.KEEPALIVE,
                                                                                           TimeUnit.SECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
@@ -924,9 +921,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
-        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
-        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
-        volatile Throwable flushFailure = null;
+        final CountDownLatch latch = new CountDownLatch(1);
+        volatile FSWriteError flushFailure = null;
         final List<Memtable> memtables;
 
         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -947,27 +943,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
              * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
              * with CL as we do with memtables/CFS-backed SecondaryIndexes.
              */
-            try
-            {
-                if (flushSecondaryIndexes)
-                {
-                    indexManager.flushAllNonCFSBackedIndexesBlocking();
-                }
-            }
-            catch (Throwable e)
-            {
-                flushFailure = merge(flushFailure, e);
-            }
-            finally
-            {
-                secondaryIndexFlushLatch.countDown();
-            }
+
+            if (flushSecondaryIndexes)
+                indexManager.flushAllNonCFSBackedIndexesBlocking();
 
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
                 // on this task can rely on all prior flushes being complete
-                memtablesFlushLatch.await();
+                latch.await();
             }
             catch (InterruptedException e)
             {
@@ -986,7 +970,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                Throwables.propagate(flushFailure);
+                throw flushFailure;
 
             return commitLogUpperBound;
         }
@@ -1064,9 +1048,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             try
             {
                 for (Memtable memtable : memtables)
-                    flushMemtable(memtable);
+                {
+                    Collection<SSTableReader> readers = Collections.emptyList();
+                    if (!memtable.isClean() && !truncate)
+                        readers = memtable.flush();
+                    memtable.cfs.replaceFlushed(memtable, readers);
+                    reclaim(memtable);
+                }
             }
-            catch (Throwable e)
+            catch (FSWriteError e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1074,40 +1064,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.memtablesFlushLatch.countDown();
-        }
-
-        public Collection<SSTableReader> flushMemtable(Memtable memtable)
-        {
-            if (memtable.isClean() || truncate)
-            {
-                memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
-                reclaim(memtable);
-                return Collections.emptyList();
-            }
-
-            Collection<SSTableReader> readers = Collections.emptyList();
-            try (SSTableTxnWriter writer = memtable.flush())
-            {
-                try
-                {
-                    postFlush.secondaryIndexFlushLatch.await();
-                }
-                catch (InterruptedException e)
-                {
-                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
-                }
-
-                if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
-                    // sstables should contain non-repaired data.
-                    readers = writer.finish(true);
-                else
-                    maybeFail(writer.abort(postFlush.flushFailure));
-            }
-
-            memtable.cfs.replaceFlushed(memtable, readers);
-            reclaim(memtable);
-            return readers;
+            postFlush.latch.countDown();
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 6404b37..1a7d6cb 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -318,7 +317,7 @@ public class Memtable implements Comparable<Memtable>
         return partitions.get(key);
     }
 
-    public SSTableTxnWriter flush()
+    public Collection<SSTableReader> flush()
     {
         long estimatedSize = estimatedSize();
         Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -358,52 +357,64 @@ public class Memtable implements Comparable<Memtable>
                        * 1.2); // bloom filter and row index overhead
     }
 
-    private SSTableTxnWriter writeSortedContents(File sstableDirectory)
+    private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
     {
         boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 
         logger.debug("Writing {}", Memtable.this.toString());
 
-        SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
-        boolean trackContention = logger.isTraceEnabled();
-        int heavilyContendedRowCount = 0;
-        // (we can't clear out the map as-we-go to free up memory,
-        //  since the memtable is being used for queries in the "pending flush" category)
-        for (AtomicBTreePartition partition : partitions.values())
+        Collection<SSTableReader> ssTables;
+        try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
         {
-            // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
-            // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
-            // we don't need to preserve tombstones for repair. So if both operation are in this
-            // memtable (which will almost always be the case if there is no ongoing failure), we can
-            // just skip the entry (CASSANDRA-4667).
-            if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
-                continue;
-
-            if (trackContention && partition.usePessimisticLocking())
-                heavilyContendedRowCount++;
-
-            if (!partition.isEmpty())
+            boolean trackContention = logger.isTraceEnabled();
+            int heavilyContendedRowCount = 0;
+            // (we can't clear out the map as-we-go to free up memory,
+            //  since the memtable is being used for queries in the "pending flush" category)
+            for (AtomicBTreePartition partition : partitions.values())
             {
-                try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+                // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+                // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+                // we don't need to preserve tombstones for repair. So if both operation are in this
+                // memtable (which will almost always be the case if there is no ongoing failure), we can
+                // just skip the entry (CASSANDRA-4667).
+                if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                    continue;
+
+                if (trackContention && partition.usePessimisticLocking())
+                    heavilyContendedRowCount++;
+
+                if (!partition.isEmpty())
                 {
-                    writer.append(iter);
+                    try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+                    {
+                        writer.append(iter);
+                    }
                 }
             }
-        }
 
-        if (writer.getFilePointer() > 0)
-            logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                       writer.getFilename(),
-                                       FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                       commitLogUpperBound));
-        else
-            logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
-                         writer.getFilename(), commitLogUpperBound);
+            if (writer.getFilePointer() > 0)
+            {
+                logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                           writer.getFilename(),
+                                           FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                           commitLogUpperBound));
 
-        if (heavilyContendedRowCount > 0)
-            logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+                // sstables should contain non-repaired data.
+                ssTables = writer.finish(true);
+            }
+            else
+            {
+                logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
+                             writer.getFilename(), commitLogUpperBound);
+                writer.abort();
+                ssTables = Collections.emptyList();
+            }
 
-        return writer;
+            if (heavilyContendedRowCount > 0)
+                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+
+            return ssTables;
+        }
     }
 
     @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
deleted file mode 100644
index 1285392..0000000
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.cql3.validation.miscellaneous;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import org.junit.Test;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.index.StubIndex;
-import org.apache.cassandra.schema.IndexMetadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class ColumnFamilyStoreTest extends CQLTester
-{
-    @Test
-    public void testFailing2iFlush() throws Throwable
-    {
-        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
-        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
-
-        for (int i = 0; i < 10; i++)
-            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
-
-        try
-        {
-            getCurrentColumnFamilyStore().forceBlockingFlush();
-        }
-        catch (Throwable t)
-        {
-            // ignore
-        }
-
-        // Make sure there's no flush running
-        waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
-                TimeUnit.SECONDS.toMillis(5));
-
-        // SSTables remain uncommitted.
-        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
-    }
-
-    public void waitFor(Supplier<Boolean> condition, long timeout)
-    {
-        long start = System.currentTimeMillis();
-        while(true)
-        {
-            if (condition.get())
-                return;
-
-            assertTrue("Timeout ocurred while waiting for condition",
-                       System.currentTimeMillis() - start < timeout);
-        }
-    }
-
-    // Used for index creation above
-    public static class BrokenCustom2I extends StubIndex
-    {
-        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
-        {
-            super(baseCfs, metadata);
-        }
-
-        public Callable<?> getBlockingFlushTask()
-        {
-            throw new RuntimeException();
-        }
-    }
-}


[06/12] cassandra git commit: Make sure sstables only get committed when it's safe to discard commit log records

Posted by bl...@apache.org.
Make sure sstables only get committed when it's safe to discard commit log records

Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ecef315
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ecef315
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ecef315

Branch: refs/heads/trunk
Commit: 0ecef31548c287ac2d9f818413457bc947362733
Parents: d2ba715
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:10:00 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 45 +++++++++-----------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++
 3 files changed, 58 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..71e1653 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -919,34 +919,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     private final class PostFlush implements Callable<ReplayPosition>
     {
-        final boolean flushSecondaryIndexes;
-        final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
-        volatile FSWriteError flushFailure = null;
+        volatile Throwable flushFailure = null;
         final List<Memtable> memtables;
 
-        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
-                          List<Memtable> memtables)
+        private PostFlush(List<Memtable> memtables)
         {
-            this.writeBarrier = writeBarrier;
-            this.flushSecondaryIndexes = flushSecondaryIndexes;
             this.memtables = memtables;
         }
 
         public ReplayPosition call()
         {
-            writeBarrier.await();
-
-            /**
-             * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
-             * flushed memtables and CL position, which is as good as we can guarantee.
-             * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
-             * with CL as we do with memtables/CFS-backed SecondaryIndexes.
-             */
-
-            if (flushSecondaryIndexes)
-                indexManager.flushAllNonCFSBackedIndexesBlocking();
-
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@ -970,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                throw flushFailure;
+                Throwables.propagate(flushFailure);
 
             return commitLogUpperBound;
         }
@@ -1029,7 +1012,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
             // replay positions have also completed, i.e. the memtables are done and ready to flush
             writeBarrier.issue();
-            postFlush = new PostFlush(!truncate, writeBarrier, memtables);
+            postFlush = new PostFlush(memtables);
         }
 
         public void run()
@@ -1047,24 +1030,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             try
             {
+                boolean flushNonCf2i = true;
                 for (Memtable memtable : memtables)
                 {
                     Collection<SSTableReader> readers = Collections.emptyList();
                     if (!memtable.isClean() && !truncate)
+                    {
+                        // TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
+                        // with CL as we do with memtables/CFS-backed SecondaryIndexes.
+                        if (flushNonCf2i)
+                        {
+                            indexManager.flushAllNonCFSBackedIndexesBlocking();
+                            flushNonCf2i = false;
+                        }
                         readers = memtable.flush();
+                    }
                     memtable.cfs.replaceFlushed(memtable, readers);
                     reclaim(memtable);
                 }
             }
-            catch (FSWriteError e)
+            catch (Throwable e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
                 postFlush.flushFailure = e;
             }
-
-            // signal the post-flush we've done our work
-            postFlush.latch.countDown();
+            finally
+            {
+                // signal the post-flush we've done our work
+                postFlush.latch.countDown();
+            }
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index b8e4185..6930d13 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -624,6 +624,43 @@ public class CustomIndexTest extends CQLTester
         assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
     }
 
+    @Test
+    public void testFailing2iFlush() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+        try
+        {
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+            fail("Flush should have thrown an exception.");
+        }
+        catch (Throwable t)
+        {
+            assertTrue(t.getMessage().contains("Broken2I"));
+        }
+
+        // SSTables remain uncommitted.
+        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+    }
+
+    // Used for index creation above
+    public static class BrokenCustom2I extends StubIndex
+    {
+        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public Callable<?> getBlockingFlushTask()
+        {
+            throw new RuntimeException("Broken2I");
+        }
+    }
+
     private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
     {
         createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",