You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the plain-text conversion.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/08/05 13:48:53 UTC
[02/23] cassandra git commit: Disable passing control to post-flush after flush failure to prevent data loss.
Disable passing control to post-flush after flush failure to prevent
data loss.
patch by Branimir Lambov; reviewed by Sylvain Lebresne for
CASSANDRA-11828
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bd665473
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bd665473
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bd665473
Branch: refs/heads/cassandra-2.2
Commit: bd6654733dded3513c2c7acf96df2c364b0c043e
Parents: bc0d1da
Author: Branimir Lambov <br...@datastax.com>
Authored: Wed Aug 3 11:32:48 2016 +0300
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Aug 5 15:35:25 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 45 ++++--
.../apache/cassandra/cql3/OutOfSpaceBase.java | 87 ++++++++++
.../cassandra/cql3/OutOfSpaceDieTest.java | 68 ++++++++
.../cassandra/cql3/OutOfSpaceIgnoreTest.java | 60 +++++++
.../cassandra/cql3/OutOfSpaceStopTest.java | 63 ++++++++
.../apache/cassandra/cql3/OutOfSpaceTest.java | 157 -------------------
7 files changed, 311 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8ecc787..1275631 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.16
+ * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828)
* Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
* cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
* Fix filtering on clustering columns when 2i is used (CASSANDRA-11907)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b64d5de..6e82745 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -99,6 +99,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
new NamedThreadFactory("MemtablePostFlush"),
"internal");
+ // If a flush fails with an error the post-flush is never allowed to continue. This stores the error that caused it
+ // to be able to show an error on following flushes instead of blindly continuing.
+ private static volatile FSWriteError previousFlushFailure = null;
+
private static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1,
StageManager.KEEPALIVE,
TimeUnit.SECONDS,
@@ -869,12 +873,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
synchronized (data)
{
+ if (previousFlushFailure != null)
+ throw new IllegalStateException("A flush previously failed with the error below. To prevent data loss, "
+ + "no flushes can be carried out until the node is restarted.",
+ previousFlushFailure);
logFlush();
Flush flush = new Flush(false);
- flushExecutor.execute(flush);
+ ListenableFutureTask<?> flushTask = ListenableFutureTask.create(flush, null);
+ flushExecutor.submit(flushTask);
ListenableFutureTask<?> task = ListenableFutureTask.create(flush.postFlush, null);
postFlushExecutor.submit(task);
- return task;
+
+ @SuppressWarnings("unchecked")
+ ListenableFuture<?> future = Futures.allAsList(flushTask, task);
+ return future;
}
}
@@ -967,7 +979,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
final OpOrder.Barrier writeBarrier;
final CountDownLatch latch = new CountDownLatch(1);
final ReplayPosition lastReplayPosition;
- volatile FSWriteError flushFailure = null;
private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition)
{
@@ -1010,16 +1021,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// must check lastReplayPosition != null because Flush may find that all memtables are clean
// and so not set a lastReplayPosition
- // If a flush errored out but the error was ignored, make sure we don't discard the commit log.
- if (lastReplayPosition != null && flushFailure == null)
+ if (lastReplayPosition != null)
{
CommitLog.instance.discardCompletedSegments(metadata.cfId, lastReplayPosition);
}
metric.pendingFlushes.dec();
-
- if (flushFailure != null)
- throw flushFailure;
}
}
@@ -1127,16 +1134,28 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
reclaim(memtable);
}
+
+ // signal the post-flush we've done our work
+ // Note: This should not be done in case of error. Read more below.
+ postFlush.latch.countDown();
}
catch (FSWriteError e)
{
JVMStabilityInspector.inspectThrowable(e);
- // If we weren't killed, try to continue work but do not allow CommitLog to be discarded.
- postFlush.flushFailure = e;
+ // The call above may kill the process or the transports, or ignore the error.
+ // In any case we should not be passing on control to post-flush as a subsequent succeeding flush
+ // could mask the error and:
+ // - let the commit log discard unpersisted data, resulting in data loss
+ // - let truncations proceed, with the possibility of resurrecting the unflushed data
+ // - let snapshots succeed with incomplete data
+
+ // Not passing control on means that all flushes from the moment of failure cannot complete
+ // (including snapshots).
+ // If the disk failure policy is ignore, this will cause memtables and the commit log to grow
+ // unboundedly until the node eventually fails.
+ previousFlushFailure = e;
+ throw e;
}
-
- // signal the post-flush we've done our work
- postFlush.latch.countDown();
}
private void reclaim(final Memtable memtable)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java
new file mode 100644
index 0000000..c0023dc
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceBase.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import static junit.framework.Assert.fail;
+
+import java.io.IOError;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Assert;
+
+import org.apache.cassandra.db.BlacklistedDirectories;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories.DataDirectory;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.io.FSWriteError;
+
+/**
+ * Test that exceptions during flush are treated according to the disk failure policy.
+ * We cannot recover after a failed flush due to postFlushExecutor being stuck, so each test needs to run separately.
+ */
+public class OutOfSpaceBase extends CQLTester
+{
+ public void makeTable() throws Throwable
+ {
+ createTable("CREATE TABLE %s (a text, b text, c text, PRIMARY KEY (a, b));");
+
+ for (int i = 0; i < 10; i++)
+ execute("INSERT INTO %s (a, b, c) VALUES ('key', 'column" + i + "', null);");
+ }
+
+ public void markDirectoriesUnwriteable()
+ {
+ ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable());
+ try
+ {
+ for ( ; ; )
+ {
+ DataDirectory dir = cfs.directories.getWriteableLocation(1);
+ BlacklistedDirectories.maybeMarkUnwritable(cfs.directories.getLocationForDisk(dir));
+ }
+ }
+ catch (IOError e)
+ {
+ // Expected -- marked all directories as unwritable
+ }
+ }
+
+ public void flushAndExpectError() throws InterruptedException, ExecutionException
+ {
+ try
+ {
+ Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()).forceFlush().get();
+ fail("FSWriteError expected.");
+ }
+ catch (ExecutionException e)
+ {
+ // Correct path.
+ Assert.assertTrue(e.getCause() instanceof FSWriteError);
+ }
+
+ // Make sure commit log wasn't discarded.
+ UUID cfid = currentTableMetadata().cfId;
+ for (CommitLogSegment segment : CommitLog.instance.allocator.getActiveSegments())
+ if (segment.getDirtyCFIDs().contains(cfid))
+ return;
+ fail("Expected commit log to remain dirty for the affected table.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceDieTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceDieTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceDieTest.java
new file mode 100644
index 0000000..46d71e4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceDieTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import static junit.framework.Assert.fail;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+
+/**
+ * Test that exceptions during flush are treated according to the disk failure policy.
+ */
+public class OutOfSpaceDieTest extends OutOfSpaceBase
+{
+ @Test
+ public void testFlushUnwriteableDie() throws Throwable
+ {
+ makeTable();
+ markDirectoriesUnwriteable();
+
+ KillerForTests killerForTests = new KillerForTests();
+ JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+ DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy();
+ try
+ {
+ DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.die);
+ flushAndExpectError();
+ Assert.assertTrue(killerForTests.wasKilled());
+ Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure
+ }
+ finally
+ {
+ DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+ JVMStabilityInspector.replaceKiller(originalKiller);
+ }
+
+ makeTable();
+ try
+ {
+ flush();
+ fail("Subsequent flushes expected to fail.");
+ }
+ catch (RuntimeException e)
+ {
+ // correct path
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceIgnoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceIgnoreTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceIgnoreTest.java
new file mode 100644
index 0000000..854de80
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceIgnoreTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import static junit.framework.Assert.fail;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+/**
+ * Test that exceptions during flush are treated according to the disk failure policy.
+ */
+public class OutOfSpaceIgnoreTest extends OutOfSpaceBase
+{
+ @Test
+ public void testFlushUnwriteableIgnore() throws Throwable
+ {
+ makeTable();
+ markDirectoriesUnwriteable();
+
+ DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy();
+ try
+ {
+ DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore);
+ flushAndExpectError();
+ }
+ finally
+ {
+ DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+ }
+
+ makeTable();
+ try
+ {
+ flush();
+ fail("Subsequent flushes expected to fail.");
+ }
+ catch (RuntimeException e)
+ {
+ // correct path
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceStopTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceStopTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceStopTest.java
new file mode 100644
index 0000000..b48df56
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/OutOfSpaceStopTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import static junit.framework.Assert.fail;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.Gossiper;
+
+/**
+ * Test that exceptions during flush are treated according to the disk failure policy.
+ */
+public class OutOfSpaceStopTest extends OutOfSpaceBase
+{
+ @Test
+ public void testFlushUnwriteableStop() throws Throwable
+ {
+ makeTable();
+ markDirectoriesUnwriteable();
+
+ DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy();
+ try
+ {
+ DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.stop);
+ flushAndExpectError();
+ Assert.assertFalse(Gossiper.instance.isEnabled());
+ }
+ finally
+ {
+ DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
+ }
+
+ makeTable();
+ try
+ {
+ flush();
+ fail("Subsequent flushes expected to fail.");
+ }
+ catch (RuntimeException e)
+ {
+ // correct path
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bd665473/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java b/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
deleted file mode 100644
index 8304aff..0000000
--- a/test/unit/org/apache/cassandra/cql3/OutOfSpaceTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.cql3;
-
-import static junit.framework.Assert.fail;
-
-import java.io.IOError;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.cassandra.config.Config.DiskFailurePolicy;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.BlacklistedDirectories;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Directories.DataDirectory;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogSegment;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.KillerForTests;
-
-/**
- * Test that TombstoneOverwhelmingException gets thrown when it should be and doesn't when it shouldn't be.
- */
-public class OutOfSpaceTest extends CQLTester
-{
- @Test
- public void testFlushUnwriteableDie() throws Throwable
- {
- makeTable();
- markDirectoriesUnwriteable();
-
- KillerForTests killerForTests = new KillerForTests();
- JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
- DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy();
- try
- {
- DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.die);
- flushAndExpectError();
- Assert.assertTrue(killerForTests.wasKilled());
- Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure
- }
- finally
- {
- DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
- JVMStabilityInspector.replaceKiller(originalKiller);
- }
- }
-
- @Test
- public void testFlushUnwriteableStop() throws Throwable
- {
- makeTable();
- markDirectoriesUnwriteable();
-
- DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy();
- try
- {
- DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.stop);
- flushAndExpectError();
- Assert.assertFalse(Gossiper.instance.isEnabled());
- }
- finally
- {
- DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
- }
- }
-
- @Test
- public void testFlushUnwriteableIgnore() throws Throwable
- {
- makeTable();
- markDirectoriesUnwriteable();
-
- DiskFailurePolicy oldPolicy = DatabaseDescriptor.getDiskFailurePolicy();
- try
- {
- DatabaseDescriptor.setDiskFailurePolicy(DiskFailurePolicy.ignore);
- flushAndExpectError();
- }
- finally
- {
- DatabaseDescriptor.setDiskFailurePolicy(oldPolicy);
- }
-
- // Next flush should succeed.
- makeTable();
- flush();
- }
-
- public void makeTable() throws Throwable
- {
- createTable("CREATE TABLE %s (a text, b text, c text, PRIMARY KEY (a, b));");
-
- // insert exactly the amount of tombstones that shouldn't trigger an exception
- for (int i = 0; i < 10; i++)
- execute("INSERT INTO %s (a, b, c) VALUES ('key', 'column" + i + "', null);");
- }
-
- public void markDirectoriesUnwriteable()
- {
- ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable());
- try
- {
- for ( ; ; )
- {
- DataDirectory dir = cfs.directories.getWriteableLocation(1);
- BlacklistedDirectories.maybeMarkUnwritable(cfs.directories.getLocationForDisk(dir));
- }
- }
- catch (IOError e)
- {
- // Expected -- marked all directories as unwritable
- }
- }
-
- public void flushAndExpectError() throws InterruptedException, ExecutionException
- {
- try
- {
- Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()).forceFlush().get();
- fail("FSWriteError expected.");
- }
- catch (ExecutionException e)
- {
- // Correct path.
- Assert.assertTrue(e.getCause() instanceof FSWriteError);
- }
-
- // Make sure commit log wasn't discarded.
- UUID cfid = currentTableMetadata().cfId;
- for (CommitLogSegment segment : CommitLog.instance.allocator.getActiveSegments())
- if (segment.getDirtyCFIDs().contains(cfid))
- return;
- fail("Expected commit log to remain dirty for the affected table.");
- }
-}