You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/02/11 13:15:09 UTC
[1/2] git commit: Add commit_failure_policy.
Updated Branches:
refs/heads/trunk 7604935ef -> a6e7d1405
Add commit_failure_policy.
Patch by belliottsmith, reviewed by marcuse for CASSANDRA-6364
CASSANDRA-6364
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9381b8d5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9381b8d5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9381b8d5
Branch: refs/heads/trunk
Commit: 9381b8d569ae17cf2760bca266b5253e4bcd6ac2
Parents: 55b5605
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Feb 11 13:13:37 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Feb 11 13:13:37 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 2 +-
conf/cassandra.yaml | 8 +++++
.../org/apache/cassandra/config/Config.java | 8 +++++
.../cassandra/config/DatabaseDescriptor.java | 11 ++++++
.../BatchCommitLogExecutorService.java | 17 +++++++--
.../cassandra/db/commitlog/CommitLog.java | 24 +++++++++++++
.../db/commitlog/CommitLogAllocator.java | 37 +++++++++++++-------
.../PeriodicCommitLogExecutorService.java | 26 ++++++++++++--
.../org/apache/cassandra/io/util/FileUtils.java | 20 ++---------
.../cassandra/service/StorageService.java | 19 ++++++++++
.../org/apache/cassandra/db/CommitLogTest.java | 32 +++++++++++++++++
11 files changed, 169 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 93552ef..a8114a3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,13 +9,13 @@
* Account for range/row tombstones in tombstone drop
time histogram (CASSANDRA-6522)
* Stop CommitLogSegment.close() from calling sync() (CASSANDRA-6652)
+ * Make commitlog failure handling configurable (CASSANDRA-6364)
Merged from 1.2:
* Fix upgradesstables NPE for non-CF-based indexes (CASSANDRA-6645)
* Fix partition and range deletes not triggering flush (CASSANDRA-6655)
* Fix mean cells and mean row size per sstable calculations (CASSANDRA-6667)
* Compact hints after partial replay to clean out tombstones (CASSANDRA-6666)
-
2.0.5
* Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
* Add ks.cf names to tombstone logging (CASSANDRA-6597)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index db924bb..bfe60c4 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -121,6 +121,14 @@ commitlog_directory: /var/lib/cassandra/commitlog
# ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
disk_failure_policy: stop
+# policy for commit disk failures:
+# stop: shut down gossip and Thrift, leaving the node effectively dead, but
+# it can still be inspected via JMX.
+# stop_commit: shut down the commit log, letting writes collect but
+# continuing to service reads, as in pre-2.0.5 Cassandra
+# ignore: ignore fatal errors and let the batches fail
+commit_failure_policy: stop
+
# Maximum size of the key cache in memory.
#
# Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index a4e4e92..2fa49f3 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -45,6 +45,7 @@ public class Config
public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
+ public CommitFailurePolicy commit_failure_policy = CommitFailurePolicy.stop;
/* initial token in the ring */
public String initial_token;
@@ -230,6 +231,13 @@ public class Config
ignore,
}
+ public static enum CommitFailurePolicy
+ {
+ stop,
+ stop_commit,
+ ignore,
+ }
+
public static enum RequestSchedulerId
{
keyspace
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index bd5db69..e1a95ab 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -194,6 +194,7 @@ public class DatabaseDescriptor
}
logger.info("disk_failure_policy is " + conf.disk_failure_policy);
+ logger.info("commit_failure_policy is " + conf.commit_failure_policy);
/* Authentication and authorization backend, implementing IAuthenticator and IAuthorizer */
if (conf.authenticator != null)
@@ -1082,6 +1083,16 @@ public class DatabaseDescriptor
return conf.disk_failure_policy;
}
+ public static void setCommitFailurePolicy(Config.CommitFailurePolicy policy)
+ {
+ conf.commit_failure_policy = policy;
+ }
+
+ public static Config.CommitFailurePolicy getCommitFailurePolicy()
+ {
+ return conf.commit_failure_policy;
+ }
+
public static boolean isSnapshotBeforeCompaction()
{
return conf.snapshot_before_compaction;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
index d985f1f..9c2e2ac 100644
--- a/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/BatchCommitLogExecutorService.java
@@ -20,12 +20,17 @@ package org.apache.cassandra.db.commitlog;
import java.util.ArrayList;
import java.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.FSError;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
{
+
private final BlockingQueue<CheaterFutureTask> queue;
private final Thread appendingThread;
private volatile boolean run = true;
@@ -44,8 +49,16 @@ class BatchCommitLogExecutorService extends AbstractCommitLogExecutorService
{
while (run)
{
- if (processWithSyncBatch())
- completedTaskCount++;
+ try
+ {
+ if (processWithSyncBatch())
+ completedTaskCount++;
+ }
+ catch (Throwable t)
+ {
+ if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+ return;
+ }
}
}
};
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index e9507da..4bab83f 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -21,10 +21,13 @@ import java.io.*;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,9 +35,11 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
/*
@@ -363,4 +368,23 @@ public class CommitLog implements CommitLogMBean
return null;
}
}
+
+ static boolean handleCommitError(String message, Throwable t)
+ {
+ switch (DatabaseDescriptor.getCommitFailurePolicy())
+ {
+ case stop:
+ StorageService.instance.stopTransports();
+ case stop_commit:
+ logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t);
+ return false;
+ case ignore:
+ logger.error(message, t);
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ return true;
+ default:
+ throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
index 706cf9e..575e3c3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -88,22 +89,32 @@ public class CommitLogAllocator
{
while (run)
{
- Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS);
-
- if (r != null)
- {
- r.run();
- }
- else
+ try
{
- // no job, so we're clear to check to see if we're out of segments
- // and ready a new one if needed. has the effect of ensuring there's
- // almost always a segment available when it's needed.
- if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+
+ Runnable r = queue.poll(TICK_CYCLE_TIME, TimeUnit.MILLISECONDS);
+
+ if (r != null)
{
- logger.debug("No segments in reserve; creating a fresh one");
- createFreshSegment();
+ r.run();
}
+ else
+ {
+ // no job, so we're clear to check to see if we're out of segments
+ // and ready a new one if needed. has the effect of ensuring there's
+ // almost always a segment available when it's needed.
+ if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+ {
+ logger.debug("No segments in reserve; creating a fresh one");
+ createFreshSegment();
+ }
+ }
+
+ }
+ catch (Throwable t)
+ {
+ if (!CommitLog.handleCommitError("Failed to allocate new commit log segments", t))
+ return;
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index 30f33b6..00507c2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -25,9 +25,12 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
{
+
private final BlockingQueue<Runnable> queue;
protected volatile long completedTaskCount = 0;
private final Thread appendingThread;
@@ -69,8 +72,27 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
{
while (run)
{
- FBUtilities.waitOnFuture(submit(syncer));
- Uninterruptibles.sleepUninterruptibly(DatabaseDescriptor.getCommitLogSyncPeriod(), TimeUnit.MILLISECONDS);
+ try
+ {
+ FBUtilities.waitOnFuture(submit(syncer));
+ Uninterruptibles.sleepUninterruptibly(DatabaseDescriptor.getCommitLogSyncPeriod(), TimeUnit.MILLISECONDS);
+ }
+ catch (Throwable t)
+ {
+ if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+ {
+ PeriodicCommitLogExecutorService.this.run = false;
+ try
+ {
+ appendingThread.join();
+ }
+ catch (InterruptedException e)
+ {
+ throw new IllegalStateException();
+ }
+ return;
+ }
+ }
}
}
}, "PERIODIC-COMMIT-LOG-SYNCER").start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 0d8538e..e091465 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.BlacklistedDirectories;
import org.apache.cassandra.db.Keyspace;
@@ -412,23 +413,7 @@ public class FileUtils
switch (DatabaseDescriptor.getDiskFailurePolicy())
{
case stop:
- if (StorageService.instance.isInitialized())
- {
- logger.error("Stopping gossiper");
- StorageService.instance.stopGossiping();
- }
-
- if (StorageService.instance.isRPCServerRunning())
- {
- logger.error("Stopping RPC server");
- StorageService.instance.stopRPCServer();
- }
-
- if (StorageService.instance.isNativeTransportRunning())
- {
- logger.error("Stopping native transport");
- StorageService.instance.stopNativeTransport();
- }
+ StorageService.instance.stopTransports();
break;
case best_effort:
// for both read and write errors mark the path as unwritable.
@@ -447,4 +432,5 @@ public class FileUtils
throw new IllegalStateException();
}
}
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e181c44..09b93d7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -357,6 +357,25 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return daemon.nativeServer.isRunning();
}
+ public void stopTransports()
+ {
+ if (isInitialized())
+ {
+ logger.error("Stopping gossiper");
+ stopGossiping();
+ }
+ if (isRPCServerRunning())
+ {
+ logger.error("Stopping RPC server");
+ stopRPCServer();
+ }
+ if (isNativeTransportRunning())
+ {
+ logger.error("Stopping native transport");
+ stopNativeTransport();
+ }
+ }
+
private void shutdownClientServers()
{
stopRPCServer();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9381b8d5/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 8e5f418..036ce15 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -22,17 +22,21 @@ package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -225,4 +229,32 @@ public class CommitLogTest extends SchemaLoader
String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
}
+
+ @Test
+ public void testCommitFailurePolicy_stop()
+ {
+ File commitDir = new File(DatabaseDescriptor.getCommitLogLocation());
+
+ try
+ {
+
+ DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
+ commitDir.setWritable(false);
+ RowMutation rm = new RowMutation("Keyspace1", bytes("k"));
+ rm.add("Standard1", bytes("c1"), ByteBuffer.allocate(100), 0);
+
+ // Add a single mutation while the commit log directory is not writable
+ CommitLog.instance.add(rm);
+ Uninterruptibles.sleepUninterruptibly((int) DatabaseDescriptor.getCommitLogSyncBatchWindow(), TimeUnit.MILLISECONDS);
+ Assert.assertFalse(StorageService.instance.isRPCServerRunning());
+ Assert.assertFalse(StorageService.instance.isNativeTransportRunning());
+ Assert.assertFalse(StorageService.instance.isInitialized());
+
+ }
+ finally
+ {
+ commitDir.setWritable(true);
+ }
+ }
+
}
[2/2] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by ma...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a6e7d140
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a6e7d140
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a6e7d140
Branch: refs/heads/trunk
Commit: a6e7d1405ad34127948538a866ef6fd027bf940d
Parents: 7604935 9381b8d
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Feb 11 13:14:28 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Feb 11 13:14:28 2014 +0100
----------------------------------------------------------------------
----------------------------------------------------------------------