You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/12/06 11:21:45 UTC
[01/10] cassandra git commit: Make sure sstables only get committed
when it's safe to discard commit log records
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 5f64ed7cc -> 6f90e55e7
refs/heads/cassandra-3.11 b207f2e3b -> 2f268eda3
refs/heads/cassandra-3.X 838a21d40 -> 5439d94c5
refs/heads/trunk 8ddbb7493 -> 9a7baa145
Make sure sstables only get committed when it's safe to discard commit log records
Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f90e55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f90e55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f90e55e
Branch: refs/heads/cassandra-3.0
Commit: 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537
Parents: 5f64ed7
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:10:31 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 77 +++++++++++++----
src/java/org/apache/cassandra/db/Memtable.java | 81 ++++++++----------
.../miscellaneous/ColumnFamilyStoreTest.java | 90 ++++++++++++++++++++
4 files changed, 186 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..113e10d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -81,6 +82,7 @@ import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
@@ -124,7 +126,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
- private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ @VisibleForTesting
+ public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
@@ -921,8 +924,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
- final CountDownLatch latch = new CountDownLatch(1);
- volatile FSWriteError flushFailure = null;
+ final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
+ final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
+ volatile Throwable flushFailure = null;
final List<Memtable> memtables;
private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -943,15 +947,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
* with CL as we do with memtables/CFS-backed SecondaryIndexes.
*/
-
- if (flushSecondaryIndexes)
- indexManager.flushAllNonCFSBackedIndexesBlocking();
+ try
+ {
+ if (flushSecondaryIndexes)
+ {
+ indexManager.flushAllNonCFSBackedIndexesBlocking();
+ }
+ }
+ catch (Throwable e)
+ {
+ flushFailure = merge(flushFailure, e);
+ }
+ finally
+ {
+ secondaryIndexFlushLatch.countDown();
+ }
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
// on this task can rely on all prior flushes being complete
- latch.await();
+ memtablesFlushLatch.await();
}
catch (InterruptedException e)
{
@@ -970,7 +986,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
metric.pendingFlushes.dec();
if (flushFailure != null)
- throw flushFailure;
+ Throwables.propagate(flushFailure);
return commitLogUpperBound;
}
@@ -1048,15 +1064,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
for (Memtable memtable : memtables)
- {
- Collection<SSTableReader> readers = Collections.emptyList();
- if (!memtable.isClean() && !truncate)
- readers = memtable.flush();
- memtable.cfs.replaceFlushed(memtable, readers);
- reclaim(memtable);
- }
+ flushMemtable(memtable);
}
- catch (FSWriteError e)
+ catch (Throwable e)
{
JVMStabilityInspector.inspectThrowable(e);
// If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1064,7 +1074,40 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// signal the post-flush we've done our work
- postFlush.latch.countDown();
+ postFlush.memtablesFlushLatch.countDown();
+ }
+
+ public Collection<SSTableReader> flushMemtable(Memtable memtable)
+ {
+ if (memtable.isClean() || truncate)
+ {
+ memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
+ reclaim(memtable);
+ return Collections.emptyList();
+ }
+
+ Collection<SSTableReader> readers = Collections.emptyList();
+ try (SSTableTxnWriter writer = memtable.flush())
+ {
+ try
+ {
+ postFlush.secondaryIndexFlushLatch.await();
+ }
+ catch (InterruptedException e)
+ {
+ postFlush.flushFailure = merge(postFlush.flushFailure, e);
+ }
+
+ if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
+ // sstables should contain non-repaired data.
+ readers = writer.finish(true);
+ else
+ maybeFail(writer.abort(postFlush.flushFailure));
+ }
+
+ memtable.cfs.replaceFlushed(memtable, readers);
+ reclaim(memtable);
+ return readers;
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1a7d6cb..6404b37 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
@@ -317,7 +318,7 @@ public class Memtable implements Comparable<Memtable>
return partitions.get(key);
}
- public Collection<SSTableReader> flush()
+ public SSTableTxnWriter flush()
{
long estimatedSize = estimatedSize();
Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -357,64 +358,52 @@ public class Memtable implements Comparable<Memtable>
* 1.2); // bloom filter and row index overhead
}
- private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
+ private SSTableTxnWriter writeSortedContents(File sstableDirectory)
{
boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
logger.debug("Writing {}", Memtable.this.toString());
- Collection<SSTableReader> ssTables;
- try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+ SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
+ boolean trackContention = logger.isTraceEnabled();
+ int heavilyContendedRowCount = 0;
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
+ for (AtomicBTreePartition partition : partitions.values())
{
- boolean trackContention = logger.isTraceEnabled();
- int heavilyContendedRowCount = 0;
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
- for (AtomicBTreePartition partition : partitions.values())
+ // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+ // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+ // we don't need to preserve tombstones for repair. So if both operation are in this
+ // memtable (which will almost always be the case if there is no ongoing failure), we can
+ // just skip the entry (CASSANDRA-4667).
+ if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+ continue;
+
+ if (trackContention && partition.usePessimisticLocking())
+ heavilyContendedRowCount++;
+
+ if (!partition.isEmpty())
{
- // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
- // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
- // we don't need to preserve tombstones for repair. So if both operation are in this
- // memtable (which will almost always be the case if there is no ongoing failure), we can
- // just skip the entry (CASSANDRA-4667).
- if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
- continue;
-
- if (trackContention && partition.usePessimisticLocking())
- heavilyContendedRowCount++;
-
- if (!partition.isEmpty())
+ try (UnfilteredRowIterator iter = partition.unfilteredIterator())
{
- try (UnfilteredRowIterator iter = partition.unfilteredIterator())
- {
- writer.append(iter);
- }
+ writer.append(iter);
}
}
+ }
- if (writer.getFilePointer() > 0)
- {
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getFilePointer()),
- commitLogUpperBound));
-
- // sstables should contain non-repaired data.
- ssTables = writer.finish(true);
- }
- else
- {
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), commitLogUpperBound);
- writer.abort();
- ssTables = Collections.emptyList();
- }
+ if (writer.getFilePointer() > 0)
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
+ FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+ commitLogUpperBound));
+ else
+ logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
+ writer.getFilename(), commitLogUpperBound);
- if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+ if (heavilyContendedRowCount > 0)
+ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
- return ssTables;
- }
+ return writer;
}
@SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
new file mode 100644
index 0000000..1285392
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.validation.miscellaneous;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnFamilyStoreTest extends CQLTester
+{
+ @Test
+ public void testFailing2iFlush() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+ createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
+
+ for (int i = 0; i < 10; i++)
+ execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+ try
+ {
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ }
+ catch (Throwable t)
+ {
+ // ignore
+ }
+
+ // Make sure there's no flush running
+ waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
+ TimeUnit.SECONDS.toMillis(5));
+
+ // SSTables remain uncommitted.
+ assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+ }
+
+ public void waitFor(Supplier<Boolean> condition, long timeout)
+ {
+ long start = System.currentTimeMillis();
+ while(true)
+ {
+ if (condition.get())
+ return;
+
+ assertTrue("Timeout occurred while waiting for condition",
+ System.currentTimeMillis() - start < timeout);
+ }
+ }
+
+ // Used for index creation above
+ public static class BrokenCustom2I extends StubIndex
+ {
+ public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+ {
+ super(baseCfs, metadata);
+ }
+
+ public Callable<?> getBlockingFlushTask()
+ {
+ throw new RuntimeException();
+ }
+ }
+}
[08/10] cassandra git commit: Merge branch 'cassandra-3.11' into
cassandra-3.X
Posted by bl...@apache.org.
Merge branch 'cassandra-3.11' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5439d94c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5439d94c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5439d94c
Branch: refs/heads/cassandra-3.X
Commit: 5439d94c546331b30acf0d43a503e9426364e81a
Parents: 838a21d 2f268ed
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 12:13:23 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:14:24 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 41 ++++++++------------
.../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5439d94c/CHANGES.txt
----------------------------------------------------------------------
[03/10] cassandra git commit: Make sure sstables only get committed
when it's safe to discard commit log records
Posted by bl...@apache.org.
Make sure sstables only get committed when it's safe to discard commit log records
Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f90e55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f90e55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f90e55e
Branch: refs/heads/cassandra-3.X
Commit: 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537
Parents: 5f64ed7
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:10:31 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 77 +++++++++++++----
src/java/org/apache/cassandra/db/Memtable.java | 81 ++++++++----------
.../miscellaneous/ColumnFamilyStoreTest.java | 90 ++++++++++++++++++++
4 files changed, 186 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..113e10d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -81,6 +82,7 @@ import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
@@ -124,7 +126,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
- private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ @VisibleForTesting
+ public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
@@ -921,8 +924,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
- final CountDownLatch latch = new CountDownLatch(1);
- volatile FSWriteError flushFailure = null;
+ final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
+ final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
+ volatile Throwable flushFailure = null;
final List<Memtable> memtables;
private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -943,15 +947,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
* with CL as we do with memtables/CFS-backed SecondaryIndexes.
*/
-
- if (flushSecondaryIndexes)
- indexManager.flushAllNonCFSBackedIndexesBlocking();
+ try
+ {
+ if (flushSecondaryIndexes)
+ {
+ indexManager.flushAllNonCFSBackedIndexesBlocking();
+ }
+ }
+ catch (Throwable e)
+ {
+ flushFailure = merge(flushFailure, e);
+ }
+ finally
+ {
+ secondaryIndexFlushLatch.countDown();
+ }
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
// on this task can rely on all prior flushes being complete
- latch.await();
+ memtablesFlushLatch.await();
}
catch (InterruptedException e)
{
@@ -970,7 +986,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
metric.pendingFlushes.dec();
if (flushFailure != null)
- throw flushFailure;
+ Throwables.propagate(flushFailure);
return commitLogUpperBound;
}
@@ -1048,15 +1064,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
for (Memtable memtable : memtables)
- {
- Collection<SSTableReader> readers = Collections.emptyList();
- if (!memtable.isClean() && !truncate)
- readers = memtable.flush();
- memtable.cfs.replaceFlushed(memtable, readers);
- reclaim(memtable);
- }
+ flushMemtable(memtable);
}
- catch (FSWriteError e)
+ catch (Throwable e)
{
JVMStabilityInspector.inspectThrowable(e);
// If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1064,7 +1074,40 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// signal the post-flush we've done our work
- postFlush.latch.countDown();
+ postFlush.memtablesFlushLatch.countDown();
+ }
+
+ public Collection<SSTableReader> flushMemtable(Memtable memtable)
+ {
+ if (memtable.isClean() || truncate)
+ {
+ memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
+ reclaim(memtable);
+ return Collections.emptyList();
+ }
+
+ Collection<SSTableReader> readers = Collections.emptyList();
+ try (SSTableTxnWriter writer = memtable.flush())
+ {
+ try
+ {
+ postFlush.secondaryIndexFlushLatch.await();
+ }
+ catch (InterruptedException e)
+ {
+ postFlush.flushFailure = merge(postFlush.flushFailure, e);
+ }
+
+ if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
+ // sstables should contain non-repaired data.
+ readers = writer.finish(true);
+ else
+ maybeFail(writer.abort(postFlush.flushFailure));
+ }
+
+ memtable.cfs.replaceFlushed(memtable, readers);
+ reclaim(memtable);
+ return readers;
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1a7d6cb..6404b37 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
@@ -317,7 +318,7 @@ public class Memtable implements Comparable<Memtable>
return partitions.get(key);
}
- public Collection<SSTableReader> flush()
+ public SSTableTxnWriter flush()
{
long estimatedSize = estimatedSize();
Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -357,64 +358,52 @@ public class Memtable implements Comparable<Memtable>
* 1.2); // bloom filter and row index overhead
}
- private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
+ private SSTableTxnWriter writeSortedContents(File sstableDirectory)
{
boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
logger.debug("Writing {}", Memtable.this.toString());
- Collection<SSTableReader> ssTables;
- try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+ SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
+ boolean trackContention = logger.isTraceEnabled();
+ int heavilyContendedRowCount = 0;
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
+ for (AtomicBTreePartition partition : partitions.values())
{
- boolean trackContention = logger.isTraceEnabled();
- int heavilyContendedRowCount = 0;
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
- for (AtomicBTreePartition partition : partitions.values())
+ // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+ // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+ // we don't need to preserve tombstones for repair. So if both operation are in this
+ // memtable (which will almost always be the case if there is no ongoing failure), we can
+ // just skip the entry (CASSANDRA-4667).
+ if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+ continue;
+
+ if (trackContention && partition.usePessimisticLocking())
+ heavilyContendedRowCount++;
+
+ if (!partition.isEmpty())
{
- // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
- // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
- // we don't need to preserve tombstones for repair. So if both operation are in this
- // memtable (which will almost always be the case if there is no ongoing failure), we can
- // just skip the entry (CASSANDRA-4667).
- if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
- continue;
-
- if (trackContention && partition.usePessimisticLocking())
- heavilyContendedRowCount++;
-
- if (!partition.isEmpty())
+ try (UnfilteredRowIterator iter = partition.unfilteredIterator())
{
- try (UnfilteredRowIterator iter = partition.unfilteredIterator())
- {
- writer.append(iter);
- }
+ writer.append(iter);
}
}
+ }
- if (writer.getFilePointer() > 0)
- {
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getFilePointer()),
- commitLogUpperBound));
-
- // sstables should contain non-repaired data.
- ssTables = writer.finish(true);
- }
- else
- {
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), commitLogUpperBound);
- writer.abort();
- ssTables = Collections.emptyList();
- }
+ if (writer.getFilePointer() > 0)
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
+ FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+ commitLogUpperBound));
+ else
+ logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
+ writer.getFilename(), commitLogUpperBound);
- if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+ if (heavilyContendedRowCount > 0)
+ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
- return ssTables;
- }
+ return writer;
}
@SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
new file mode 100644
index 0000000..1285392
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.validation.miscellaneous;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnFamilyStoreTest extends CQLTester
+{
+ @Test
+ public void testFailing2iFlush() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+ createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
+
+ for (int i = 0; i < 10; i++)
+ execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+ try
+ {
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ }
+ catch (Throwable t)
+ {
+ // ignore
+ }
+
+ // Make sure there's no flush running
+ waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
+ TimeUnit.SECONDS.toMillis(5));
+
+ // SSTables remain uncommitted.
+ assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+ }
+
+ public void waitFor(Supplier<Boolean> condition, long timeout)
+ {
+ long start = System.currentTimeMillis();
+ while(true)
+ {
+ if (condition.get())
+ return;
+
+ assertTrue("Timeout occurred while waiting for condition",
+ System.currentTimeMillis() - start < timeout);
+ }
+ }
+
+ // Used for index creation above
+ public static class BrokenCustom2I extends StubIndex
+ {
+ public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+ {
+ super(baseCfs, metadata);
+ }
+
+ public Callable<?> getBlockingFlushTask()
+ {
+ throw new RuntimeException();
+ }
+ }
+}
[02/10] cassandra git commit: Make sure sstables only get committed
when it's safe to discard commit log records
Posted by bl...@apache.org.
Make sure sstables only get committed when it's safe to discard commit log records
Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f90e55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f90e55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f90e55e
Branch: refs/heads/cassandra-3.11
Commit: 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537
Parents: 5f64ed7
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:10:31 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 77 +++++++++++++----
src/java/org/apache/cassandra/db/Memtable.java | 81 ++++++++----------
.../miscellaneous/ColumnFamilyStoreTest.java | 90 ++++++++++++++++++++
4 files changed, 186 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..113e10d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -81,6 +82,7 @@ import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
@@ -124,7 +126,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
- private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ @VisibleForTesting
+ public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
@@ -921,8 +924,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
- final CountDownLatch latch = new CountDownLatch(1);
- volatile FSWriteError flushFailure = null;
+ final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
+ final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
+ volatile Throwable flushFailure = null;
final List<Memtable> memtables;
private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -943,15 +947,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
* with CL as we do with memtables/CFS-backed SecondaryIndexes.
*/
-
- if (flushSecondaryIndexes)
- indexManager.flushAllNonCFSBackedIndexesBlocking();
+ try
+ {
+ if (flushSecondaryIndexes)
+ {
+ indexManager.flushAllNonCFSBackedIndexesBlocking();
+ }
+ }
+ catch (Throwable e)
+ {
+ flushFailure = merge(flushFailure, e);
+ }
+ finally
+ {
+ secondaryIndexFlushLatch.countDown();
+ }
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
// on this task can rely on all prior flushes being complete
- latch.await();
+ memtablesFlushLatch.await();
}
catch (InterruptedException e)
{
@@ -970,7 +986,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
metric.pendingFlushes.dec();
if (flushFailure != null)
- throw flushFailure;
+ Throwables.propagate(flushFailure);
return commitLogUpperBound;
}
@@ -1048,15 +1064,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
for (Memtable memtable : memtables)
- {
- Collection<SSTableReader> readers = Collections.emptyList();
- if (!memtable.isClean() && !truncate)
- readers = memtable.flush();
- memtable.cfs.replaceFlushed(memtable, readers);
- reclaim(memtable);
- }
+ flushMemtable(memtable);
}
- catch (FSWriteError e)
+ catch (Throwable e)
{
JVMStabilityInspector.inspectThrowable(e);
// If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1064,7 +1074,40 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// signal the post-flush we've done our work
- postFlush.latch.countDown();
+ postFlush.memtablesFlushLatch.countDown();
+ }
+
+ public Collection<SSTableReader> flushMemtable(Memtable memtable)
+ {
+ if (memtable.isClean() || truncate)
+ {
+ memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
+ reclaim(memtable);
+ return Collections.emptyList();
+ }
+
+ Collection<SSTableReader> readers = Collections.emptyList();
+ try (SSTableTxnWriter writer = memtable.flush())
+ {
+ try
+ {
+ postFlush.secondaryIndexFlushLatch.await();
+ }
+ catch (InterruptedException e)
+ {
+ postFlush.flushFailure = merge(postFlush.flushFailure, e);
+ }
+
+ if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
+ // sstables should contain non-repaired data.
+ readers = writer.finish(true);
+ else
+ maybeFail(writer.abort(postFlush.flushFailure));
+ }
+
+ memtable.cfs.replaceFlushed(memtable, readers);
+ reclaim(memtable);
+ return readers;
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1a7d6cb..6404b37 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
@@ -317,7 +318,7 @@ public class Memtable implements Comparable<Memtable>
return partitions.get(key);
}
- public Collection<SSTableReader> flush()
+ public SSTableTxnWriter flush()
{
long estimatedSize = estimatedSize();
Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -357,64 +358,52 @@ public class Memtable implements Comparable<Memtable>
* 1.2); // bloom filter and row index overhead
}
- private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
+ private SSTableTxnWriter writeSortedContents(File sstableDirectory)
{
boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
logger.debug("Writing {}", Memtable.this.toString());
- Collection<SSTableReader> ssTables;
- try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+ SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
+ boolean trackContention = logger.isTraceEnabled();
+ int heavilyContendedRowCount = 0;
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
+ for (AtomicBTreePartition partition : partitions.values())
{
- boolean trackContention = logger.isTraceEnabled();
- int heavilyContendedRowCount = 0;
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
- for (AtomicBTreePartition partition : partitions.values())
+ // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+ // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+ // we don't need to preserve tombstones for repair. So if both operation are in this
+ // memtable (which will almost always be the case if there is no ongoing failure), we can
+ // just skip the entry (CASSANDRA-4667).
+ if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+ continue;
+
+ if (trackContention && partition.usePessimisticLocking())
+ heavilyContendedRowCount++;
+
+ if (!partition.isEmpty())
{
- // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
- // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
- // we don't need to preserve tombstones for repair. So if both operation are in this
- // memtable (which will almost always be the case if there is no ongoing failure), we can
- // just skip the entry (CASSANDRA-4667).
- if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
- continue;
-
- if (trackContention && partition.usePessimisticLocking())
- heavilyContendedRowCount++;
-
- if (!partition.isEmpty())
+ try (UnfilteredRowIterator iter = partition.unfilteredIterator())
{
- try (UnfilteredRowIterator iter = partition.unfilteredIterator())
- {
- writer.append(iter);
- }
+ writer.append(iter);
}
}
+ }
- if (writer.getFilePointer() > 0)
- {
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getFilePointer()),
- commitLogUpperBound));
-
- // sstables should contain non-repaired data.
- ssTables = writer.finish(true);
- }
- else
- {
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), commitLogUpperBound);
- writer.abort();
- ssTables = Collections.emptyList();
- }
+ if (writer.getFilePointer() > 0)
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
+ FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+ commitLogUpperBound));
+ else
+ logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
+ writer.getFilename(), commitLogUpperBound);
- if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+ if (heavilyContendedRowCount > 0)
+ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
- return ssTables;
- }
+ return writer;
}
@SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
new file mode 100644
index 0000000..1285392
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.validation.miscellaneous;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnFamilyStoreTest extends CQLTester
+{
+ @Test
+ public void testFailing2iFlush() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+ createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
+
+ for (int i = 0; i < 10; i++)
+ execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+ try
+ {
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ }
+ catch (Throwable t)
+ {
+ // ignore
+ }
+
+ // Make sure there's no flush running
+ waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
+ TimeUnit.SECONDS.toMillis(5));
+
+ // SSTables remain uncommitted.
+ assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+ }
+
+ public void waitFor(Supplier<Boolean> condition, long timeout)
+ {
+ long start = System.currentTimeMillis();
+ while(true)
+ {
+ if (condition.get())
+ return;
+
+ assertTrue("Timeout occurred while waiting for condition",
+ System.currentTimeMillis() - start < timeout);
+ }
+ }
+
+ // Used for index creation above
+ public static class BrokenCustom2I extends StubIndex
+ {
+ public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+ {
+ super(baseCfs, metadata);
+ }
+
+ public Callable<?> getBlockingFlushTask()
+ {
+ throw new RuntimeException();
+ }
+ }
+}
[06/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by bl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f268eda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f268eda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f268eda
Branch: refs/heads/cassandra-3.11
Commit: 2f268eda3d44f8b14b71b7f4b3f4c25e2dfb2c11
Parents: b207f2e 6f90e55
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 12:11:15 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:12:19 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 41 ++++++++------------
.../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c5d2da2,5242adf..6da6b4f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,113 -1,5 +1,114 @@@
-3.0.11
+3.10
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f46e6f7,113e10d..881fb00
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -963,37 -920,49 +963,19 @@@ public class ColumnFamilyStore implemen
* Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
* etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
*/
- private final class PostFlush implements Callable<ReplayPosition>
+ private final class PostFlush implements Callable<CommitLogPosition>
{
-- final boolean flushSecondaryIndexes;
-- final OpOrder.Barrier writeBarrier;
- final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
- final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
- volatile Throwable flushFailure = null;
+ final CountDownLatch latch = new CountDownLatch(1);
- volatile Throwable flushFailure = null;
final List<Memtable> memtables;
++ volatile Throwable flushFailure = null;
- private PostFlush(boolean flushSecondaryIndexes,
- OpOrder.Barrier writeBarrier,
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
-- List<Memtable> memtables)
++ private PostFlush(List<Memtable> memtables)
{
-- this.writeBarrier = writeBarrier;
-- this.flushSecondaryIndexes = flushSecondaryIndexes;
this.memtables = memtables;
}
- public ReplayPosition call()
+ public CommitLogPosition call()
{
-- writeBarrier.await();
--
-- /**
-- * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
-- * flushed memtables and CL position, which is as good as we can guarantee.
-- * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
-- * with CL as we do with memtables/CFS-backed SecondaryIndexes.
-- */
-
- if (flushSecondaryIndexes)
- indexManager.flushAllNonCFSBackedIndexesBlocking();
- try
- {
- if (flushSecondaryIndexes)
- {
- indexManager.flushAllNonCFSBackedIndexesBlocking();
- }
- }
- catch (Throwable e)
- {
- flushFailure = merge(flushFailure, e);
- }
- finally
- {
- secondaryIndexFlushLatch.countDown();
- }
--
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@@ -1075,10 -1043,9 +1057,10 @@@
// we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
// since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
- // replay positions have also completed, i.e. the memtables are done and ready to flush
+ // commit log segment position have also completed, i.e. the memtables are done and ready to flush
writeBarrier.issue();
-- postFlush = new PostFlush(!truncate, writeBarrier, memtables);
++ postFlush = new PostFlush(memtables);
+ postFlushTask = ListenableFutureTask.create(postFlush);
}
public void run()
@@@ -1096,19 -1063,21 +1078,21 @@@
try
{
-- for (Memtable memtable : memtables)
-- flushMemtable(memtable);
++ // Flush "data" memtable with non-cf 2i first;
++ flushMemtable(memtables.get(0), true);
++ for (int i = 1; i < memtables.size(); i++)
++ flushMemtable(memtables.get(i), false);
}
- catch (Throwable e)
+ catch (Throwable t)
{
- JVMStabilityInspector.inspectThrowable(e);
- // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
- postFlush.flushFailure = e;
+ JVMStabilityInspector.inspectThrowable(t);
+ postFlush.flushFailure = t;
}
-
// signal the post-flush we've done our work
- postFlush.memtablesFlushLatch.countDown();
+ postFlush.latch.countDown();
}
-- public Collection<SSTableReader> flushMemtable(Memtable memtable)
++ public Collection<SSTableReader> flushMemtable(Memtable memtable, boolean flushNonCf2i)
{
if (memtable.isClean() || truncate)
{
@@@ -1117,93 -1086,28 +1101,102 @@@
return Collections.emptyList();
}
- Collection<SSTableReader> readers = Collections.emptyList();
- try (SSTableTxnWriter writer = memtable.flush())
+ List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
+ long totalBytesOnDisk = 0;
+ long maxBytesOnDisk = 0;
+ long minBytesOnDisk = Long.MAX_VALUE;
+ List<SSTableReader> sstables = new ArrayList<>();
+ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
{
+ List<Memtable.FlushRunnable> flushRunnables = null;
+ List<SSTableMultiWriter> flushResults = null;
+
try
{
- postFlush.secondaryIndexFlushLatch.await();
+ // flush the memtable
+ flushRunnables = memtable.flushRunnables(txn);
+
+ for (int i = 0; i < flushRunnables.size(); i++)
+ futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
+
++ /**
++ * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
++ * flushed memtables and CL position, which is as good as we can guarantee.
++ * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
++ * with CL as we do with memtables/CFS-backed SecondaryIndexes.
++ */
++ if (flushNonCf2i)
++ indexManager.flushAllNonCFSBackedIndexesBlocking();
++
+ flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
}
- catch (InterruptedException e)
+ catch (Throwable t)
{
- postFlush.flushFailure = merge(postFlush.flushFailure, e);
+ t = memtable.abortRunnables(flushRunnables, t);
+ t = txn.abort(t);
+ throw Throwables.propagate(t);
}
- if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
- // sstables should contain non-repaired data.
- readers = writer.finish(true);
- else
- maybeFail(writer.abort(postFlush.flushFailure));
- }
+ try
+ {
+ Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
+ while (writerIterator.hasNext())
+ {
+ @SuppressWarnings("resource")
+ SSTableMultiWriter writer = writerIterator.next();
+ if (writer.getFilePointer() > 0)
+ {
+ writer.setOpenResult(true).prepareToCommit();
+ }
+ else
+ {
+ maybeFail(writer.abort(null));
+ writerIterator.remove();
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ for (SSTableMultiWriter writer : flushResults)
+ t = writer.abort(t);
+ t = txn.abort(t);
+ Throwables.propagate(t);
+ }
+
+ txn.prepareToCommit();
+
+ Throwable accumulate = null;
+ for (SSTableMultiWriter writer : flushResults)
+ accumulate = writer.commit(accumulate);
- memtable.cfs.replaceFlushed(memtable, readers);
+ maybeFail(txn.commit(accumulate));
+
+ for (SSTableMultiWriter writer : flushResults)
+ {
+ Collection<SSTableReader> flushedSSTables = writer.finished();
+ for (SSTableReader sstable : flushedSSTables)
+ {
+ if (sstable != null)
+ {
+ sstables.add(sstable);
+ long size = sstable.bytesOnDisk();
+ totalBytesOnDisk += size;
+ maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
+ minBytesOnDisk = Math.min(minBytesOnDisk, size);
+ }
+ }
+ }
+ }
+ memtable.cfs.replaceFlushed(memtable, sstables);
reclaim(memtable);
- return readers;
+ memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
+ logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
+ sstables,
+ sstables.size(),
+ FBUtilities.prettyPrintMemory(totalBytesOnDisk),
+ FBUtilities.prettyPrintMemory(maxBytesOnDisk),
+ FBUtilities.prettyPrintMemory(minBytesOnDisk));
+ return sstables;
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 8e1385e,b8e4185..4a43210
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@@ -624,6 -624,6 +624,43 @@@ public class CustomIndexTest extends CQ
assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
}
++ @Test
++ public void testFailing2iFlush() throws Throwable
++ {
++ createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
++ createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
++
++ for (int i = 0; i < 10; i++)
++ execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
++
++ try
++ {
++ getCurrentColumnFamilyStore().forceBlockingFlush();
++ fail("Exception should have been propagated");
++ }
++ catch (Throwable t)
++ {
++ assertTrue(t.getMessage().contains("Broken2I"));
++ }
++
++ // SSTables remain uncommitted.
++ assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
++ }
++
++ // Used for index creation above
++ public static class BrokenCustom2I extends StubIndex
++ {
++ public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
++ {
++ super(baseCfs, metadata);
++ }
++
++ public Callable<?> getBlockingFlushTask()
++ {
++ throw new RuntimeException("Broken2I");
++ }
++ }
++
private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
{
createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",
[10/10] cassandra git commit: Merge branch 'cassandra-3.X' into trunk
Posted by bl...@apache.org.
Merge branch 'cassandra-3.X' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a7baa14
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a7baa14
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a7baa14
Branch: refs/heads/trunk
Commit: 9a7baa145398aa0b1970d70ca508f4a0a6e8e01c
Parents: 8ddbb74 5439d94
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 12:17:35 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:18:09 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 41 ++++++++------------
.../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7baa14/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 90003ec,bddd823..40407bc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -122,8 -113,9 +122,9 @@@
* Remove pre-startup check for open JMX port (CASSANDRA-12074)
* Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
* Restore resumable hints delivery (CASSANDRA-11960)
- * Properly report LWT contention (CASSANDRA-12626)
+ * Properly record CAS contention (CASSANDRA-12626)
Merged from 3.0:
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a7baa14/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
[04/10] cassandra git commit: Make sure sstables only get committed
when it's safe to discard commit log records
Posted by bl...@apache.org.
Make sure sstables only get committed when it's safe to discard commit log records
Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f90e55e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f90e55e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f90e55e
Branch: refs/heads/trunk
Commit: 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537
Parents: 5f64ed7
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:10:31 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 77 +++++++++++++----
src/java/org/apache/cassandra/db/Memtable.java | 81 ++++++++----------
.../miscellaneous/ColumnFamilyStoreTest.java | 90 ++++++++++++++++++++
4 files changed, 186 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..113e10d 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -81,6 +82,7 @@ import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
@@ -124,7 +126,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
- private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ @VisibleForTesting
+ public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
@@ -921,8 +924,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
- final CountDownLatch latch = new CountDownLatch(1);
- volatile FSWriteError flushFailure = null;
+ final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
+ final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
+ volatile Throwable flushFailure = null;
final List<Memtable> memtables;
private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -943,15 +947,27 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
* with CL as we do with memtables/CFS-backed SecondaryIndexes.
*/
-
- if (flushSecondaryIndexes)
- indexManager.flushAllNonCFSBackedIndexesBlocking();
+ try
+ {
+ if (flushSecondaryIndexes)
+ {
+ indexManager.flushAllNonCFSBackedIndexesBlocking();
+ }
+ }
+ catch (Throwable e)
+ {
+ flushFailure = merge(flushFailure, e);
+ }
+ finally
+ {
+ secondaryIndexFlushLatch.countDown();
+ }
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
// on this task can rely on all prior flushes being complete
- latch.await();
+ memtablesFlushLatch.await();
}
catch (InterruptedException e)
{
@@ -970,7 +986,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
metric.pendingFlushes.dec();
if (flushFailure != null)
- throw flushFailure;
+ Throwables.propagate(flushFailure);
return commitLogUpperBound;
}
@@ -1048,15 +1064,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
for (Memtable memtable : memtables)
- {
- Collection<SSTableReader> readers = Collections.emptyList();
- if (!memtable.isClean() && !truncate)
- readers = memtable.flush();
- memtable.cfs.replaceFlushed(memtable, readers);
- reclaim(memtable);
- }
+ flushMemtable(memtable);
}
- catch (FSWriteError e)
+ catch (Throwable e)
{
JVMStabilityInspector.inspectThrowable(e);
// If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1064,7 +1074,40 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// signal the post-flush we've done our work
- postFlush.latch.countDown();
+ postFlush.memtablesFlushLatch.countDown();
+ }
+
+ public Collection<SSTableReader> flushMemtable(Memtable memtable)
+ {
+ if (memtable.isClean() || truncate)
+ {
+ memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
+ reclaim(memtable);
+ return Collections.emptyList();
+ }
+
+ Collection<SSTableReader> readers = Collections.emptyList();
+ try (SSTableTxnWriter writer = memtable.flush())
+ {
+ try
+ {
+ postFlush.secondaryIndexFlushLatch.await();
+ }
+ catch (InterruptedException e)
+ {
+ postFlush.flushFailure = merge(postFlush.flushFailure, e);
+ }
+
+ if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
+ // sstables should contain non-repaired data.
+ readers = writer.finish(true);
+ else
+ maybeFail(writer.abort(postFlush.flushFailure));
+ }
+
+ memtable.cfs.replaceFlushed(memtable, readers);
+ reclaim(memtable);
+ return readers;
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1a7d6cb..6404b37 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,6 +48,7 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
@@ -317,7 +318,7 @@ public class Memtable implements Comparable<Memtable>
return partitions.get(key);
}
- public Collection<SSTableReader> flush()
+ public SSTableTxnWriter flush()
{
long estimatedSize = estimatedSize();
Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -357,64 +358,52 @@ public class Memtable implements Comparable<Memtable>
* 1.2); // bloom filter and row index overhead
}
- private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
+ private SSTableTxnWriter writeSortedContents(File sstableDirectory)
{
boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
logger.debug("Writing {}", Memtable.this.toString());
- Collection<SSTableReader> ssTables;
- try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+ SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
+ boolean trackContention = logger.isTraceEnabled();
+ int heavilyContendedRowCount = 0;
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
+ for (AtomicBTreePartition partition : partitions.values())
{
- boolean trackContention = logger.isTraceEnabled();
- int heavilyContendedRowCount = 0;
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
- for (AtomicBTreePartition partition : partitions.values())
+ // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+ // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+ // we don't need to preserve tombstones for repair. So if both operation are in this
+ // memtable (which will almost always be the case if there is no ongoing failure), we can
+ // just skip the entry (CASSANDRA-4667).
+ if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+ continue;
+
+ if (trackContention && partition.usePessimisticLocking())
+ heavilyContendedRowCount++;
+
+ if (!partition.isEmpty())
{
- // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
- // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
- // we don't need to preserve tombstones for repair. So if both operation are in this
- // memtable (which will almost always be the case if there is no ongoing failure), we can
- // just skip the entry (CASSANDRA-4667).
- if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
- continue;
-
- if (trackContention && partition.usePessimisticLocking())
- heavilyContendedRowCount++;
-
- if (!partition.isEmpty())
+ try (UnfilteredRowIterator iter = partition.unfilteredIterator())
{
- try (UnfilteredRowIterator iter = partition.unfilteredIterator())
- {
- writer.append(iter);
- }
+ writer.append(iter);
}
}
+ }
- if (writer.getFilePointer() > 0)
- {
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getFilePointer()),
- commitLogUpperBound));
-
- // sstables should contain non-repaired data.
- ssTables = writer.finish(true);
- }
- else
- {
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), commitLogUpperBound);
- writer.abort();
- ssTables = Collections.emptyList();
- }
+ if (writer.getFilePointer() > 0)
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
+ FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+ commitLogUpperBound));
+ else
+ logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
+ writer.getFilename(), commitLogUpperBound);
- if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+ if (heavilyContendedRowCount > 0)
+ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
- return ssTables;
- }
+ return writer;
}
@SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f90e55e/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
new file mode 100644
index 0000000..1285392
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3.validation.miscellaneous;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.index.StubIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ColumnFamilyStoreTest extends CQLTester
+{
+ @Test
+ public void testFailing2iFlush() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+ createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
+
+ for (int i = 0; i < 10; i++)
+ execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+ try
+ {
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ }
+ catch (Throwable t)
+ {
+ // ignore
+ }
+
+ // Make sure there's no flush running
+ waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
+ TimeUnit.SECONDS.toMillis(5));
+
+ // SSTables remain uncommitted.
+ assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+ }
+
+ public void waitFor(Supplier<Boolean> condition, long timeout)
+ {
+ long start = System.currentTimeMillis();
+ while(true)
+ {
+ if (condition.get())
+ return;
+
+ assertTrue("Timeout ocurred while waiting for condition",
+ System.currentTimeMillis() - start < timeout);
+ }
+ }
+
+ // Used for index creation above
+ public static class BrokenCustom2I extends StubIndex
+ {
+ public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+ {
+ super(baseCfs, metadata);
+ }
+
+ public Callable<?> getBlockingFlushTask()
+ {
+ throw new RuntimeException();
+ }
+ }
+}
[07/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by bl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f268eda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f268eda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f268eda
Branch: refs/heads/trunk
Commit: 2f268eda3d44f8b14b71b7f4b3f4c25e2dfb2c11
Parents: b207f2e 6f90e55
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 12:11:15 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:12:19 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 41 ++++++++------------
.../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c5d2da2,5242adf..6da6b4f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,113 -1,5 +1,114 @@@
-3.0.11
+3.10
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f46e6f7,113e10d..881fb00
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -963,37 -920,49 +963,19 @@@ public class ColumnFamilyStore implemen
* Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
* etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
*/
- private final class PostFlush implements Callable<ReplayPosition>
+ private final class PostFlush implements Callable<CommitLogPosition>
{
-- final boolean flushSecondaryIndexes;
-- final OpOrder.Barrier writeBarrier;
- final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
- final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
- volatile Throwable flushFailure = null;
+ final CountDownLatch latch = new CountDownLatch(1);
- volatile Throwable flushFailure = null;
final List<Memtable> memtables;
++ volatile Throwable flushFailure = null;
- private PostFlush(boolean flushSecondaryIndexes,
- OpOrder.Barrier writeBarrier,
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
-- List<Memtable> memtables)
++ private PostFlush(List<Memtable> memtables)
{
-- this.writeBarrier = writeBarrier;
-- this.flushSecondaryIndexes = flushSecondaryIndexes;
this.memtables = memtables;
}
- public ReplayPosition call()
+ public CommitLogPosition call()
{
-- writeBarrier.await();
--
-- /**
-- * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
-- * flushed memtables and CL position, which is as good as we can guarantee.
-- * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
-- * with CL as we do with memtables/CFS-backed SecondaryIndexes.
-- */
-
- if (flushSecondaryIndexes)
- indexManager.flushAllNonCFSBackedIndexesBlocking();
- try
- {
- if (flushSecondaryIndexes)
- {
- indexManager.flushAllNonCFSBackedIndexesBlocking();
- }
- }
- catch (Throwable e)
- {
- flushFailure = merge(flushFailure, e);
- }
- finally
- {
- secondaryIndexFlushLatch.countDown();
- }
--
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@@ -1075,10 -1043,9 +1057,10 @@@
// we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
// since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
- // replay positions have also completed, i.e. the memtables are done and ready to flush
+ // commit log segment position have also completed, i.e. the memtables are done and ready to flush
writeBarrier.issue();
-- postFlush = new PostFlush(!truncate, writeBarrier, memtables);
++ postFlush = new PostFlush(memtables);
+ postFlushTask = ListenableFutureTask.create(postFlush);
}
public void run()
@@@ -1096,19 -1063,21 +1078,21 @@@
try
{
-- for (Memtable memtable : memtables)
-- flushMemtable(memtable);
++ // Flush "data" memtable with non-cf 2i first;
++ flushMemtable(memtables.get(0), true);
++ for (int i = 1; i < memtables.size(); i++)
++ flushMemtable(memtables.get(i), false);
}
- catch (Throwable e)
+ catch (Throwable t)
{
- JVMStabilityInspector.inspectThrowable(e);
- // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
- postFlush.flushFailure = e;
+ JVMStabilityInspector.inspectThrowable(t);
+ postFlush.flushFailure = t;
}
-
// signal the post-flush we've done our work
- postFlush.memtablesFlushLatch.countDown();
+ postFlush.latch.countDown();
}
-- public Collection<SSTableReader> flushMemtable(Memtable memtable)
++ public Collection<SSTableReader> flushMemtable(Memtable memtable, boolean flushNonCf2i)
{
if (memtable.isClean() || truncate)
{
@@@ -1117,93 -1086,28 +1101,102 @@@
return Collections.emptyList();
}
- Collection<SSTableReader> readers = Collections.emptyList();
- try (SSTableTxnWriter writer = memtable.flush())
+ List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
+ long totalBytesOnDisk = 0;
+ long maxBytesOnDisk = 0;
+ long minBytesOnDisk = Long.MAX_VALUE;
+ List<SSTableReader> sstables = new ArrayList<>();
+ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
{
+ List<Memtable.FlushRunnable> flushRunnables = null;
+ List<SSTableMultiWriter> flushResults = null;
+
try
{
- postFlush.secondaryIndexFlushLatch.await();
+ // flush the memtable
+ flushRunnables = memtable.flushRunnables(txn);
+
+ for (int i = 0; i < flushRunnables.size(); i++)
+ futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
+
++ /**
++ * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
++ * flushed memtables and CL position, which is as good as we can guarantee.
++ * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
++ * with CL as we do with memtables/CFS-backed SecondaryIndexes.
++ */
++ if (flushNonCf2i)
++ indexManager.flushAllNonCFSBackedIndexesBlocking();
++
+ flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
}
- catch (InterruptedException e)
+ catch (Throwable t)
{
- postFlush.flushFailure = merge(postFlush.flushFailure, e);
+ t = memtable.abortRunnables(flushRunnables, t);
+ t = txn.abort(t);
+ throw Throwables.propagate(t);
}
- if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
- // sstables should contain non-repaired data.
- readers = writer.finish(true);
- else
- maybeFail(writer.abort(postFlush.flushFailure));
- }
+ try
+ {
+ Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
+ while (writerIterator.hasNext())
+ {
+ @SuppressWarnings("resource")
+ SSTableMultiWriter writer = writerIterator.next();
+ if (writer.getFilePointer() > 0)
+ {
+ writer.setOpenResult(true).prepareToCommit();
+ }
+ else
+ {
+ maybeFail(writer.abort(null));
+ writerIterator.remove();
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ for (SSTableMultiWriter writer : flushResults)
+ t = writer.abort(t);
+ t = txn.abort(t);
+ Throwables.propagate(t);
+ }
+
+ txn.prepareToCommit();
+
+ Throwable accumulate = null;
+ for (SSTableMultiWriter writer : flushResults)
+ accumulate = writer.commit(accumulate);
- memtable.cfs.replaceFlushed(memtable, readers);
+ maybeFail(txn.commit(accumulate));
+
+ for (SSTableMultiWriter writer : flushResults)
+ {
+ Collection<SSTableReader> flushedSSTables = writer.finished();
+ for (SSTableReader sstable : flushedSSTables)
+ {
+ if (sstable != null)
+ {
+ sstables.add(sstable);
+ long size = sstable.bytesOnDisk();
+ totalBytesOnDisk += size;
+ maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
+ minBytesOnDisk = Math.min(minBytesOnDisk, size);
+ }
+ }
+ }
+ }
+ memtable.cfs.replaceFlushed(memtable, sstables);
reclaim(memtable);
- return readers;
+ memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
+ logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
+ sstables,
+ sstables.size(),
+ FBUtilities.prettyPrintMemory(totalBytesOnDisk),
+ FBUtilities.prettyPrintMemory(maxBytesOnDisk),
+ FBUtilities.prettyPrintMemory(minBytesOnDisk));
+ return sstables;
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 8e1385e,b8e4185..4a43210
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@@ -624,6 -624,6 +624,43 @@@ public class CustomIndexTest extends CQ
assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
}
++ @Test
++ public void testFailing2iFlush() throws Throwable
++ {
++ createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
++ createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
++
++ for (int i = 0; i < 10; i++)
++ execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
++
++ try
++ {
++ getCurrentColumnFamilyStore().forceBlockingFlush();
++ fail("Exception should have been propagated");
++ }
++ catch (Throwable t)
++ {
++ assertTrue(t.getMessage().contains("Broken2I"));
++ }
++
++ // SSTables remain uncommitted.
++ assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
++ }
++
++ // Used for index creation above
++ public static class BrokenCustom2I extends StubIndex
++ {
++ public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
++ {
++ super(baseCfs, metadata);
++ }
++
++ public Callable<?> getBlockingFlushTask()
++ {
++ throw new RuntimeException("Broken2I");
++ }
++ }
++
private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
{
createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",
[05/10] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by bl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f268eda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f268eda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f268eda
Branch: refs/heads/cassandra-3.X
Commit: 2f268eda3d44f8b14b71b7f4b3f4c25e2dfb2c11
Parents: b207f2e 6f90e55
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 12:11:15 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:12:19 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 41 ++++++++------------
.../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c5d2da2,5242adf..6da6b4f
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,113 -1,5 +1,114 @@@
-3.0.11
+3.10
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f46e6f7,113e10d..881fb00
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -963,37 -920,49 +963,19 @@@ public class ColumnFamilyStore implemen
* Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
* etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
*/
- private final class PostFlush implements Callable<ReplayPosition>
+ private final class PostFlush implements Callable<CommitLogPosition>
{
-- final boolean flushSecondaryIndexes;
-- final OpOrder.Barrier writeBarrier;
- final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
- final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
- volatile Throwable flushFailure = null;
+ final CountDownLatch latch = new CountDownLatch(1);
- volatile Throwable flushFailure = null;
final List<Memtable> memtables;
++ volatile Throwable flushFailure = null;
- private PostFlush(boolean flushSecondaryIndexes,
- OpOrder.Barrier writeBarrier,
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
-- List<Memtable> memtables)
++ private PostFlush(List<Memtable> memtables)
{
-- this.writeBarrier = writeBarrier;
-- this.flushSecondaryIndexes = flushSecondaryIndexes;
this.memtables = memtables;
}
- public ReplayPosition call()
+ public CommitLogPosition call()
{
-- writeBarrier.await();
--
-- /**
-- * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
-- * flushed memtables and CL position, which is as good as we can guarantee.
-- * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
-- * with CL as we do with memtables/CFS-backed SecondaryIndexes.
-- */
-
- if (flushSecondaryIndexes)
- indexManager.flushAllNonCFSBackedIndexesBlocking();
- try
- {
- if (flushSecondaryIndexes)
- {
- indexManager.flushAllNonCFSBackedIndexesBlocking();
- }
- }
- catch (Throwable e)
- {
- flushFailure = merge(flushFailure, e);
- }
- finally
- {
- secondaryIndexFlushLatch.countDown();
- }
--
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@@ -1075,10 -1043,9 +1057,10 @@@
// we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete;
// since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
- // replay positions have also completed, i.e. the memtables are done and ready to flush
+ // commit log segment position have also completed, i.e. the memtables are done and ready to flush
writeBarrier.issue();
-- postFlush = new PostFlush(!truncate, writeBarrier, memtables);
++ postFlush = new PostFlush(memtables);
+ postFlushTask = ListenableFutureTask.create(postFlush);
}
public void run()
@@@ -1096,19 -1063,21 +1078,21 @@@
try
{
-- for (Memtable memtable : memtables)
-- flushMemtable(memtable);
++ // Flush "data" memtable with non-cf 2i first;
++ flushMemtable(memtables.get(0), true);
++ for (int i = 1; i < memtables.size(); i++)
++ flushMemtable(memtables.get(i), false);
}
- catch (Throwable e)
+ catch (Throwable t)
{
- JVMStabilityInspector.inspectThrowable(e);
- // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
- postFlush.flushFailure = e;
+ JVMStabilityInspector.inspectThrowable(t);
+ postFlush.flushFailure = t;
}
-
// signal the post-flush we've done our work
- postFlush.memtablesFlushLatch.countDown();
+ postFlush.latch.countDown();
}
-- public Collection<SSTableReader> flushMemtable(Memtable memtable)
++ public Collection<SSTableReader> flushMemtable(Memtable memtable, boolean flushNonCf2i)
{
if (memtable.isClean() || truncate)
{
@@@ -1117,93 -1086,28 +1101,102 @@@
return Collections.emptyList();
}
- Collection<SSTableReader> readers = Collections.emptyList();
- try (SSTableTxnWriter writer = memtable.flush())
+ List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
+ long totalBytesOnDisk = 0;
+ long maxBytesOnDisk = 0;
+ long minBytesOnDisk = Long.MAX_VALUE;
+ List<SSTableReader> sstables = new ArrayList<>();
+ try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH))
{
+ List<Memtable.FlushRunnable> flushRunnables = null;
+ List<SSTableMultiWriter> flushResults = null;
+
try
{
- postFlush.secondaryIndexFlushLatch.await();
+ // flush the memtable
+ flushRunnables = memtable.flushRunnables(txn);
+
+ for (int i = 0; i < flushRunnables.size(); i++)
+ futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
+
++ /**
++ * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
++ * flushed memtables and CL position, which is as good as we can guarantee.
++ * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
++ * with CL as we do with memtables/CFS-backed SecondaryIndexes.
++ */
++ if (flushNonCf2i)
++ indexManager.flushAllNonCFSBackedIndexesBlocking();
++
+ flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures));
}
- catch (InterruptedException e)
+ catch (Throwable t)
{
- postFlush.flushFailure = merge(postFlush.flushFailure, e);
+ t = memtable.abortRunnables(flushRunnables, t);
+ t = txn.abort(t);
+ throw Throwables.propagate(t);
}
- if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
- // sstables should contain non-repaired data.
- readers = writer.finish(true);
- else
- maybeFail(writer.abort(postFlush.flushFailure));
- }
+ try
+ {
+ Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator();
+ while (writerIterator.hasNext())
+ {
+ @SuppressWarnings("resource")
+ SSTableMultiWriter writer = writerIterator.next();
+ if (writer.getFilePointer() > 0)
+ {
+ writer.setOpenResult(true).prepareToCommit();
+ }
+ else
+ {
+ maybeFail(writer.abort(null));
+ writerIterator.remove();
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ for (SSTableMultiWriter writer : flushResults)
+ t = writer.abort(t);
+ t = txn.abort(t);
+ Throwables.propagate(t);
+ }
+
+ txn.prepareToCommit();
+
+ Throwable accumulate = null;
+ for (SSTableMultiWriter writer : flushResults)
+ accumulate = writer.commit(accumulate);
- memtable.cfs.replaceFlushed(memtable, readers);
+ maybeFail(txn.commit(accumulate));
+
+ for (SSTableMultiWriter writer : flushResults)
+ {
+ Collection<SSTableReader> flushedSSTables = writer.finished();
+ for (SSTableReader sstable : flushedSSTables)
+ {
+ if (sstable != null)
+ {
+ sstables.add(sstable);
+ long size = sstable.bytesOnDisk();
+ totalBytesOnDisk += size;
+ maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
+ minBytesOnDisk = Math.min(minBytesOnDisk, size);
+ }
+ }
+ }
+ }
+ memtable.cfs.replaceFlushed(memtable, sstables);
reclaim(memtable);
- return readers;
+ memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
+ logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}",
+ sstables,
+ sstables.size(),
+ FBUtilities.prettyPrintMemory(totalBytesOnDisk),
+ FBUtilities.prettyPrintMemory(maxBytesOnDisk),
+ FBUtilities.prettyPrintMemory(minBytesOnDisk));
+ return sstables;
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f268eda/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 8e1385e,b8e4185..4a43210
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@@ -624,6 -624,6 +624,43 @@@ public class CustomIndexTest extends CQ
assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
}
++ @Test
++ public void testFailing2iFlush() throws Throwable
++ {
++ createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
++ createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
++
++ for (int i = 0; i < 10; i++)
++ execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
++
++ try
++ {
++ getCurrentColumnFamilyStore().forceBlockingFlush();
++ fail("Exception should have been propagated");
++ }
++ catch (Throwable t)
++ {
++ assertTrue(t.getMessage().contains("Broken2I"));
++ }
++
++ // SSTables remain uncommitted.
++ assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
++ }
++
++ // Used for index creation above
++ public static class BrokenCustom2I extends StubIndex
++ {
++ public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
++ {
++ super(baseCfs, metadata);
++ }
++
++ public Callable<?> getBlockingFlushTask()
++ {
++ throw new RuntimeException("Broken2I");
++ }
++ }
++
private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
{
createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",
[09/10] cassandra git commit: Merge branch 'cassandra-3.11' into
cassandra-3.X
Posted by bl...@apache.org.
Merge branch 'cassandra-3.11' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5439d94c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5439d94c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5439d94c
Branch: refs/heads/trunk
Commit: 5439d94c546331b30acf0d43a503e9426364e81a
Parents: 838a21d 2f268ed
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 12:13:23 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 12:14:24 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 41 ++++++++------------
.../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++++
3 files changed, 55 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5439d94c/CHANGES.txt
----------------------------------------------------------------------