You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/12/06 11:21:45 UTC

[01/10] cassandra git commit: Make sure sstables only get committed when it's safe to discard commit log records

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 5f64ed7cc -> 6f90e55e7
  refs/heads/cassandra-3.11 b207f2e3b -> 2f268eda3
  refs/heads/cassandra-3.X 838a21d40 -> 5439d94c5
  refs/heads/trunk 8ddbb7493 -> 9a7baa145


Make sure sstables only get committed when it's safe to discard commit log records

Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f90e55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f90e55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f90e55e

Branch: refs/heads/cassandra-3.0
Commit: 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537
Parents: 5f64ed7
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:10:31 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 77 +++++++++++++----
 src/java/org/apache/cassandra/db/Memtable.java  | 81 ++++++++----------
 .../miscellaneous/ColumnFamilyStoreTest.java    | 90 ++++++++++++++++++++
 4 files changed, 186 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..113e10d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -81,6 +82,7 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -124,7 +126,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+    @VisibleForTesting
+    public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                                           StageManager.KEEPALIVE,
                                                                                           TimeUnit.SECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
@@ -921,8 +924,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
-        final CountDownLatch latch = new CountDownLatch(1);
-        volatile FSWriteError flushFailure = null;
+        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
+        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
+        volatile Throwable flushFailure = null;
         final List<Memtable> memtables;
 
         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -943,15 +947,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
              * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
              * with CL as we do with memtables/CFS-backed SecondaryIndexes.
              */
-
-            if (flushSecondaryIndexes)
-                indexManager.flushAllNonCFSBackedIndexesBlocking();
+            try
+            {
+                if (flushSecondaryIndexes)
+                {
+                    indexManager.flushAllNonCFSBackedIndexesBlocking();
+                }
+            }
+            catch (Throwable e)
+            {
+                flushFailure = merge(flushFailure, e);
+            }
+            finally
+            {
+                secondaryIndexFlushLatch.countDown();
+            }
 
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
                 // on this task can rely on all prior flushes being complete
-                latch.await();
+                memtablesFlushLatch.await();
             }
             catch (InterruptedException e)
             {
@@ -970,7 +986,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                throw flushFailure;
+                Throwables.propagate(flushFailure);
 
             return commitLogUpperBound;
         }
@@ -1048,15 +1064,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             try
             {
                 for (Memtable memtable : memtables)
-                {
-                    Collection<SSTableReader> readers = Collections.emptyList();
-                    if (!memtable.isClean() && !truncate)
-                        readers = memtable.flush();
-                    memtable.cfs.replaceFlushed(memtable, readers);
-                    reclaim(memtable);
-                }
+                    flushMemtable(memtable);
             }
-            catch (FSWriteError e)
+            catch (Throwable e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1064,7 +1074,40 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.latch.countDown();
+            postFlush.memtablesFlushLatch.countDown();
+        }
+
+        public Collection<SSTableReader> flushMemtable(Memtable memtable)
+        {
+            if (memtable.isClean() || truncate)
+            {
+                memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
+                reclaim(memtable);
+                return Collections.emptyList();
+            }
+
+            Collection<SSTableReader> readers = Collections.emptyList();
+            try (SSTableTxnWriter writer = memtable.flush())
+            {
+                try
+                {
+                    postFlush.secondaryIndexFlushLatch.await();
+                }
+                catch (InterruptedException e)
+                {
+                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
+                }
+
+                if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
+                    // sstables should contain non-repaired data.
+                    readers = writer.finish(true);
+                else
+                    maybeFail(writer.abort(postFlush.flushFailure));
+            }
+
+            memtable.cfs.replaceFlushed(memtable, readers);
+            reclaim(memtable);
+            return readers;
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1a7d6cb..6404b37 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -317,7 +318,7 @@ public class Memtable implements Comparable<Memtable>
         return partitions.get(key);
     }
 
-    public Collection<SSTableReader> flush()
+    public SSTableTxnWriter flush()
     {
         long estimatedSize = estimatedSize();
         Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -357,64 +358,52 @@ public class Memtable implements Comparable<Memtable>
                        * 1.2); // bloom filter and row index overhead
     }
 
-    private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
+    private SSTableTxnWriter writeSortedContents(File sstableDirectory)
     {
         boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 
         logger.debug("Writing {}", Memtable.this.toString());
 
-        Collection<SSTableReader> ssTables;
-        try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+        SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
+        boolean trackContention = logger.isTraceEnabled();
+        int heavilyContendedRowCount = 0;
+        // (we can't clear out the map as-we-go to free up memory,
+        //  since the memtable is being used for queries in the "pending flush" category)
+        for (AtomicBTreePartition partition : partitions.values())
         {
-            boolean trackContention = logger.isTraceEnabled();
-            int heavilyContendedRowCount = 0;
-            // (we can't clear out the map as-we-go to free up memory,
-            //  since the memtable is being used for queries in the "pending flush" category)
-            for (AtomicBTreePartition partition : partitions.values())
+            // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+            // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+            // we don't need to preserve tombstones for repair. So if both operation are in this
+            // memtable (which will almost always be the case if there is no ongoing failure), we can
+            // just skip the entry (CASSANDRA-4667).
+            if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                continue;
+
+            if (trackContention && partition.usePessimisticLocking())
+                heavilyContendedRowCount++;
+
+            if (!partition.isEmpty())
             {
-                // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
-                // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
-                // we don't need to preserve tombstones for repair. So if both operation are in this
-                // memtable (which will almost always be the case if there is no ongoing failure), we can
-                // just skip the entry (CASSANDRA-4667).
-                if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
-                    continue;
-
-                if (trackContention && partition.usePessimisticLocking())
-                    heavilyContendedRowCount++;
-
-                if (!partition.isEmpty())
+                try (UnfilteredRowIterator iter = partition.unfilteredIterator())
                 {
-                    try (UnfilteredRowIterator iter = partition.unfilteredIterator())
-                    {
-                        writer.append(iter);
-                    }
+                    writer.append(iter);
                 }
             }
+        }
 
-            if (writer.getFilePointer() > 0)
-            {
-                logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                           writer.getFilename(),
-                                           FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                           commitLogUpperBound));
-
-                // sstables should contain non-repaired data.
-                ssTables = writer.finish(true);
-            }
-            else
-            {
-                logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
-                             writer.getFilename(), commitLogUpperBound);
-                writer.abort();
-                ssTables = Collections.emptyList();
-            }
+        if (writer.getFilePointer() > 0)
+            logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                       writer.getFilename(),
+                                       FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                       commitLogUpperBound));
+        else
+            logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
+                         writer.getFilename(), commitLogUpperBound);
 
-            if (heavilyContendedRowCount > 0)
-                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+        if (heavilyContendedRowCount > 0)
+            logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
 
-            return ssTables;
-        }
+        return writer;
     }
 
     @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
new file mode 100644
index 0000000..1285392
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.validation.miscellaneous;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnFamilyStoreTest extends CQLTester
+{
+    @Test
+    public void testFailing2iFlush() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+        try
+        {
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+        catch (Throwable t)
+        {
+            // ignore
+        }
+
+        // Make sure there's no flush running
+        waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
+                TimeUnit.SECONDS.toMillis(5));
+
+        // SSTables remain uncommitted.
+        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+    }
+
+    public void waitFor(Supplier<Boolean> condition, long timeout)
+    {
+        long start = System.currentTimeMillis();
+        while(true)
+        {
+            if (condition.get())
+                return;
+
+                       assertTrue("Timeout occurred while waiting for condition",
+                       System.currentTimeMillis() - start < timeout);
+        }
+    }
+
+    // Used for index creation above
+    public static class BrokenCustom2I extends StubIndex
+    {
+        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public Callable<?> getBlockingFlushTask()
+        {
+            throw new RuntimeException();
+        }
+    }
+}


[08/10] cassandra git commit: Merge branch 'cassandra-3.11' into cassandra-3.X

Posted by bl...@apache.org.
Merge branch 'cassandra-3.11' into cassandra-3.X


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5439d94c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5439d94c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5439d94c

Branch: refs/heads/cassandra-3.X
Commit: 5439d94c546331b30acf0d43a503e9426364e81a
Parents: 838a21d 2f268ed
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 12:13:23 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:14:24 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 41 ++++++++------------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
 3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5439d94c/CHANGES.txt
----------------------------------------------------------------------


[03/10] cassandra git commit: Make sure sstables only get committed when it's safe to discard commit log records

Posted by bl...@apache.org.
Make sure sstables only get committed when it's safe to discard commit log records

Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f90e55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f90e55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f90e55e

Branch: refs/heads/cassandra-3.X
Commit: 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537
Parents: 5f64ed7
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:10:31 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 77 +++++++++++++----
 src/java/org/apache/cassandra/db/Memtable.java  | 81 ++++++++----------
 .../miscellaneous/ColumnFamilyStoreTest.java    | 90 ++++++++++++++++++++
 4 files changed, 186 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..113e10d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -81,6 +82,7 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -124,7 +126,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+    @VisibleForTesting
+    public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                                           StageManager.KEEPALIVE,
                                                                                           TimeUnit.SECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
@@ -921,8 +924,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
-        final CountDownLatch latch = new CountDownLatch(1);
-        volatile FSWriteError flushFailure = null;
+        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
+        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
+        volatile Throwable flushFailure = null;
         final List<Memtable> memtables;
 
         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -943,15 +947,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
              * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
              * with CL as we do with memtables/CFS-backed SecondaryIndexes.
              */
-
-            if (flushSecondaryIndexes)
-                indexManager.flushAllNonCFSBackedIndexesBlocking();
+            try
+            {
+                if (flushSecondaryIndexes)
+                {
+                    indexManager.flushAllNonCFSBackedIndexesBlocking();
+                }
+            }
+            catch (Throwable e)
+            {
+                flushFailure = merge(flushFailure, e);
+            }
+            finally
+            {
+                secondaryIndexFlushLatch.countDown();
+            }
 
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
                 // on this task can rely on all prior flushes being complete
-                latch.await();
+                memtablesFlushLatch.await();
             }
             catch (InterruptedException e)
             {
@@ -970,7 +986,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                throw flushFailure;
+                Throwables.propagate(flushFailure);
 
             return commitLogUpperBound;
         }
@@ -1048,15 +1064,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             try
             {
                 for (Memtable memtable : memtables)
-                {
-                    Collection<SSTableReader> readers = Collections.emptyList();
-                    if (!memtable.isClean() && !truncate)
-                        readers = memtable.flush();
-                    memtable.cfs.replaceFlushed(memtable, readers);
-                    reclaim(memtable);
-                }
+                    flushMemtable(memtable);
             }
-            catch (FSWriteError e)
+            catch (Throwable e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1064,7 +1074,40 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.latch.countDown();
+            postFlush.memtablesFlushLatch.countDown();
+        }
+
+        public Collection<SSTableReader> flushMemtable(Memtable memtable)
+        {
+            if (memtable.isClean() || truncate)
+            {
+                memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
+                reclaim(memtable);
+                return Collections.emptyList();
+            }
+
+            Collection<SSTableReader> readers = Collections.emptyList();
+            try (SSTableTxnWriter writer = memtable.flush())
+            {
+                try
+                {
+                    postFlush.secondaryIndexFlushLatch.await();
+                }
+                catch (InterruptedException e)
+                {
+                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
+                }
+
+                if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
+                    // sstables should contain non-repaired data.
+                    readers = writer.finish(true);
+                else
+                    maybeFail(writer.abort(postFlush.flushFailure));
+            }
+
+            memtable.cfs.replaceFlushed(memtable, readers);
+            reclaim(memtable);
+            return readers;
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1a7d6cb..6404b37 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -317,7 +318,7 @@ public class Memtable implements Comparable<Memtable>
         return partitions.get(key);
     }
 
-    public Collection<SSTableReader> flush()
+    public SSTableTxnWriter flush()
     {
         long estimatedSize = estimatedSize();
         Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -357,64 +358,52 @@ public class Memtable implements Comparable<Memtable>
                        * 1.2); // bloom filter and row index overhead
     }
 
-    private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
+    private SSTableTxnWriter writeSortedContents(File sstableDirectory)
     {
         boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 
         logger.debug("Writing {}", Memtable.this.toString());
 
-        Collection<SSTableReader> ssTables;
-        try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+        SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
+        boolean trackContention = logger.isTraceEnabled();
+        int heavilyContendedRowCount = 0;
+        // (we can't clear out the map as-we-go to free up memory,
+        //  since the memtable is being used for queries in the "pending flush" category)
+        for (AtomicBTreePartition partition : partitions.values())
         {
-            boolean trackContention = logger.isTraceEnabled();
-            int heavilyContendedRowCount = 0;
-            // (we can't clear out the map as-we-go to free up memory,
-            //  since the memtable is being used for queries in the "pending flush" category)
-            for (AtomicBTreePartition partition : partitions.values())
+            // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+            // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+            // we don't need to preserve tombstones for repair. So if both operations are in this
+            // memtable (which will almost always be the case if there is no ongoing failure), we can
+            // just skip the entry (CASSANDRA-4667).
+            if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                continue;
+
+            if (trackContention && partition.usePessimisticLocking())
+                heavilyContendedRowCount++;
+
+            if (!partition.isEmpty())
             {
-                // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
-                // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
-                // we don't need to preserve tombstones for repair. So if both operation are in this
-                // memtable (which will almost always be the case if there is no ongoing failure), we can
-                // just skip the entry (CASSANDRA-4667).
-                if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
-                    continue;
-
-                if (trackContention && partition.usePessimisticLocking())
-                    heavilyContendedRowCount++;
-
-                if (!partition.isEmpty())
+                try (UnfilteredRowIterator iter = partition.unfilteredIterator())
                 {
-                    try (UnfilteredRowIterator iter = partition.unfilteredIterator())
-                    {
-                        writer.append(iter);
-                    }
+                    writer.append(iter);
                 }
             }
+        }
 
-            if (writer.getFilePointer() > 0)
-            {
-                logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                           writer.getFilename(),
-                                           FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                           commitLogUpperBound));
-
-                // sstables should contain non-repaired data.
-                ssTables = writer.finish(true);
-            }
-            else
-            {
-                logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
-                             writer.getFilename(), commitLogUpperBound);
-                writer.abort();
-                ssTables = Collections.emptyList();
-            }
+        if (writer.getFilePointer() > 0)
+            logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                       writer.getFilename(),
+                                       FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                       commitLogUpperBound));
+        else
+            logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
+                         writer.getFilename(), commitLogUpperBound);
 
-            if (heavilyContendedRowCount > 0)
-                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+        if (heavilyContendedRowCount > 0)
+            logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
 
-            return ssTables;
-        }
+        return writer;
     }
 
     @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
new file mode 100644
index 0000000..1285392
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.validation.miscellaneous;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnFamilyStoreTest extends CQLTester
+{
+    @Test
+    public void testFailing2iFlush() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+        try
+        {
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+        catch (Throwable t)
+        {
+            // ignore
+        }
+
+        // Make sure there's no flush running
+        waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
+                TimeUnit.SECONDS.toMillis(5));
+
+        // SSTables remain uncommitted.
+        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+    }
+
+    public void waitFor(Supplier<Boolean> condition, long timeout)
+    {
+        long start = System.currentTimeMillis();
+        while(true)
+        {
+            if (condition.get())
+                return;
+
+            assertTrue("Timeout occurred while waiting for condition",
+                       System.currentTimeMillis() - start < timeout);
+        }
+    }
+
+    // Used for index creation above
+    public static class BrokenCustom2I extends StubIndex
+    {
+        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public Callable<?> getBlockingFlushTask()
+        {
+            throw new RuntimeException();
+        }
+    }
+}


[02/10] cassandra git commit: Make sure sstables only get committed when it's safe to discard commit log records

Posted by bl...@apache.org.
Make sure sstables only get committed when it's safe to discard commit log records

Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f90e55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f90e55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f90e55e

Branch: refs/heads/cassandra-3.11
Commit: 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537
Parents: 5f64ed7
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:10:31 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 77 +++++++++++++----
 src/java/org/apache/cassandra/db/Memtable.java  | 81 ++++++++----------
 .../miscellaneous/ColumnFamilyStoreTest.java    | 90 ++++++++++++++++++++
 4 files changed, 186 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..113e10d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -81,6 +82,7 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -124,7 +126,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+    @VisibleForTesting
+    public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                                           StageManager.KEEPALIVE,
                                                                                           TimeUnit.SECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
@@ -921,8 +924,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
-        final CountDownLatch latch = new CountDownLatch(1);
-        volatile FSWriteError flushFailure = null;
+        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
+        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
+        volatile Throwable flushFailure = null;
         final List<Memtable> memtables;
 
         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -943,15 +947,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
              * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
              * with CL as we do with memtables/CFS-backed SecondaryIndexes.
              */
-
-            if (flushSecondaryIndexes)
-                indexManager.flushAllNonCFSBackedIndexesBlocking();
+            try
+            {
+                if (flushSecondaryIndexes)
+                {
+                    indexManager.flushAllNonCFSBackedIndexesBlocking();
+                }
+            }
+            catch (Throwable e)
+            {
+                flushFailure = merge(flushFailure, e);
+            }
+            finally
+            {
+                secondaryIndexFlushLatch.countDown();
+            }
 
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
                 // on this task can rely on all prior flushes being complete
-                latch.await();
+                memtablesFlushLatch.await();
             }
             catch (InterruptedException e)
             {
@@ -970,7 +986,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                throw flushFailure;
+                Throwables.propagate(flushFailure);
 
             return commitLogUpperBound;
         }
@@ -1048,15 +1064,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             try
             {
                 for (Memtable memtable : memtables)
-                {
-                    Collection<SSTableReader> readers = Collections.emptyList();
-                    if (!memtable.isClean() && !truncate)
-                        readers = memtable.flush();
-                    memtable.cfs.replaceFlushed(memtable, readers);
-                    reclaim(memtable);
-                }
+                    flushMemtable(memtable);
             }
-            catch (FSWriteError e)
+            catch (Throwable e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1064,7 +1074,40 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.latch.countDown();
+            postFlush.memtablesFlushLatch.countDown();
+        }
+
+        public Collection<SSTableReader> flushMemtable(Memtable memtable)
+        {
+            if (memtable.isClean() || truncate)
+            {
+                memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
+                reclaim(memtable);
+                return Collections.emptyList();
+            }
+
+            Collection<SSTableReader> readers = Collections.emptyList();
+            try (SSTableTxnWriter writer = memtable.flush())
+            {
+                try
+                {
+                    postFlush.secondaryIndexFlushLatch.await();
+                }
+                catch (InterruptedException e)
+                {
+                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
+                }
+
+                if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
+                    // sstables should contain non-repaired data.
+                    readers = writer.finish(true);
+                else
+                    maybeFail(writer.abort(postFlush.flushFailure));
+            }
+
+            memtable.cfs.replaceFlushed(memtable, readers);
+            reclaim(memtable);
+            return readers;
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1a7d6cb..6404b37 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -317,7 +318,7 @@ public class Memtable implements Comparable<Memtable>
         return partitions.get(key);
     }
 
-    public Collection<SSTableReader> flush()
+    public SSTableTxnWriter flush()
     {
         long estimatedSize = estimatedSize();
         Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -357,64 +358,52 @@ public class Memtable implements Comparable<Memtable>
                        * 1.2); // bloom filter and row index overhead
     }
 
-    private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
+    private SSTableTxnWriter writeSortedContents(File sstableDirectory)
     {
         boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 
         logger.debug("Writing {}", Memtable.this.toString());
 
-        Collection<SSTableReader> ssTables;
-        try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+        SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
+        boolean trackContention = logger.isTraceEnabled();
+        int heavilyContendedRowCount = 0;
+        // (we can't clear out the map as-we-go to free up memory,
+        //  since the memtable is being used for queries in the "pending flush" category)
+        for (AtomicBTreePartition partition : partitions.values())
         {
-            boolean trackContention = logger.isTraceEnabled();
-            int heavilyContendedRowCount = 0;
-            // (we can't clear out the map as-we-go to free up memory,
-            //  since the memtable is being used for queries in the "pending flush" category)
-            for (AtomicBTreePartition partition : partitions.values())
+            // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+            // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+            // we don't need to preserve tombstones for repair. So if both operations are in this
+            // memtable (which will almost always be the case if there is no ongoing failure), we can
+            // just skip the entry (CASSANDRA-4667).
+            if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                continue;
+
+            if (trackContention && partition.usePessimisticLocking())
+                heavilyContendedRowCount++;
+
+            if (!partition.isEmpty())
             {
-                // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
-                // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
-                // we don't need to preserve tombstones for repair. So if both operation are in this
-                // memtable (which will almost always be the case if there is no ongoing failure), we can
-                // just skip the entry (CASSANDRA-4667).
-                if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
-                    continue;
-
-                if (trackContention && partition.usePessimisticLocking())
-                    heavilyContendedRowCount++;
-
-                if (!partition.isEmpty())
+                try (UnfilteredRowIterator iter = partition.unfilteredIterator())
                 {
-                    try (UnfilteredRowIterator iter = partition.unfilteredIterator())
-                    {
-                        writer.append(iter);
-                    }
+                    writer.append(iter);
                 }
             }
+        }
 
-            if (writer.getFilePointer() > 0)
-            {
-                logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                           writer.getFilename(),
-                                           FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                           commitLogUpperBound));
-
-                // sstables should contain non-repaired data.
-                ssTables = writer.finish(true);
-            }
-            else
-            {
-                logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
-                             writer.getFilename(), commitLogUpperBound);
-                writer.abort();
-                ssTables = Collections.emptyList();
-            }
+        if (writer.getFilePointer() > 0)
+            logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                       writer.getFilename(),
+                                       FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                       commitLogUpperBound));
+        else
+            logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
+                         writer.getFilename(), commitLogUpperBound);
 
-            if (heavilyContendedRowCount > 0)
-                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+        if (heavilyContendedRowCount > 0)
+            logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
 
-            return ssTables;
-        }
+        return writer;
     }
 
     @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
new file mode 100644
index 0000000..1285392
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.validation.miscellaneous;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnFamilyStoreTest extends CQLTester
+{
+    @Test
+    public void testFailing2iFlush() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+        try
+        {
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+        catch (Throwable t)
+        {
+            // ignore
+        }
+
+        // Make sure there's no flush running
+        waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
+                TimeUnit.SECONDS.toMillis(5));
+
+        // SSTables remain uncommitted.
+        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+    }
+
+    public void waitFor(Supplier<Boolean> condition, long timeout)
+    {
+        long start = System.currentTimeMillis();
+        while(true)
+        {
+            if (condition.get())
+                return;
+
+            assertTrue("Timeout occurred while waiting for condition",
+                       System.currentTimeMillis() - start < timeout);
+        }
+    }
+
+    // Used for index creation above
+    public static class BrokenCustom2I extends StubIndex
+    {
+        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public Callable<?> getBlockingFlushTask()
+        {
+            throw new RuntimeException();
+        }
+    }
+}


[06/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by bl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f268eda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f268eda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f268eda

Branch: refs/heads/cassandra-3.11
Commit: 2f268eda3d44f8b14b71b7f4b3f4c25e2dfb2c11
Parents: b207f2e 6f90e55
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 12:11:15 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:12:19 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 41 ++++++++------------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
 3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c5d2da2,5242adf..6da6b4f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,113 -1,5 +1,114 @@@
 -3.0.11
 +3.10
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
+  * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
   * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
   * Nodetool should use a more sane max heap size (CASSANDRA-12739)
   * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f46e6f7,113e10d..881fb00
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -963,37 -920,49 +963,19 @@@ public class ColumnFamilyStore implemen
       * Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
       * etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
       */
 -    private final class PostFlush implements Callable<ReplayPosition>
 +    private final class PostFlush implements Callable<CommitLogPosition>
      {
--        final boolean flushSecondaryIndexes;
--        final OpOrder.Barrier writeBarrier;
 -        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
 -        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
 -        volatile Throwable flushFailure = null;
 +        final CountDownLatch latch = new CountDownLatch(1);
-         volatile Throwable flushFailure = null;
          final List<Memtable> memtables;
++        volatile Throwable flushFailure = null;
  
-         private PostFlush(boolean flushSecondaryIndexes,
-                           OpOrder.Barrier writeBarrier,
 -        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
--                          List<Memtable> memtables)
++        private PostFlush(List<Memtable> memtables)
          {
--            this.writeBarrier = writeBarrier;
--            this.flushSecondaryIndexes = flushSecondaryIndexes;
              this.memtables = memtables;
          }
  
 -        public ReplayPosition call()
 +        public CommitLogPosition call()
          {
--            writeBarrier.await();
--
--            /**
--             * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
--             * flushed memtables and CL position, which is as good as we can guarantee.
--             * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
--             * with CL as we do with memtables/CFS-backed SecondaryIndexes.
--             */
- 
-             if (flushSecondaryIndexes)
-                 indexManager.flushAllNonCFSBackedIndexesBlocking();
 -            try
 -            {
 -                if (flushSecondaryIndexes)
 -                {
 -                    indexManager.flushAllNonCFSBackedIndexesBlocking();
 -                }
 -            }
 -            catch (Throwable e)
 -            {
 -                flushFailure = merge(flushFailure, e);
 -            }
 -            finally
 -            {
 -                secondaryIndexFlushLatch.countDown();
 -            }
--
              try
              {
                  // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@@ -1075,10 -1043,9 +1057,10 @@@
  
              // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
              // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
 -            // replay positions have also completed, i.e. the memtables are done and ready to flush
 +            // commit log segment position have also completed, i.e. the memtables are done and ready to flush
              writeBarrier.issue();
--            postFlush = new PostFlush(!truncate, writeBarrier, memtables);
++            postFlush = new PostFlush(memtables);
 +            postFlushTask = ListenableFutureTask.create(postFlush);
          }
  
          public void run()
@@@ -1096,19 -1063,21 +1078,21 @@@
  
              try
              {
--                for (Memtable memtable : memtables)
--                    flushMemtable(memtable);
++                // Flush "data" memtable with non-cf 2i first;
++                flushMemtable(memtables.get(0), true);
++                for (int i = 1; i < memtables.size(); i++)
++                    flushMemtable(memtables.get(i), false);
              }
 -            catch (Throwable e)
 +            catch (Throwable t)
              {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
 -                postFlush.flushFailure = e;
 +                JVMStabilityInspector.inspectThrowable(t);
 +                postFlush.flushFailure = t;
              }
 -
              // signal the post-flush we've done our work
 -            postFlush.memtablesFlushLatch.countDown();
 +            postFlush.latch.countDown();
          }
  
--        public Collection<SSTableReader> flushMemtable(Memtable memtable)
++        public Collection<SSTableReader> flushMemtable(Memtable memtable, boolean flushNonCf2i)
          {
              if (memtable.isClean() || truncate)
              {
@@@ -1117,93 -1086,28 +1101,102 @@@
                  return Collections.emptyList();
              }
  
 -            Collection<SSTableReader> readers = Collections.emptyList();
 -            try (SSTableTxnWriter writer = memtable.flush())
 +            List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
 +            long totalBytesOnDisk = 0;
 +            long maxBytesOnDisk = 0;
 +            long minBytesOnDisk = Long.MAX_VALUE;
 +            List<SSTableReader> sstables = new ArrayList<>();
 +            try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
              {
 +                List<Memtable.FlushRunnable> flushRunnables = null;
 +                List<SSTableMultiWriter> flushResults = null;
 +
                  try
                  {
 -                    postFlush.secondaryIndexFlushLatch.await();
 +                    // flush the memtable
 +                    flushRunnables = memtable.flushRunnables(txn);
 +
 +                    for (int i = 0; i < flushRunnables.size(); i++)
 +                        futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
 +
++                    /**
++                     * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
++                     * flushed memtables and CL position, which is as good as we can guarantee.
++                     * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
++                     * with CL as we do with memtables/CFS-backed SecondaryIndexes.
++                     */
++                    if (flushNonCf2i)
++                        indexManager.flushAllNonCFSBackedIndexesBlocking();
++
 +                    flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
                  }
 -                catch (InterruptedException e)
 +                catch (Throwable t)
                  {
 -                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
 +                    t = memtable.abortRunnables(flushRunnables, t);
 +                    t = txn.abort(t);
 +                    throw Throwables.propagate(t);
                  }
  
 -                if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
 -                    // sstables should contain non-repaired data.
 -                    readers = writer.finish(true);
 -                else
 -                    maybeFail(writer.abort(postFlush.flushFailure));
 -            }
 +                try
 +                {
 +                    Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
 +                    while (writerIterator.hasNext())
 +                    {
 +                        @SuppressWarnings("resource")
 +                        SSTableMultiWriter writer = writerIterator.next();
 +                        if (writer.getFilePointer() > 0)
 +                        {
 +                            writer.setOpenResult(true).prepareToCommit();
 +                        }
 +                        else
 +                        {
 +                            maybeFail(writer.abort(null));
 +                            writerIterator.remove();
 +                        }
 +                    }
 +                }
 +                catch (Throwable t)
 +                {
 +                    for (SSTableMultiWriter writer : flushResults)
 +                        t = writer.abort(t);
 +                    t = txn.abort(t);
 +                    Throwables.propagate(t);
 +                }
 +
 +                txn.prepareToCommit();
 +
 +                Throwable accumulate = null;
 +                for (SSTableMultiWriter writer : flushResults)
 +                    accumulate = writer.commit(accumulate);
  
 -            memtable.cfs.replaceFlushed(memtable, readers);
 +                maybeFail(txn.commit(accumulate));
 +
 +                for (SSTableMultiWriter writer : flushResults)
 +                {
 +                    Collection<SSTableReader> flushedSSTables = writer.finished();
 +                    for (SSTableReader sstable : flushedSSTables)
 +                    {
 +                        if (sstable != null)
 +                        {
 +                            sstables.add(sstable);
 +                            long size = sstable.bytesOnDisk();
 +                            totalBytesOnDisk += size;
 +                            maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
 +                            minBytesOnDisk = Math.min(minBytesOnDisk, size);
 +                        }
 +                    }
 +                }
 +            }
 +            memtable.cfs.replaceFlushed(memtable, sstables);
              reclaim(memtable);
 -            return readers;
 +            memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
 +            logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
 +                         sstables,
 +                         sstables.size(),
 +                         FBUtilities.prettyPrintMemory(totalBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(maxBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(minBytesOnDisk));
 +            return sstables;
          }
  
          private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 8e1385e,b8e4185..4a43210
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@@ -624,6 -624,6 +624,43 @@@ public class CustomIndexTest extends CQ
          assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
      }
  
++    @Test
++    public void testFailing2iFlush() throws Throwable
++    {
++        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
++        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
++
++        for (int i = 0; i < 10; i++)
++            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
++
++        try
++        {
++            getCurrentColumnFamilyStore().forceBlockingFlush();
++            fail("Exception should have been propagated");
++        }
++        catch (Throwable t)
++        {
++            assertTrue(t.getMessage().contains("Broken2I"));
++        }
++
++        // SSTables remain uncommitted.
++        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
++    }
++
++    // Used for index creation above
++    public static class BrokenCustom2I extends StubIndex
++    {
++        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
++        {
++            super(baseCfs, metadata);
++        }
++
++        public Callable<?> getBlockingFlushTask()
++        {
++            throw new RuntimeException("Broken2I");
++        }
++    }
++
      private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
      {
          createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",


[10/10] cassandra git commit: Merge branch 'cassandra-3.X' into trunk

Posted by bl...@apache.org.
Merge branch 'cassandra-3.X' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a7baa14
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a7baa14
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a7baa14

Branch: refs/heads/trunk
Commit: 9a7baa145398aa0b1970d70ca508f4a0a6e8e01c
Parents: 8ddbb74 5439d94
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 12:17:35 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:18:09 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 41 ++++++++------------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
 3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7baa14/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 90003ec,bddd823..40407bc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -122,8 -113,9 +122,9 @@@
   * Remove pre-startup check for open JMX port (CASSANDRA-12074)
   * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
   * Restore resumable hints delivery (CASSANDRA-11960)
 - * Properly report LWT contention (CASSANDRA-12626)
 + * Properly record CAS contention (CASSANDRA-12626)
  Merged from 3.0:
+  * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
   * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
   * Nodetool should use a more sane max heap size (CASSANDRA-12739)
   * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7baa14/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------


[04/10] cassandra git commit: Make sure sstables only get committed when it's safe to discard commit log records

Posted by bl...@apache.org.
Make sure sstables only get committed when it's safe to discard commit log records

Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f90e55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f90e55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f90e55e

Branch: refs/heads/trunk
Commit: 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537
Parents: 5f64ed7
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:10:31 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 77 +++++++++++++----
 src/java/org/apache/cassandra/db/Memtable.java  | 81 ++++++++----------
 .../miscellaneous/ColumnFamilyStoreTest.java    | 90 ++++++++++++++++++++
 4 files changed, 186 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
  * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
  * Nodetool should use a more sane max heap size (CASSANDRA-12739)
  * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..113e10d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -81,6 +82,7 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -124,7 +126,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+    @VisibleForTesting
+    public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                                           StageManager.KEEPALIVE,
                                                                                           TimeUnit.SECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
@@ -921,8 +924,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
-        final CountDownLatch latch = new CountDownLatch(1);
-        volatile FSWriteError flushFailure = null;
+        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
+        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
+        volatile Throwable flushFailure = null;
         final List<Memtable> memtables;
 
         private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -943,15 +947,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
              * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
              * with CL as we do with memtables/CFS-backed SecondaryIndexes.
              */
-
-            if (flushSecondaryIndexes)
-                indexManager.flushAllNonCFSBackedIndexesBlocking();
+            try
+            {
+                if (flushSecondaryIndexes)
+                {
+                    indexManager.flushAllNonCFSBackedIndexesBlocking();
+                }
+            }
+            catch (Throwable e)
+            {
+                flushFailure = merge(flushFailure, e);
+            }
+            finally
+            {
+                secondaryIndexFlushLatch.countDown();
+            }
 
             try
             {
                 // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
                 // on this task can rely on all prior flushes being complete
-                latch.await();
+                memtablesFlushLatch.await();
             }
             catch (InterruptedException e)
             {
@@ -970,7 +986,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             metric.pendingFlushes.dec();
 
             if (flushFailure != null)
-                throw flushFailure;
+                Throwables.propagate(flushFailure);
 
             return commitLogUpperBound;
         }
@@ -1048,15 +1064,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             try
             {
                 for (Memtable memtable : memtables)
-                {
-                    Collection<SSTableReader> readers = Collections.emptyList();
-                    if (!memtable.isClean() && !truncate)
-                        readers = memtable.flush();
-                    memtable.cfs.replaceFlushed(memtable, readers);
-                    reclaim(memtable);
-                }
+                    flushMemtable(memtable);
             }
-            catch (FSWriteError e)
+            catch (Throwable e)
             {
                 JVMStabilityInspector.inspectThrowable(e);
                 // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1064,7 +1074,40 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.latch.countDown();
+            postFlush.memtablesFlushLatch.countDown();
+        }
+
+        public Collection<SSTableReader> flushMemtable(Memtable memtable)
+        {
+            if (memtable.isClean() || truncate)
+            {
+                memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
+                reclaim(memtable);
+                return Collections.emptyList();
+            }
+
+            Collection<SSTableReader> readers = Collections.emptyList();
+            try (SSTableTxnWriter writer = memtable.flush())
+            {
+                try
+                {
+                    postFlush.secondaryIndexFlushLatch.await();
+                }
+                catch (InterruptedException e)
+                {
+                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
+                }
+
+                if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
+                    // sstables should contain non-repaired data.
+                    readers = writer.finish(true);
+                else
+                    maybeFail(writer.abort(postFlush.flushFailure));
+            }
+
+            memtable.cfs.replaceFlushed(memtable, readers);
+            reclaim(memtable);
+            return readers;
         }
 
         private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1a7d6cb..6404b37 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -317,7 +318,7 @@ public class Memtable implements Comparable<Memtable>
         return partitions.get(key);
     }
 
-    public Collection<SSTableReader> flush()
+    public SSTableTxnWriter flush()
     {
         long estimatedSize = estimatedSize();
         Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -357,64 +358,52 @@ public class Memtable implements Comparable<Memtable>
                        * 1.2); // bloom filter and row index overhead
     }
 
-    private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
+    private SSTableTxnWriter writeSortedContents(File sstableDirectory)
     {
         boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 
         logger.debug("Writing {}", Memtable.this.toString());
 
-        Collection<SSTableReader> ssTables;
-        try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+        SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
+        boolean trackContention = logger.isTraceEnabled();
+        int heavilyContendedRowCount = 0;
+        // (we can't clear out the map as-we-go to free up memory,
+        //  since the memtable is being used for queries in the "pending flush" category)
+        for (AtomicBTreePartition partition : partitions.values())
         {
-            boolean trackContention = logger.isTraceEnabled();
-            int heavilyContendedRowCount = 0;
-            // (we can't clear out the map as-we-go to free up memory,
-            //  since the memtable is being used for queries in the "pending flush" category)
-            for (AtomicBTreePartition partition : partitions.values())
+            // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+            // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+            // we don't need to preserve tombstones for repair. So if both operation are in this
+            // memtable (which will almost always be the case if there is no ongoing failure), we can
+            // just skip the entry (CASSANDRA-4667).
+            if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+                continue;
+
+            if (trackContention && partition.usePessimisticLocking())
+                heavilyContendedRowCount++;
+
+            if (!partition.isEmpty())
             {
-                // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
-                // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
-                // we don't need to preserve tombstones for repair. So if both operation are in this
-                // memtable (which will almost always be the case if there is no ongoing failure), we can
-                // just skip the entry (CASSANDRA-4667).
-                if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
-                    continue;
-
-                if (trackContention && partition.usePessimisticLocking())
-                    heavilyContendedRowCount++;
-
-                if (!partition.isEmpty())
+                try (UnfilteredRowIterator iter = partition.unfilteredIterator())
                 {
-                    try (UnfilteredRowIterator iter = partition.unfilteredIterator())
-                    {
-                        writer.append(iter);
-                    }
+                    writer.append(iter);
                 }
             }
+        }
 
-            if (writer.getFilePointer() > 0)
-            {
-                logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
-                                           writer.getFilename(),
-                                           FBUtilities.prettyPrintMemory(writer.getFilePointer()),
-                                           commitLogUpperBound));
-
-                // sstables should contain non-repaired data.
-                ssTables = writer.finish(true);
-            }
-            else
-            {
-                logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
-                             writer.getFilename(), commitLogUpperBound);
-                writer.abort();
-                ssTables = Collections.emptyList();
-            }
+        if (writer.getFilePointer() > 0)
+            logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+                                       writer.getFilename(),
+                                       FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+                                       commitLogUpperBound));
+        else
+            logger.debug("Completed flushing {}; nothing needed to be retained.  Commitlog position was {}",
+                         writer.getFilename(), commitLogUpperBound);
 
-            if (heavilyContendedRowCount > 0)
-                logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+        if (heavilyContendedRowCount > 0)
+            logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
 
-            return ssTables;
-        }
+        return writer;
     }
 
     @SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
new file mode 100644
index 0000000..1285392
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.validation.miscellaneous;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnFamilyStoreTest extends CQLTester
+{
+    @Test
+    public void testFailing2iFlush() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+        try
+        {
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+        }
+        catch (Throwable t)
+        {
+            // ignore
+        }
+
+        // Make sure there's no flush running
+        waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
+                TimeUnit.SECONDS.toMillis(5));
+
+        // SSTables remain uncommitted.
+        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+    }
+
+    public void waitFor(Supplier<Boolean> condition, long timeout)
+    {
+        long start = System.currentTimeMillis();
+        while(true)
+        {
+            if (condition.get())
+                return;
+
+            assertTrue("Timeout ocurred while waiting for condition",
+                       System.currentTimeMillis() - start < timeout);
+        }
+    }
+
+    // Used for index creation above
+    public static class BrokenCustom2I extends StubIndex
+    {
+        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+        }
+
+        public Callable<?> getBlockingFlushTask()
+        {
+            throw new RuntimeException();
+        }
+    }
+}


[07/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by bl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f268eda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f268eda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f268eda

Branch: refs/heads/trunk
Commit: 2f268eda3d44f8b14b71b7f4b3f4c25e2dfb2c11
Parents: b207f2e 6f90e55
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 12:11:15 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:12:19 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 41 ++++++++------------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
 3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c5d2da2,5242adf..6da6b4f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,113 -1,5 +1,114 @@@
 -3.0.11
 +3.10
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
+  * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
   * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
   * Nodetool should use a more sane max heap size (CASSANDRA-12739)
   * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f46e6f7,113e10d..881fb00
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -963,37 -920,49 +963,19 @@@ public class ColumnFamilyStore implemen
       * Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
       * etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
       */
 -    private final class PostFlush implements Callable<ReplayPosition>
 +    private final class PostFlush implements Callable<CommitLogPosition>
      {
--        final boolean flushSecondaryIndexes;
--        final OpOrder.Barrier writeBarrier;
 -        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
 -        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
 -        volatile Throwable flushFailure = null;
 +        final CountDownLatch latch = new CountDownLatch(1);
-         volatile Throwable flushFailure = null;
          final List<Memtable> memtables;
++        volatile Throwable flushFailure = null;
  
-         private PostFlush(boolean flushSecondaryIndexes,
-                           OpOrder.Barrier writeBarrier,
 -        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
--                          List<Memtable> memtables)
++        private PostFlush(List<Memtable> memtables)
          {
--            this.writeBarrier = writeBarrier;
--            this.flushSecondaryIndexes = flushSecondaryIndexes;
              this.memtables = memtables;
          }
  
 -        public ReplayPosition call()
 +        public CommitLogPosition call()
          {
--            writeBarrier.await();
--
--            /**
--             * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
--             * flushed memtables and CL position, which is as good as we can guarantee.
--             * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
--             * with CL as we do with memtables/CFS-backed SecondaryIndexes.
--             */
- 
-             if (flushSecondaryIndexes)
-                 indexManager.flushAllNonCFSBackedIndexesBlocking();
 -            try
 -            {
 -                if (flushSecondaryIndexes)
 -                {
 -                    indexManager.flushAllNonCFSBackedIndexesBlocking();
 -                }
 -            }
 -            catch (Throwable e)
 -            {
 -                flushFailure = merge(flushFailure, e);
 -            }
 -            finally
 -            {
 -                secondaryIndexFlushLatch.countDown();
 -            }
--
              try
              {
                  // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@@ -1075,10 -1043,9 +1057,10 @@@
  
              // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
              // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
 -            // replay positions have also completed, i.e. the memtables are done and ready to flush
 +            // commit log segment position have also completed, i.e. the memtables are done and ready to flush
              writeBarrier.issue();
--            postFlush = new PostFlush(!truncate, writeBarrier, memtables);
++            postFlush = new PostFlush(memtables);
 +            postFlushTask = ListenableFutureTask.create(postFlush);
          }
  
          public void run()
@@@ -1096,19 -1063,21 +1078,21 @@@
  
              try
              {
--                for (Memtable memtable : memtables)
--                    flushMemtable(memtable);
++                // Flush "data" memtable with non-cf 2i first;
++                flushMemtable(memtables.get(0), true);
++                for (int i = 1; i < memtables.size(); i++)
++                    flushMemtable(memtables.get(i), false);
              }
 -            catch (Throwable e)
 +            catch (Throwable t)
              {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
 -                postFlush.flushFailure = e;
 +                JVMStabilityInspector.inspectThrowable(t);
 +                postFlush.flushFailure = t;
              }
 -
              // signal the post-flush we've done our work
 -            postFlush.memtablesFlushLatch.countDown();
 +            postFlush.latch.countDown();
          }
  
--        public Collection<SSTableReader> flushMemtable(Memtable memtable)
++        public Collection<SSTableReader> flushMemtable(Memtable memtable, boolean flushNonCf2i)
          {
              if (memtable.isClean() || truncate)
              {
@@@ -1117,93 -1086,28 +1101,102 @@@
                  return Collections.emptyList();
              }
  
 -            Collection<SSTableReader> readers = Collections.emptyList();
 -            try (SSTableTxnWriter writer = memtable.flush())
 +            List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
 +            long totalBytesOnDisk = 0;
 +            long maxBytesOnDisk = 0;
 +            long minBytesOnDisk = Long.MAX_VALUE;
 +            List<SSTableReader> sstables = new ArrayList<>();
 +            try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
              {
 +                List<Memtable.FlushRunnable> flushRunnables = null;
 +                List<SSTableMultiWriter> flushResults = null;
 +
                  try
                  {
 -                    postFlush.secondaryIndexFlushLatch.await();
 +                    // flush the memtable
 +                    flushRunnables = memtable.flushRunnables(txn);
 +
 +                    for (int i = 0; i < flushRunnables.size(); i++)
 +                        futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
 +
++                    /**
++                     * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
++                     * flushed memtables and CL position, which is as good as we can guarantee.
++                     * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
++                     * with CL as we do with memtables/CFS-backed SecondaryIndexes.
++                     */
++                    if (flushNonCf2i)
++                        indexManager.flushAllNonCFSBackedIndexesBlocking();
++
 +                    flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
                  }
 -                catch (InterruptedException e)
 +                catch (Throwable t)
                  {
 -                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
 +                    t = memtable.abortRunnables(flushRunnables, t);
 +                    t = txn.abort(t);
 +                    throw Throwables.propagate(t);
                  }
  
 -                if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
 -                    // sstables should contain non-repaired data.
 -                    readers = writer.finish(true);
 -                else
 -                    maybeFail(writer.abort(postFlush.flushFailure));
 -            }
 +                try
 +                {
 +                    Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
 +                    while (writerIterator.hasNext())
 +                    {
 +                        @SuppressWarnings("resource")
 +                        SSTableMultiWriter writer = writerIterator.next();
 +                        if (writer.getFilePointer() > 0)
 +                        {
 +                            writer.setOpenResult(true).prepareToCommit();
 +                        }
 +                        else
 +                        {
 +                            maybeFail(writer.abort(null));
 +                            writerIterator.remove();
 +                        }
 +                    }
 +                }
 +                catch (Throwable t)
 +                {
 +                    for (SSTableMultiWriter writer : flushResults)
 +                        t = writer.abort(t);
 +                    t = txn.abort(t);
 +                    Throwables.propagate(t);
 +                }
 +
 +                txn.prepareToCommit();
 +
 +                Throwable accumulate = null;
 +                for (SSTableMultiWriter writer : flushResults)
 +                    accumulate = writer.commit(accumulate);
  
 -            memtable.cfs.replaceFlushed(memtable, readers);
 +                maybeFail(txn.commit(accumulate));
 +
 +                for (SSTableMultiWriter writer : flushResults)
 +                {
 +                    Collection<SSTableReader> flushedSSTables = writer.finished();
 +                    for (SSTableReader sstable : flushedSSTables)
 +                    {
 +                        if (sstable != null)
 +                        {
 +                            sstables.add(sstable);
 +                            long size = sstable.bytesOnDisk();
 +                            totalBytesOnDisk += size;
 +                            maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
 +                            minBytesOnDisk = Math.min(minBytesOnDisk, size);
 +                        }
 +                    }
 +                }
 +            }
 +            memtable.cfs.replaceFlushed(memtable, sstables);
              reclaim(memtable);
 -            return readers;
 +            memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
 +            logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
 +                         sstables,
 +                         sstables.size(),
 +                         FBUtilities.prettyPrintMemory(totalBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(maxBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(minBytesOnDisk));
 +            return sstables;
          }
  
          private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 8e1385e,b8e4185..4a43210
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@@ -624,6 -624,6 +624,43 @@@ public class CustomIndexTest extends CQ
          assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
      }
  
++    @Test
++    public void testFailing2iFlush() throws Throwable
++    {
++        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
++        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
++
++        for (int i = 0; i < 10; i++)
++            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
++
++        try
++        {
++            getCurrentColumnFamilyStore().forceBlockingFlush();
++            fail("Exception should have been propagated");
++        }
++        catch (Throwable t)
++        {
++            assertTrue(t.getMessage().contains("Broken2I"));
++        }
++
++        // SSTables remain uncommitted.
++        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
++    }
++
++    // Used for index creation above
++    public static class BrokenCustom2I extends StubIndex
++    {
++        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
++        {
++            super(baseCfs, metadata);
++        }
++
++        public Callable<?> getBlockingFlushTask()
++        {
++            throw new RuntimeException("Broken2I");
++        }
++    }
++
      private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
      {
          createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",


[05/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by bl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f268eda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f268eda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f268eda

Branch: refs/heads/cassandra-3.X
Commit: 2f268eda3d44f8b14b71b7f4b3f4c25e2dfb2c11
Parents: b207f2e 6f90e55
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 12:11:15 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:12:19 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 41 ++++++++------------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
 3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c5d2da2,5242adf..6da6b4f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,113 -1,5 +1,114 @@@
 -3.0.11
 +3.10
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
+  * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
   * Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
   * Nodetool should use a more sane max heap size (CASSANDRA-12739)
   * LocalToken ensures token values are cloned on heap (CASSANDRA-12651)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f46e6f7,113e10d..881fb00
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -963,37 -920,49 +963,19 @@@ public class ColumnFamilyStore implemen
       * Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
       * etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
       */
 -    private final class PostFlush implements Callable<ReplayPosition>
 +    private final class PostFlush implements Callable<CommitLogPosition>
      {
--        final boolean flushSecondaryIndexes;
--        final OpOrder.Barrier writeBarrier;
 -        final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
 -        final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
 -        volatile Throwable flushFailure = null;
 +        final CountDownLatch latch = new CountDownLatch(1);
-         volatile Throwable flushFailure = null;
          final List<Memtable> memtables;
++        volatile Throwable flushFailure = null;
  
-         private PostFlush(boolean flushSecondaryIndexes,
-                           OpOrder.Barrier writeBarrier,
 -        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
--                          List<Memtable> memtables)
++        private PostFlush(List<Memtable> memtables)
          {
--            this.writeBarrier = writeBarrier;
--            this.flushSecondaryIndexes = flushSecondaryIndexes;
              this.memtables = memtables;
          }
  
 -        public ReplayPosition call()
 +        public CommitLogPosition call()
          {
--            writeBarrier.await();
--
--            /**
--             * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
--             * flushed memtables and CL position, which is as good as we can guarantee.
--             * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
--             * with CL as we do with memtables/CFS-backed SecondaryIndexes.
--             */
- 
-             if (flushSecondaryIndexes)
-                 indexManager.flushAllNonCFSBackedIndexesBlocking();
 -            try
 -            {
 -                if (flushSecondaryIndexes)
 -                {
 -                    indexManager.flushAllNonCFSBackedIndexesBlocking();
 -                }
 -            }
 -            catch (Throwable e)
 -            {
 -                flushFailure = merge(flushFailure, e);
 -            }
 -            finally
 -            {
 -                secondaryIndexFlushLatch.countDown();
 -            }
--
              try
              {
                  // we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@@ -1075,10 -1043,9 +1057,10 @@@
  
              // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
              // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
 -            // replay positions have also completed, i.e. the memtables are done and ready to flush
 +            // commit log segment position have also completed, i.e. the memtables are done and ready to flush
              writeBarrier.issue();
--            postFlush = new PostFlush(!truncate, writeBarrier, memtables);
++            postFlush = new PostFlush(memtables);
 +            postFlushTask = ListenableFutureTask.create(postFlush);
          }
  
          public void run()
@@@ -1096,19 -1063,21 +1078,21 @@@
  
              try
              {
--                for (Memtable memtable : memtables)
--                    flushMemtable(memtable);
++                // Flush "data" memtable with non-cf 2i first;
++                flushMemtable(memtables.get(0), true);
++                for (int i = 1; i < memtables.size(); i++)
++                    flushMemtable(memtables.get(i), false);
              }
 -            catch (Throwable e)
 +            catch (Throwable t)
              {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
 -                postFlush.flushFailure = e;
 +                JVMStabilityInspector.inspectThrowable(t);
 +                postFlush.flushFailure = t;
              }
 -
              // signal the post-flush we've done our work
 -            postFlush.memtablesFlushLatch.countDown();
 +            postFlush.latch.countDown();
          }
  
--        public Collection<SSTableReader> flushMemtable(Memtable memtable)
++        public Collection<SSTableReader> flushMemtable(Memtable memtable, boolean flushNonCf2i)
          {
              if (memtable.isClean() || truncate)
              {
@@@ -1117,93 -1086,28 +1101,102 @@@
                  return Collections.emptyList();
              }
  
 -            Collection<SSTableReader> readers = Collections.emptyList();
 -            try (SSTableTxnWriter writer = memtable.flush())
 +            List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
 +            long totalBytesOnDisk = 0;
 +            long maxBytesOnDisk = 0;
 +            long minBytesOnDisk = Long.MAX_VALUE;
 +            List<SSTableReader> sstables = new ArrayList<>();
 +            try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
              {
 +                List<Memtable.FlushRunnable> flushRunnables = null;
 +                List<SSTableMultiWriter> flushResults = null;
 +
                  try
                  {
 -                    postFlush.secondaryIndexFlushLatch.await();
 +                    // flush the memtable
 +                    flushRunnables = memtable.flushRunnables(txn);
 +
 +                    for (int i = 0; i < flushRunnables.size(); i++)
 +                        futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
 +
++                    /**
++                     * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
++                     * flushed memtables and CL position, which is as good as we can guarantee.
++                     * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
++                     * with CL as we do with memtables/CFS-backed SecondaryIndexes.
++                     */
++                    if (flushNonCf2i)
++                        indexManager.flushAllNonCFSBackedIndexesBlocking();
++
 +                    flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
                  }
 -                catch (InterruptedException e)
 +                catch (Throwable t)
                  {
 -                    postFlush.flushFailure = merge(postFlush.flushFailure, e);
 +                    t = memtable.abortRunnables(flushRunnables, t);
 +                    t = txn.abort(t);
 +                    throw Throwables.propagate(t);
                  }
  
 -                if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
 -                    // sstables should contain non-repaired data.
 -                    readers = writer.finish(true);
 -                else
 -                    maybeFail(writer.abort(postFlush.flushFailure));
 -            }
 +                try
 +                {
 +                    Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
 +                    while (writerIterator.hasNext())
 +                    {
 +                        @SuppressWarnings("resource")
 +                        SSTableMultiWriter writer = writerIterator.next();
 +                        if (writer.getFilePointer() > 0)
 +                        {
 +                            writer.setOpenResult(true).prepareToCommit();
 +                        }
 +                        else
 +                        {
 +                            maybeFail(writer.abort(null));
 +                            writerIterator.remove();
 +                        }
 +                    }
 +                }
 +                catch (Throwable t)
 +                {
 +                    for (SSTableMultiWriter writer : flushResults)
 +                        t = writer.abort(t);
 +                    t = txn.abort(t);
 +                    Throwables.propagate(t);
 +                }
 +
 +                txn.prepareToCommit();
 +
 +                Throwable accumulate = null;
 +                for (SSTableMultiWriter writer : flushResults)
 +                    accumulate = writer.commit(accumulate);
  
 -            memtable.cfs.replaceFlushed(memtable, readers);
 +                maybeFail(txn.commit(accumulate));
 +
 +                for (SSTableMultiWriter writer : flushResults)
 +                {
 +                    Collection<SSTableReader> flushedSSTables = writer.finished();
 +                    for (SSTableReader sstable : flushedSSTables)
 +                    {
 +                        if (sstable != null)
 +                        {
 +                            sstables.add(sstable);
 +                            long size = sstable.bytesOnDisk();
 +                            totalBytesOnDisk += size;
 +                            maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
 +                            minBytesOnDisk = Math.min(minBytesOnDisk, size);
 +                        }
 +                    }
 +                }
 +            }
 +            memtable.cfs.replaceFlushed(memtable, sstables);
              reclaim(memtable);
 -            return readers;
 +            memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
 +            logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
 +                         sstables,
 +                         sstables.size(),
 +                         FBUtilities.prettyPrintMemory(totalBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(maxBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(minBytesOnDisk));
 +            return sstables;
          }
  
          private void reclaim(final Memtable memtable)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 8e1385e,b8e4185..4a43210
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@@ -624,6 -624,6 +624,43 @@@ public class CustomIndexTest extends CQ
          assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
      }
  
++    @Test
++    public void testFailing2iFlush() throws Throwable
++    {
++        createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
++        createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
++
++        for (int i = 0; i < 10; i++)
++            execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
++
++        try
++        {
++            getCurrentColumnFamilyStore().forceBlockingFlush();
++            fail("Exception should have been propagated");
++        }
++        catch (Throwable t)
++        {
++            assertTrue(t.getMessage().contains("Broken2I"));
++        }
++
++        // SSTables remain uncommitted.
++        assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
++    }
++
++    // Used for index creation above
++    public static class BrokenCustom2I extends StubIndex
++    {
++        public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
++        {
++            super(baseCfs, metadata);
++        }
++
++        public Callable<?> getBlockingFlushTask()
++        {
++            throw new RuntimeException("Broken2I");
++        }
++    }
++
      private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
      {
          createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",


[09/10] cassandra git commit: Merge branch 'cassandra-3.11' into cassandra-3.X

Posted by bl...@apache.org.
Merge branch 'cassandra-3.11' into cassandra-3.X


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5439d94c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5439d94c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5439d94c

Branch: refs/heads/trunk
Commit: 5439d94c546331b30acf0d43a503e9426364e81a
Parents: 838a21d 2f268ed
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 12:13:23 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:14:24 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 41 ++++++++------------
 .../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
 3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5439d94c/CHANGES.txt
----------------------------------------------------------------------