You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/12/06 12:31:26 UTC
[01/12] cassandra git commit: Revert "Make sure sstables only get
committed when it's safe to discard commit log records"
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.11 2f268eda3 -> bed3def9a
refs/heads/cassandra-3.X 5439d94c5 -> f1423806e
refs/heads/trunk 9a7baa145 -> 48591489d
Revert "Make sure sstables only get committed when it's safe to discard commit log records"
This reverts commit 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537 because it used the wrong version of the patch.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d2ba715f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d2ba715f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d2ba715f
Branch: refs/heads/cassandra-3.11
Commit: d2ba715f2456e1aa821c01941f90b6a58f54e6c4
Parents: 6f90e55
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:06:48 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:06:48 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/cassandra/db/ColumnFamilyStore.java | 77 ++++-------------
src/java/org/apache/cassandra/db/Memtable.java | 81 ++++++++++--------
.../miscellaneous/ColumnFamilyStoreTest.java | 90 --------------------
4 files changed, 63 insertions(+), 186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5242adf..5cacdd0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,4 @@
3.0.11
- * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 113e10d..d2a51a9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,7 +63,6 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -82,7 +81,6 @@ import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import static org.apache.cassandra.utils.Throwables.maybeFail;
-import static org.apache.cassandra.utils.Throwables.merge;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
@@ -126,8 +124,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
- @VisibleForTesting
- public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
@@ -924,9 +921,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
- final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
- final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
- volatile Throwable flushFailure = null;
+ final CountDownLatch latch = new CountDownLatch(1);
+ volatile FSWriteError flushFailure = null;
final List<Memtable> memtables;
private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -947,27 +943,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
* with CL as we do with memtables/CFS-backed SecondaryIndexes.
*/
- try
- {
- if (flushSecondaryIndexes)
- {
- indexManager.flushAllNonCFSBackedIndexesBlocking();
- }
- }
- catch (Throwable e)
- {
- flushFailure = merge(flushFailure, e);
- }
- finally
- {
- secondaryIndexFlushLatch.countDown();
- }
+
+ if (flushSecondaryIndexes)
+ indexManager.flushAllNonCFSBackedIndexesBlocking();
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
// on this task can rely on all prior flushes being complete
- memtablesFlushLatch.await();
+ latch.await();
}
catch (InterruptedException e)
{
@@ -986,7 +970,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
metric.pendingFlushes.dec();
if (flushFailure != null)
- Throwables.propagate(flushFailure);
+ throw flushFailure;
return commitLogUpperBound;
}
@@ -1064,9 +1048,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
for (Memtable memtable : memtables)
- flushMemtable(memtable);
+ {
+ Collection<SSTableReader> readers = Collections.emptyList();
+ if (!memtable.isClean() && !truncate)
+ readers = memtable.flush();
+ memtable.cfs.replaceFlushed(memtable, readers);
+ reclaim(memtable);
+ }
}
- catch (Throwable e)
+ catch (FSWriteError e)
{
JVMStabilityInspector.inspectThrowable(e);
// If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1074,40 +1064,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// signal the post-flush we've done our work
- postFlush.memtablesFlushLatch.countDown();
- }
-
- public Collection<SSTableReader> flushMemtable(Memtable memtable)
- {
- if (memtable.isClean() || truncate)
- {
- memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
- reclaim(memtable);
- return Collections.emptyList();
- }
-
- Collection<SSTableReader> readers = Collections.emptyList();
- try (SSTableTxnWriter writer = memtable.flush())
- {
- try
- {
- postFlush.secondaryIndexFlushLatch.await();
- }
- catch (InterruptedException e)
- {
- postFlush.flushFailure = merge(postFlush.flushFailure, e);
- }
-
- if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
- // sstables should contain non-repaired data.
- readers = writer.finish(true);
- else
- maybeFail(writer.abort(postFlush.flushFailure));
- }
-
- memtable.cfs.replaceFlushed(memtable, readers);
- reclaim(memtable);
- return readers;
+ postFlush.latch.countDown();
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 6404b37..1a7d6cb 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
@@ -318,7 +317,7 @@ public class Memtable implements Comparable<Memtable>
return partitions.get(key);
}
- public SSTableTxnWriter flush()
+ public Collection<SSTableReader> flush()
{
long estimatedSize = estimatedSize();
Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -358,52 +357,64 @@ public class Memtable implements Comparable<Memtable>
* 1.2); // bloom filter and row index overhead
}
- private SSTableTxnWriter writeSortedContents(File sstableDirectory)
+ private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
{
boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
logger.debug("Writing {}", Memtable.this.toString());
- SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
- boolean trackContention = logger.isTraceEnabled();
- int heavilyContendedRowCount = 0;
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
- for (AtomicBTreePartition partition : partitions.values())
+ Collection<SSTableReader> ssTables;
+ try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
{
- // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
- // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
- // we don't need to preserve tombstones for repair. So if both operation are in this
- // memtable (which will almost always be the case if there is no ongoing failure), we can
- // just skip the entry (CASSANDRA-4667).
- if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
- continue;
-
- if (trackContention && partition.usePessimisticLocking())
- heavilyContendedRowCount++;
-
- if (!partition.isEmpty())
+ boolean trackContention = logger.isTraceEnabled();
+ int heavilyContendedRowCount = 0;
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
+ for (AtomicBTreePartition partition : partitions.values())
{
- try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+ // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+ // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+ // we don't need to preserve tombstones for repair. So if both operation are in this
+ // memtable (which will almost always be the case if there is no ongoing failure), we can
+ // just skip the entry (CASSANDRA-4667).
+ if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+ continue;
+
+ if (trackContention && partition.usePessimisticLocking())
+ heavilyContendedRowCount++;
+
+ if (!partition.isEmpty())
{
- writer.append(iter);
+ try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+ {
+ writer.append(iter);
+ }
}
}
- }
- if (writer.getFilePointer() > 0)
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getFilePointer()),
- commitLogUpperBound));
- else
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), commitLogUpperBound);
+ if (writer.getFilePointer() > 0)
+ {
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
+ FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+ commitLogUpperBound));
- if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+ // sstables should contain non-repaired data.
+ ssTables = writer.finish(true);
+ }
+ else
+ {
+ logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
+ writer.getFilename(), commitLogUpperBound);
+ writer.abort();
+ ssTables = Collections.emptyList();
+ }
- return writer;
+ if (heavilyContendedRowCount > 0)
+ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+
+ return ssTables;
+ }
}
@SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
deleted file mode 100644
index 1285392..0000000
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.cql3.validation.miscellaneous;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import org.junit.Test;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.index.StubIndex;
-import org.apache.cassandra.schema.IndexMetadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class ColumnFamilyStoreTest extends CQLTester
-{
- @Test
- public void testFailing2iFlush() throws Throwable
- {
- createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
- createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
-
- for (int i = 0; i < 10; i++)
- execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
-
- try
- {
- getCurrentColumnFamilyStore().forceBlockingFlush();
- }
- catch (Throwable t)
- {
- // ignore
- }
-
- // Make sure there's no flush running
- waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
- TimeUnit.SECONDS.toMillis(5));
-
- // SSTables remain uncommitted.
- assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
- }
-
- public void waitFor(Supplier<Boolean> condition, long timeout)
- {
- long start = System.currentTimeMillis();
- while(true)
- {
- if (condition.get())
- return;
-
- assertTrue("Timeout ocurred while waiting for condition",
- System.currentTimeMillis() - start < timeout);
- }
- }
-
- // Used for index creation above
- public static class BrokenCustom2I extends StubIndex
- {
- public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
- {
- super(baseCfs, metadata);
- }
-
- public Callable<?> getBlockingFlushTask()
- {
- throw new RuntimeException();
- }
- }
-}
[04/12] cassandra git commit: Make sure sstables only get committed
when it's safe to discard commit log records
Posted by bl...@apache.org.
Make sure sstables only get committed when it's safe to discard commit log records
Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ecef315
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ecef315
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ecef315
Branch: refs/heads/cassandra-3.11
Commit: 0ecef31548c287ac2d9f818413457bc947362733
Parents: d2ba715
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:10:00 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 45 +++++++++-----------
.../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++
3 files changed, 58 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..71e1653 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -919,34 +919,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
private final class PostFlush implements Callable<ReplayPosition>
{
- final boolean flushSecondaryIndexes;
- final OpOrder.Barrier writeBarrier;
final CountDownLatch latch = new CountDownLatch(1);
- volatile FSWriteError flushFailure = null;
+ volatile Throwable flushFailure = null;
final List<Memtable> memtables;
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
- List<Memtable> memtables)
+ private PostFlush(List<Memtable> memtables)
{
- this.writeBarrier = writeBarrier;
- this.flushSecondaryIndexes = flushSecondaryIndexes;
this.memtables = memtables;
}
public ReplayPosition call()
{
- writeBarrier.await();
-
- /**
- * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
- * flushed memtables and CL position, which is as good as we can guarantee.
- * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
- * with CL as we do with memtables/CFS-backed SecondaryIndexes.
- */
-
- if (flushSecondaryIndexes)
- indexManager.flushAllNonCFSBackedIndexesBlocking();
-
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@ -970,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
metric.pendingFlushes.dec();
if (flushFailure != null)
- throw flushFailure;
+ Throwables.propagate(flushFailure);
return commitLogUpperBound;
}
@@ -1029,7 +1012,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
// replay positions have also completed, i.e. the memtables are done and ready to flush
writeBarrier.issue();
- postFlush = new PostFlush(!truncate, writeBarrier, memtables);
+ postFlush = new PostFlush(memtables);
}
public void run()
@@ -1047,24 +1030,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
+ boolean flushNonCf2i = true;
for (Memtable memtable : memtables)
{
Collection<SSTableReader> readers = Collections.emptyList();
if (!memtable.isClean() && !truncate)
+ {
+ // TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
+ // with CL as we do with memtables/CFS-backed SecondaryIndexes.
+ if (flushNonCf2i)
+ {
+ indexManager.flushAllNonCFSBackedIndexesBlocking();
+ flushNonCf2i = false;
+ }
readers = memtable.flush();
+ }
memtable.cfs.replaceFlushed(memtable, readers);
reclaim(memtable);
}
}
- catch (FSWriteError e)
+ catch (Throwable e)
{
JVMStabilityInspector.inspectThrowable(e);
// If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
postFlush.flushFailure = e;
}
-
- // signal the post-flush we've done our work
- postFlush.latch.countDown();
+ finally
+ {
+ // signal the post-flush we've done our work
+ postFlush.latch.countDown();
+ }
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index b8e4185..6930d13 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -624,6 +624,43 @@ public class CustomIndexTest extends CQLTester
assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
}
+ @Test
+ public void testFailing2iFlush() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+ createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
+
+ for (int i = 0; i < 10; i++)
+ execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+ try
+ {
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ fail("Flush should have thrown an exception.");
+ }
+ catch (Throwable t)
+ {
+ assertTrue(t.getMessage().contains("Broken2I"));
+ }
+
+ // SSTables remain uncommitted.
+ assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+ }
+
+ // Used for index creation above
+ public static class BrokenCustom2I extends StubIndex
+ {
+ public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+ {
+ super(baseCfs, metadata);
+ }
+
+ public Callable<?> getBlockingFlushTask()
+ {
+ throw new RuntimeException("Broken2I");
+ }
+ }
+
private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
{
createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",
[10/12] cassandra git commit: Merge branch 'cassandra-3.11' into
cassandra-3.X
Posted by bl...@apache.org.
Merge branch 'cassandra-3.11' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1423806
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1423806
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1423806
Branch: refs/heads/trunk
Commit: f1423806e7263cbb7cb357f728b5b5181362d892
Parents: 5439d94 bed3def
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:28:27 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:28:27 2016 +0200
----------------------------------------------------------------------
----------------------------------------------------------------------
[12/12] cassandra git commit: Merge branch 'cassandra-3.X' into trunk
Posted by bl...@apache.org.
Merge branch 'cassandra-3.X' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/48591489
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/48591489
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/48591489
Branch: refs/heads/trunk
Commit: 48591489dd214d5b4df8d1c9e8c5ce1ff1abff93
Parents: 9a7baa1 f142380
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:29:32 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:29:32 2016 +0200
----------------------------------------------------------------------
----------------------------------------------------------------------
[07/12] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by bl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bed3def9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bed3def9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bed3def9
Branch: refs/heads/cassandra-3.X
Commit: bed3def9a0188daad4b3306d5aea28b416be85c2
Parents: 2f268ed 0ecef31
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:27:46 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:27:46 2016 +0200
----------------------------------------------------------------------
----------------------------------------------------------------------
[05/12] cassandra git commit: Make sure sstables only get committed
when it's safe to discard commit log records
Posted by bl...@apache.org.
Make sure sstables only get committed when it's safe to discard commit log records
Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ecef315
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ecef315
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ecef315
Branch: refs/heads/cassandra-3.X
Commit: 0ecef31548c287ac2d9f818413457bc947362733
Parents: d2ba715
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:10:00 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 45 +++++++++-----------
.../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++
3 files changed, 58 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..71e1653 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -919,34 +919,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
private final class PostFlush implements Callable<ReplayPosition>
{
- final boolean flushSecondaryIndexes;
- final OpOrder.Barrier writeBarrier;
final CountDownLatch latch = new CountDownLatch(1);
- volatile FSWriteError flushFailure = null;
+ volatile Throwable flushFailure = null;
final List<Memtable> memtables;
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
- List<Memtable> memtables)
+ private PostFlush(List<Memtable> memtables)
{
- this.writeBarrier = writeBarrier;
- this.flushSecondaryIndexes = flushSecondaryIndexes;
this.memtables = memtables;
}
public ReplayPosition call()
{
- writeBarrier.await();
-
- /**
- * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
- * flushed memtables and CL position, which is as good as we can guarantee.
- * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
- * with CL as we do with memtables/CFS-backed SecondaryIndexes.
- */
-
- if (flushSecondaryIndexes)
- indexManager.flushAllNonCFSBackedIndexesBlocking();
-
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@ -970,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
metric.pendingFlushes.dec();
if (flushFailure != null)
- throw flushFailure;
+ Throwables.propagate(flushFailure);
return commitLogUpperBound;
}
@@ -1029,7 +1012,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
// replay positions have also completed, i.e. the memtables are done and ready to flush
writeBarrier.issue();
- postFlush = new PostFlush(!truncate, writeBarrier, memtables);
+ postFlush = new PostFlush(memtables);
}
public void run()
@@ -1047,24 +1030,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
+ boolean flushNonCf2i = true;
for (Memtable memtable : memtables)
{
Collection<SSTableReader> readers = Collections.emptyList();
if (!memtable.isClean() && !truncate)
+ {
+ // TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
+ // with CL as we do with memtables/CFS-backed SecondaryIndexes.
+ if (flushNonCf2i)
+ {
+ indexManager.flushAllNonCFSBackedIndexesBlocking();
+ flushNonCf2i = false;
+ }
readers = memtable.flush();
+ }
memtable.cfs.replaceFlushed(memtable, readers);
reclaim(memtable);
}
}
- catch (FSWriteError e)
+ catch (Throwable e)
{
JVMStabilityInspector.inspectThrowable(e);
// If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
postFlush.flushFailure = e;
}
-
- // signal the post-flush we've done our work
- postFlush.latch.countDown();
+ finally
+ {
+ // signal the post-flush we've done our work
+ postFlush.latch.countDown();
+ }
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index b8e4185..6930d13 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -624,6 +624,43 @@ public class CustomIndexTest extends CQLTester
assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
}
+ @Test
+ public void testFailing2iFlush() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+ createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
+
+ for (int i = 0; i < 10; i++)
+ execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+ try
+ {
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ fail("Flush should have thrown an exception.");
+ }
+ catch (Throwable t)
+ {
+ assertTrue(t.getMessage().contains("Broken2I"));
+ }
+
+ // SSTables remain uncommitted.
+ assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+ }
+
+ // Used for index creation above
+ public static class BrokenCustom2I extends StubIndex
+ {
+ public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+ {
+ super(baseCfs, metadata);
+ }
+
+ public Callable<?> getBlockingFlushTask()
+ {
+ throw new RuntimeException("Broken2I");
+ }
+ }
+
private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
{
createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",
[11/12] cassandra git commit: Merge branch 'cassandra-3.11' into
cassandra-3.X
Posted by bl...@apache.org.
Merge branch 'cassandra-3.11' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1423806
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1423806
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1423806
Branch: refs/heads/cassandra-3.X
Commit: f1423806e7263cbb7cb357f728b5b5181362d892
Parents: 5439d94 bed3def
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:28:27 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:28:27 2016 +0200
----------------------------------------------------------------------
----------------------------------------------------------------------
[03/12] cassandra git commit: Revert "Make sure sstables only get
committed when it's safe to discard commit log records"
Posted by bl...@apache.org.
Revert "Make sure sstables only get committed when it's safe to discard commit log records"
This reverts commit 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537 as it was using a wrong version of the patch.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d2ba715f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d2ba715f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d2ba715f
Branch: refs/heads/trunk
Commit: d2ba715f2456e1aa821c01941f90b6a58f54e6c4
Parents: 6f90e55
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:06:48 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:06:48 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/cassandra/db/ColumnFamilyStore.java | 77 ++++-------------
src/java/org/apache/cassandra/db/Memtable.java | 81 ++++++++++--------
.../miscellaneous/ColumnFamilyStoreTest.java | 90 --------------------
4 files changed, 63 insertions(+), 186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5242adf..5cacdd0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,4 @@
3.0.11
- * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 113e10d..d2a51a9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,7 +63,6 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -82,7 +81,6 @@ import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import static org.apache.cassandra.utils.Throwables.maybeFail;
-import static org.apache.cassandra.utils.Throwables.merge;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
@@ -126,8 +124,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
- @VisibleForTesting
- public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
@@ -924,9 +921,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
- final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
- final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
- volatile Throwable flushFailure = null;
+ final CountDownLatch latch = new CountDownLatch(1);
+ volatile FSWriteError flushFailure = null;
final List<Memtable> memtables;
private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -947,27 +943,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
* with CL as we do with memtables/CFS-backed SecondaryIndexes.
*/
- try
- {
- if (flushSecondaryIndexes)
- {
- indexManager.flushAllNonCFSBackedIndexesBlocking();
- }
- }
- catch (Throwable e)
- {
- flushFailure = merge(flushFailure, e);
- }
- finally
- {
- secondaryIndexFlushLatch.countDown();
- }
+
+ if (flushSecondaryIndexes)
+ indexManager.flushAllNonCFSBackedIndexesBlocking();
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
// on this task can rely on all prior flushes being complete
- memtablesFlushLatch.await();
+ latch.await();
}
catch (InterruptedException e)
{
@@ -986,7 +970,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
metric.pendingFlushes.dec();
if (flushFailure != null)
- Throwables.propagate(flushFailure);
+ throw flushFailure;
return commitLogUpperBound;
}
@@ -1064,9 +1048,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
for (Memtable memtable : memtables)
- flushMemtable(memtable);
+ {
+ Collection<SSTableReader> readers = Collections.emptyList();
+ if (!memtable.isClean() && !truncate)
+ readers = memtable.flush();
+ memtable.cfs.replaceFlushed(memtable, readers);
+ reclaim(memtable);
+ }
}
- catch (Throwable e)
+ catch (FSWriteError e)
{
JVMStabilityInspector.inspectThrowable(e);
// If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1074,40 +1064,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// signal the post-flush we've done our work
- postFlush.memtablesFlushLatch.countDown();
- }
-
- public Collection<SSTableReader> flushMemtable(Memtable memtable)
- {
- if (memtable.isClean() || truncate)
- {
- memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
- reclaim(memtable);
- return Collections.emptyList();
- }
-
- Collection<SSTableReader> readers = Collections.emptyList();
- try (SSTableTxnWriter writer = memtable.flush())
- {
- try
- {
- postFlush.secondaryIndexFlushLatch.await();
- }
- catch (InterruptedException e)
- {
- postFlush.flushFailure = merge(postFlush.flushFailure, e);
- }
-
- if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
- // sstables should contain non-repaired data.
- readers = writer.finish(true);
- else
- maybeFail(writer.abort(postFlush.flushFailure));
- }
-
- memtable.cfs.replaceFlushed(memtable, readers);
- reclaim(memtable);
- return readers;
+ postFlush.latch.countDown();
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 6404b37..1a7d6cb 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
@@ -318,7 +317,7 @@ public class Memtable implements Comparable<Memtable>
return partitions.get(key);
}
- public SSTableTxnWriter flush()
+ public Collection<SSTableReader> flush()
{
long estimatedSize = estimatedSize();
Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -358,52 +357,64 @@ public class Memtable implements Comparable<Memtable>
* 1.2); // bloom filter and row index overhead
}
- private SSTableTxnWriter writeSortedContents(File sstableDirectory)
+ private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
{
boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
logger.debug("Writing {}", Memtable.this.toString());
- SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
- boolean trackContention = logger.isTraceEnabled();
- int heavilyContendedRowCount = 0;
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
- for (AtomicBTreePartition partition : partitions.values())
+ Collection<SSTableReader> ssTables;
+ try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
{
- // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
- // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
- // we don't need to preserve tombstones for repair. So if both operation are in this
- // memtable (which will almost always be the case if there is no ongoing failure), we can
- // just skip the entry (CASSANDRA-4667).
- if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
- continue;
-
- if (trackContention && partition.usePessimisticLocking())
- heavilyContendedRowCount++;
-
- if (!partition.isEmpty())
+ boolean trackContention = logger.isTraceEnabled();
+ int heavilyContendedRowCount = 0;
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
+ for (AtomicBTreePartition partition : partitions.values())
{
- try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+ // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+ // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+ // we don't need to preserve tombstones for repair. So if both operation are in this
+ // memtable (which will almost always be the case if there is no ongoing failure), we can
+ // just skip the entry (CASSANDRA-4667).
+ if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+ continue;
+
+ if (trackContention && partition.usePessimisticLocking())
+ heavilyContendedRowCount++;
+
+ if (!partition.isEmpty())
{
- writer.append(iter);
+ try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+ {
+ writer.append(iter);
+ }
}
}
- }
- if (writer.getFilePointer() > 0)
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getFilePointer()),
- commitLogUpperBound));
- else
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), commitLogUpperBound);
+ if (writer.getFilePointer() > 0)
+ {
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
+ FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+ commitLogUpperBound));
- if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+ // sstables should contain non-repaired data.
+ ssTables = writer.finish(true);
+ }
+ else
+ {
+ logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
+ writer.getFilename(), commitLogUpperBound);
+ writer.abort();
+ ssTables = Collections.emptyList();
+ }
- return writer;
+ if (heavilyContendedRowCount > 0)
+ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+
+ return ssTables;
+ }
}
@SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
deleted file mode 100644
index 1285392..0000000
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.cql3.validation.miscellaneous;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import org.junit.Test;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.index.StubIndex;
-import org.apache.cassandra.schema.IndexMetadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class ColumnFamilyStoreTest extends CQLTester
-{
- @Test
- public void testFailing2iFlush() throws Throwable
- {
- createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
- createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
-
- for (int i = 0; i < 10; i++)
- execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
-
- try
- {
- getCurrentColumnFamilyStore().forceBlockingFlush();
- }
- catch (Throwable t)
- {
- // ignore
- }
-
- // Make sure there's no flush running
- waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
- TimeUnit.SECONDS.toMillis(5));
-
- // SSTables remain uncommitted.
- assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
- }
-
- public void waitFor(Supplier<Boolean> condition, long timeout)
- {
- long start = System.currentTimeMillis();
- while(true)
- {
- if (condition.get())
- return;
-
- assertTrue("Timeout ocurred while waiting for condition",
- System.currentTimeMillis() - start < timeout);
- }
- }
-
- // Used for index creation above
- public static class BrokenCustom2I extends StubIndex
- {
- public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
- {
- super(baseCfs, metadata);
- }
-
- public Callable<?> getBlockingFlushTask()
- {
- throw new RuntimeException();
- }
- }
-}
[08/12] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by bl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bed3def9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bed3def9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bed3def9
Branch: refs/heads/trunk
Commit: bed3def9a0188daad4b3306d5aea28b416be85c2
Parents: 2f268ed 0ecef31
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:27:46 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:27:46 2016 +0200
----------------------------------------------------------------------
----------------------------------------------------------------------
[09/12] cassandra git commit: Merge branch 'cassandra-3.0' into
cassandra-3.11
Posted by bl...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bed3def9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bed3def9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bed3def9
Branch: refs/heads/cassandra-3.11
Commit: bed3def9a0188daad4b3306d5aea28b416be85c2
Parents: 2f268ed 0ecef31
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:27:46 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:27:46 2016 +0200
----------------------------------------------------------------------
----------------------------------------------------------------------
[02/12] cassandra git commit: Revert "Make sure sstables only get
committed when it's safe to discard commit log records"
Posted by bl...@apache.org.
Revert "Make sure sstables only get committed when it's safe to discard commit log records"
This reverts commit 6f90e55e7e23cbe814a3232c8d1ec67f2ff2a537 as it was using a wrong version of the patch.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d2ba715f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d2ba715f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d2ba715f
Branch: refs/heads/cassandra-3.X
Commit: d2ba715f2456e1aa821c01941f90b6a58f54e6c4
Parents: 6f90e55
Author: Branimir Lambov <br...@datastax.com>
Authored: Tue Dec 6 14:06:48 2016 +0200
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:06:48 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 -
.../apache/cassandra/db/ColumnFamilyStore.java | 77 ++++-------------
src/java/org/apache/cassandra/db/Memtable.java | 81 ++++++++++--------
.../miscellaneous/ColumnFamilyStoreTest.java | 90 --------------------
4 files changed, 63 insertions(+), 186 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5242adf..5cacdd0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,4 @@
3.0.11
- * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 113e10d..d2a51a9 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -63,7 +63,6 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
-import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -82,7 +81,6 @@ import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import static org.apache.cassandra.utils.Throwables.maybeFail;
-import static org.apache.cassandra.utils.Throwables.merge;
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
@@ -126,8 +124,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
- @VisibleForTesting
- public static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
@@ -924,9 +921,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
final boolean flushSecondaryIndexes;
final OpOrder.Barrier writeBarrier;
- final CountDownLatch memtablesFlushLatch = new CountDownLatch(1);
- final CountDownLatch secondaryIndexFlushLatch = new CountDownLatch(1);
- volatile Throwable flushFailure = null;
+ final CountDownLatch latch = new CountDownLatch(1);
+ volatile FSWriteError flushFailure = null;
final List<Memtable> memtables;
private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
@@ -947,27 +943,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
* with CL as we do with memtables/CFS-backed SecondaryIndexes.
*/
- try
- {
- if (flushSecondaryIndexes)
- {
- indexManager.flushAllNonCFSBackedIndexesBlocking();
- }
- }
- catch (Throwable e)
- {
- flushFailure = merge(flushFailure, e);
- }
- finally
- {
- secondaryIndexFlushLatch.countDown();
- }
+
+ if (flushSecondaryIndexes)
+ indexManager.flushAllNonCFSBackedIndexesBlocking();
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
// on this task can rely on all prior flushes being complete
- memtablesFlushLatch.await();
+ latch.await();
}
catch (InterruptedException e)
{
@@ -986,7 +970,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
metric.pendingFlushes.dec();
if (flushFailure != null)
- Throwables.propagate(flushFailure);
+ throw flushFailure;
return commitLogUpperBound;
}
@@ -1064,9 +1048,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
for (Memtable memtable : memtables)
- flushMemtable(memtable);
+ {
+ Collection<SSTableReader> readers = Collections.emptyList();
+ if (!memtable.isClean() && !truncate)
+ readers = memtable.flush();
+ memtable.cfs.replaceFlushed(memtable, readers);
+ reclaim(memtable);
+ }
}
- catch (Throwable e)
+ catch (FSWriteError e)
{
JVMStabilityInspector.inspectThrowable(e);
// If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
@@ -1074,40 +1064,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// signal the post-flush we've done our work
- postFlush.memtablesFlushLatch.countDown();
- }
-
- public Collection<SSTableReader> flushMemtable(Memtable memtable)
- {
- if (memtable.isClean() || truncate)
- {
- memtable.cfs.replaceFlushed(memtable, Collections.emptyList());
- reclaim(memtable);
- return Collections.emptyList();
- }
-
- Collection<SSTableReader> readers = Collections.emptyList();
- try (SSTableTxnWriter writer = memtable.flush())
- {
- try
- {
- postFlush.secondaryIndexFlushLatch.await();
- }
- catch (InterruptedException e)
- {
- postFlush.flushFailure = merge(postFlush.flushFailure, e);
- }
-
- if (postFlush.flushFailure == null && writer.getFilePointer() > 0)
- // sstables should contain non-repaired data.
- readers = writer.finish(true);
- else
- maybeFail(writer.abort(postFlush.flushFailure));
- }
-
- memtable.cfs.replaceFlushed(memtable, readers);
- reclaim(memtable);
- return readers;
+ postFlush.latch.countDown();
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 6404b37..1a7d6cb 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableTxnWriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.DiskAwareRunnable;
import org.apache.cassandra.service.ActiveRepairService;
@@ -318,7 +317,7 @@ public class Memtable implements Comparable<Memtable>
return partitions.get(key);
}
- public SSTableTxnWriter flush()
+ public Collection<SSTableReader> flush()
{
long estimatedSize = estimatedSize();
Directories.DataDirectory dataDirectory = cfs.getDirectories().getWriteableLocation(estimatedSize);
@@ -358,52 +357,64 @@ public class Memtable implements Comparable<Memtable>
* 1.2); // bloom filter and row index overhead
}
- private SSTableTxnWriter writeSortedContents(File sstableDirectory)
+ private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
{
boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
logger.debug("Writing {}", Memtable.this.toString());
- SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get());
- boolean trackContention = logger.isTraceEnabled();
- int heavilyContendedRowCount = 0;
- // (we can't clear out the map as-we-go to free up memory,
- // since the memtable is being used for queries in the "pending flush" category)
- for (AtomicBTreePartition partition : partitions.values())
+ Collection<SSTableReader> ssTables;
+ try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
{
- // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
- // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
- // we don't need to preserve tombstones for repair. So if both operation are in this
- // memtable (which will almost always be the case if there is no ongoing failure), we can
- // just skip the entry (CASSANDRA-4667).
- if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
- continue;
-
- if (trackContention && partition.usePessimisticLocking())
- heavilyContendedRowCount++;
-
- if (!partition.isEmpty())
+ boolean trackContention = logger.isTraceEnabled();
+ int heavilyContendedRowCount = 0;
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
+ for (AtomicBTreePartition partition : partitions.values())
{
- try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+ // Each batchlog partition is a separate entry in the log. And for an entry, we only do 2
+ // operations: 1) we insert the entry and 2) we delete it. Further, BL data is strictly local,
+ // we don't need to preserve tombstones for repair. So if both operation are in this
+ // memtable (which will almost always be the case if there is no ongoing failure), we can
+ // just skip the entry (CASSANDRA-4667).
+ if (isBatchLogTable && !partition.partitionLevelDeletion().isLive() && partition.hasRows())
+ continue;
+
+ if (trackContention && partition.usePessimisticLocking())
+ heavilyContendedRowCount++;
+
+ if (!partition.isEmpty())
{
- writer.append(iter);
+ try (UnfilteredRowIterator iter = partition.unfilteredIterator())
+ {
+ writer.append(iter);
+ }
}
}
- }
- if (writer.getFilePointer() > 0)
- logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
- writer.getFilename(),
- FBUtilities.prettyPrintMemory(writer.getFilePointer()),
- commitLogUpperBound));
- else
- logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
- writer.getFilename(), commitLogUpperBound);
+ if (writer.getFilePointer() > 0)
+ {
+ logger.debug(String.format("Completed flushing %s (%s) for commitlog position %s",
+ writer.getFilename(),
+ FBUtilities.prettyPrintMemory(writer.getFilePointer()),
+ commitLogUpperBound));
- if (heavilyContendedRowCount > 0)
- logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+ // sstables should contain non-repaired data.
+ ssTables = writer.finish(true);
+ }
+ else
+ {
+ logger.debug("Completed flushing {}; nothing needed to be retained. Commitlog position was {}",
+ writer.getFilename(), commitLogUpperBound);
+ writer.abort();
+ ssTables = Collections.emptyList();
+ }
- return writer;
+ if (heavilyContendedRowCount > 0)
+ logger.trace(String.format("High update contention in %d/%d partitions of %s ", heavilyContendedRowCount, partitions.size(), Memtable.this.toString()));
+
+ return ssTables;
+ }
}
@SuppressWarnings("resource") // log and writer closed by SSTableTxnWriter
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d2ba715f/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
deleted file mode 100644
index 1285392..0000000
--- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/ColumnFamilyStoreTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.cql3.validation.miscellaneous;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import org.junit.Test;
-
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.index.StubIndex;
-import org.apache.cassandra.schema.IndexMetadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class ColumnFamilyStoreTest extends CQLTester
-{
- @Test
- public void testFailing2iFlush() throws Throwable
- {
- createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
- createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.cql3.validation.miscellaneous.ColumnFamilyStoreTest$BrokenCustom2I'");
-
- for (int i = 0; i < 10; i++)
- execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
-
- try
- {
- getCurrentColumnFamilyStore().forceBlockingFlush();
- }
- catch (Throwable t)
- {
- // ignore
- }
-
- // Make sure there's no flush running
- waitFor(() -> ((JMXEnabledThreadPoolExecutor) ColumnFamilyStore.flushExecutor).getActiveCount() == 0,
- TimeUnit.SECONDS.toMillis(5));
-
- // SSTables remain uncommitted.
- assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
- }
-
- public void waitFor(Supplier<Boolean> condition, long timeout)
- {
- long start = System.currentTimeMillis();
- while(true)
- {
- if (condition.get())
- return;
-
- assertTrue("Timeout ocurred while waiting for condition",
- System.currentTimeMillis() - start < timeout);
- }
- }
-
- // Used for index creation above
- public static class BrokenCustom2I extends StubIndex
- {
- public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
- {
- super(baseCfs, metadata);
- }
-
- public Callable<?> getBlockingFlushTask()
- {
- throw new RuntimeException();
- }
- }
-}
[06/12] cassandra git commit: Make sure sstables only get committed when it's safe to discard commit log records
Posted by bl...@apache.org.
Make sure sstables only get committed when it's safe to discard commit log records
Patch by Alex Petrov; reviewed by Branimir Lambov for CASSANDRA-12956
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ecef315
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ecef315
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ecef315
Branch: refs/heads/trunk
Commit: 0ecef31548c287ac2d9f818413457bc947362733
Parents: d2ba715
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue Nov 29 22:58:36 2016 +0100
Committer: Branimir Lambov <br...@datastax.com>
Committed: Tue Dec 6 14:10:00 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 45 +++++++++-----------
.../apache/cassandra/index/CustomIndexTest.java | 37 ++++++++++++++++
3 files changed, 58 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5cacdd0..5242adf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
* Reject default_time_to_live option when creating or altering MVs (CASSANDRA-12868)
* Nodetool should use a more sane max heap size (CASSANDRA-12739)
* LocalToken ensures token values are cloned on heap (CASSANDRA-12651)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d2a51a9..71e1653 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -919,34 +919,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
private final class PostFlush implements Callable<ReplayPosition>
{
- final boolean flushSecondaryIndexes;
- final OpOrder.Barrier writeBarrier;
final CountDownLatch latch = new CountDownLatch(1);
- volatile FSWriteError flushFailure = null;
+ volatile Throwable flushFailure = null;
final List<Memtable> memtables;
- private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier,
- List<Memtable> memtables)
+ private PostFlush(List<Memtable> memtables)
{
- this.writeBarrier = writeBarrier;
- this.flushSecondaryIndexes = flushSecondaryIndexes;
this.memtables = memtables;
}
public ReplayPosition call()
{
- writeBarrier.await();
-
- /**
- * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
- * flushed memtables and CL position, which is as good as we can guarantee.
- * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
- * with CL as we do with memtables/CFS-backed SecondaryIndexes.
- */
-
- if (flushSecondaryIndexes)
- indexManager.flushAllNonCFSBackedIndexesBlocking();
-
try
{
// we wait on the latch for the commitLogUpperBound to be set, and so that waiters
@@ -970,7 +953,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
metric.pendingFlushes.dec();
if (flushFailure != null)
- throw flushFailure;
+ Throwables.propagate(flushFailure);
return commitLogUpperBound;
}
@@ -1029,7 +1012,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier
// replay positions have also completed, i.e. the memtables are done and ready to flush
writeBarrier.issue();
- postFlush = new PostFlush(!truncate, writeBarrier, memtables);
+ postFlush = new PostFlush(memtables);
}
public void run()
@@ -1047,24 +1030,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
try
{
+ boolean flushNonCf2i = true;
for (Memtable memtable : memtables)
{
Collection<SSTableReader> readers = Collections.emptyList();
if (!memtable.isClean() && !truncate)
+ {
+ // TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
+ // with CL as we do with memtables/CFS-backed SecondaryIndexes.
+ if (flushNonCf2i)
+ {
+ indexManager.flushAllNonCFSBackedIndexesBlocking();
+ flushNonCf2i = false;
+ }
readers = memtable.flush();
+ }
memtable.cfs.replaceFlushed(memtable, readers);
reclaim(memtable);
}
}
- catch (FSWriteError e)
+ catch (Throwable e)
{
JVMStabilityInspector.inspectThrowable(e);
// If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
postFlush.flushFailure = e;
}
-
- // signal the post-flush we've done our work
- postFlush.latch.countDown();
+ finally
+ {
+ // signal the post-flush we've done our work
+ postFlush.latch.countDown();
+ }
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ecef315/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index b8e4185..6930d13 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -624,6 +624,43 @@ public class CustomIndexTest extends CQLTester
assertEquals("bar", IndexWithOverloadedValidateOptions.options.get("foo"));
}
+ @Test
+ public void testFailing2iFlush() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, value int)");
+ createIndex("CREATE CUSTOM INDEX IF NOT EXISTS ON %s(value) USING 'org.apache.cassandra.index.CustomIndexTest$BrokenCustom2I'");
+
+ for (int i = 0; i < 10; i++)
+ execute("INSERT INTO %s (pk, value) VALUES (?, ?)", i, i);
+
+ try
+ {
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ fail("Flush should have thrown an exception.");
+ }
+ catch (Throwable t)
+ {
+ assertTrue(t.getMessage().contains("Broken2I"));
+ }
+
+ // SSTables remain uncommitted.
+ assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
+ }
+
+ // Used for index creation above
+ public static class BrokenCustom2I extends StubIndex
+ {
+ public BrokenCustom2I(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+ {
+ super(baseCfs, metadata);
+ }
+
+ public Callable<?> getBlockingFlushTask()
+ {
+ throw new RuntimeException("Broken2I");
+ }
+ }
+
private void testCreateIndex(String indexName, String... targetColumnNames) throws Throwable
{
createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(%s) USING '%s'",