You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/06/30 14:03:13 UTC
[1/6] git commit: Merge commit failure policy into 2.1
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 73a07daf9 -> 5c8569976
refs/heads/cassandra-2.1.0 b8ee9da1e -> da3606e18
refs/heads/trunk 64772865c -> 2472701e1
Merge commit failure policy into 2.1
patch by Benedict Elliott Smith; reviewed by Marcus Eriksson for CASSANDRA-7429
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/da3606e1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/da3606e1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/da3606e1
Branch: refs/heads/cassandra-2.1
Commit: da3606e180411e88774a9f24e7884147a202fea8
Parents: b8ee9da
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jun 30 13:00:42 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jun 30 13:00:42 2014 +0100
----------------------------------------------------------------------
conf/cassandra.yaml | 8 ++
.../org/apache/cassandra/config/Config.java | 8 ++
.../cassandra/config/DatabaseDescriptor.java | 10 +++
.../db/commitlog/AbstractCommitLogService.java | 4 +-
.../cassandra/db/commitlog/CommitLog.java | 21 +++++
.../db/commitlog/CommitLogSegmentManager.java | 88 +++++++++++---------
.../org/apache/cassandra/io/util/FileUtils.java | 13 ---
.../org/apache/cassandra/db/CommitLogTest.java | 31 +++++++
8 files changed, 130 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 64b5987..f1e5576 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -113,6 +113,14 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
# ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
disk_failure_policy: stop
+# policy for commit disk failures:
+# stop: shut down gossip and Thrift, leaving the node effectively dead, but
+# can still be inspected via JMX.
+# stop_commit: shutdown the commit log, letting writes collect but
+# continuing to service reads, as in pre-2.0.5 Cassandra
+# ignore: ignore fatal errors and let the batches fail
+commit_failure_policy: stop
+
# Maximum size of the key cache in memory.
#
# Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index c5e2b76..4d4a95b 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -57,6 +57,7 @@ public class Config
public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
+ public CommitFailurePolicy commit_failure_policy = CommitFailurePolicy.stop;
/* initial token in the ring */
public String initial_token;
@@ -300,6 +301,13 @@ public class Config
stop_paranoid,
}
+ public static enum CommitFailurePolicy
+ {
+ stop,
+ stop_commit,
+ ignore,
+ }
+
public static enum RequestSchedulerId
{
keyspace
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 15c8c09..e8ce9df 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1192,6 +1192,16 @@ public class DatabaseDescriptor
return conf.disk_failure_policy;
}
+ public static void setCommitFailurePolicy(Config.CommitFailurePolicy policy)
+ {
+ conf.commit_failure_policy = policy;
+ }
+
+ public static Config.CommitFailurePolicy getCommitFailurePolicy()
+ {
+ return conf.commit_failure_policy;
+ }
+
public static boolean isSnapshotBeforeCompaction()
{
return conf.snapshot_before_compaction;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 94802bf..59bf691 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -122,7 +122,9 @@ public abstract class AbstractCommitLogService
}
catch (Throwable t)
{
- logger.error("Commit log sync failed", t);
+ if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+ break;
+
// sleep for full poll-interval after an error, so we don't spam the log file
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index eaa1b3c..41c01c3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -21,10 +21,12 @@ import java.io.*;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +37,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.DataOutputByteBuffer;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.PureJavaCrc32;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
@@ -343,4 +346,22 @@ public class CommitLog implements CommitLogMBean
{
return allocator.getActiveSegments().size();
}
+
+ static boolean handleCommitError(String message, Throwable t)
+ {
+ switch (DatabaseDescriptor.getCommitFailurePolicy())
+ {
+ case stop:
+ StorageService.instance.stopTransports();
+ case stop_commit:
+ logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t);
+ return false;
+ case ignore:
+ logger.error(message, t);
+ return true;
+ default:
+ throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 117b9d1..ed0a7ff 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.Iterables;
@@ -101,56 +102,65 @@ public class CommitLogSegmentManager
{
while (run)
{
- Callable<CommitLogSegment> task = segmentManagementTasks.poll();
- if (task == null)
+ try
{
- // if we have no more work to do, check if we should create a new segment
- if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+ Callable<CommitLogSegment> task = segmentManagementTasks.poll();
+ if (task == null)
{
- logger.debug("No segments in reserve; creating a fresh one");
- size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
- // TODO : some error handling in case we fail to create a new segment
- availableSegments.add(CommitLogSegment.freshSegment());
- hasAvailableSegments.signalAll();
- }
+ // if we have no more work to do, check if we should create a new segment
+ if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+ {
+ logger.debug("No segments in reserve; creating a fresh one");
+ size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
+ // TODO : some error handling in case we fail to create a new segment
+ availableSegments.add(CommitLogSegment.freshSegment());
+ hasAvailableSegments.signalAll();
+ }
- // flush old Cfs if we're full
- long unused = unusedCapacity();
- if (unused < 0)
- {
- List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
- long spaceToReclaim = 0;
- for (CommitLogSegment segment : activeSegments)
+ // flush old Cfs if we're full
+ long unused = unusedCapacity();
+ if (unused < 0)
{
- if (segment == allocatingFrom)
- break;
- segmentsToRecycle.add(segment);
- spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
- if (spaceToReclaim + unused >= 0)
- break;
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+ long spaceToReclaim = 0;
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment == allocatingFrom)
+ break;
+ segmentsToRecycle.add(segment);
+ spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
+ if (spaceToReclaim + unused >= 0)
+ break;
+ }
+ flushDataFrom(segmentsToRecycle, false);
}
- flushDataFrom(segmentsToRecycle, false);
- }
- try
- {
- // wait for new work to be provided
- task = segmentManagementTasks.take();
+ try
+ {
+ // wait for new work to be provided
+ task = segmentManagementTasks.take();
+ }
+ catch (InterruptedException e)
+ {
+ // shutdown signal; exit cleanly
+ continue;
+ }
}
- catch (InterruptedException e)
+
+ CommitLogSegment recycled = task.call();
+ if (recycled != null)
{
- // shutdown signal; exit cleanly
- continue;
+ // if the work resulted in a segment to recycle, publish it
+ availableSegments.add(recycled);
+ hasAvailableSegments.signalAll();
}
}
-
- // TODO : some error handling in case we fail on executing call (e.g. recycling)
- CommitLogSegment recycled = task.call();
- if (recycled != null)
+ catch (Throwable t)
{
- // if the work resulted in a segment to recycle, publish it
- availableSegments.add(recycled);
- hasAvailableSegments.signalAll();
+ if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+ return;
+ // sleep some arbitrary period to avoid spamming CL
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 875c9d5..41b7aa3 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -390,19 +390,6 @@ public class FileUtils
}
}
- public static void skipBytesFully(DataInput in, long bytes) throws IOException
- {
- long n = 0;
- while (n < bytes)
- {
- int m = (int) Math.min(Integer.MAX_VALUE, bytes - n);
- int skipped = in.skipBytes(m);
- if (skipped == 0)
- throw new EOFException("EOF after " + n + " bytes out of " + bytes);
- n += skipped;
- }
- }
-
public static void handleCorruptSSTable(CorruptSSTableException e)
{
if (DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.stop_paranoid)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 660e91e..7046536 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -22,9 +22,11 @@ package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
import org.junit.Test;
@@ -37,6 +39,7 @@ import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -264,4 +267,32 @@ public class CommitLogTest extends SchemaLoader
String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
}
+
+ @Test
+ public void testCommitFailurePolicy_stop()
+ {
+ File commitDir = new File(DatabaseDescriptor.getCommitLogLocation());
+
+ try
+ {
+
+ DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
+ commitDir.setWritable(false);
+ Mutation rm = new Mutation("Keyspace1", bytes("k"));
+ rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0);
+
+ // Adding it twice (won't change segment)
+ CommitLog.instance.add(rm);
+ Uninterruptibles.sleepUninterruptibly((int) DatabaseDescriptor.getCommitLogSyncBatchWindow(), TimeUnit.MILLISECONDS);
+ Assert.assertFalse(StorageService.instance.isRPCServerRunning());
+ Assert.assertFalse(StorageService.instance.isNativeTransportRunning());
+ Assert.assertFalse(StorageService.instance.isInitialized());
+
+ }
+ finally
+ {
+ commitDir.setWritable(true);
+ }
+ }
+
}
[4/6] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1
Posted by be...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5c856997
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5c856997
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5c856997
Branch: refs/heads/trunk
Commit: 5c8569976e4ff8892860191644a82a55c4db2253
Parents: 73a07da da3606e
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jun 30 13:02:34 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jun 30 13:02:34 2014 +0100
----------------------------------------------------------------------
conf/cassandra.yaml | 8 ++
.../org/apache/cassandra/config/Config.java | 8 ++
.../cassandra/config/DatabaseDescriptor.java | 10 +++
.../db/commitlog/AbstractCommitLogService.java | 4 +-
.../cassandra/db/commitlog/CommitLog.java | 21 +++++
.../db/commitlog/CommitLogSegmentManager.java | 88 +++++++++++---------
.../org/apache/cassandra/io/util/FileUtils.java | 13 ---
.../org/apache/cassandra/db/CommitLogTest.java | 31 +++++++
8 files changed, 130 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
[5/6] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1
Posted by be...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5c856997
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5c856997
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5c856997
Branch: refs/heads/cassandra-2.1
Commit: 5c8569976e4ff8892860191644a82a55c4db2253
Parents: 73a07da da3606e
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jun 30 13:02:34 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jun 30 13:02:34 2014 +0100
----------------------------------------------------------------------
conf/cassandra.yaml | 8 ++
.../org/apache/cassandra/config/Config.java | 8 ++
.../cassandra/config/DatabaseDescriptor.java | 10 +++
.../db/commitlog/AbstractCommitLogService.java | 4 +-
.../cassandra/db/commitlog/CommitLog.java | 21 +++++
.../db/commitlog/CommitLogSegmentManager.java | 88 +++++++++++---------
.../org/apache/cassandra/io/util/FileUtils.java | 13 ---
.../org/apache/cassandra/db/CommitLogTest.java | 31 +++++++
8 files changed, 130 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
[3/6] git commit: Merge commit failure policy into 2.1
Posted by be...@apache.org.
Merge commit failure policy into 2.1
patch by Benedict Elliott Smith; reviewed by Marcus Eriksson for CASSANDRA-7429
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/da3606e1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/da3606e1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/da3606e1
Branch: refs/heads/trunk
Commit: da3606e180411e88774a9f24e7884147a202fea8
Parents: b8ee9da
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jun 30 13:00:42 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jun 30 13:00:42 2014 +0100
----------------------------------------------------------------------
conf/cassandra.yaml | 8 ++
.../org/apache/cassandra/config/Config.java | 8 ++
.../cassandra/config/DatabaseDescriptor.java | 10 +++
.../db/commitlog/AbstractCommitLogService.java | 4 +-
.../cassandra/db/commitlog/CommitLog.java | 21 +++++
.../db/commitlog/CommitLogSegmentManager.java | 88 +++++++++++---------
.../org/apache/cassandra/io/util/FileUtils.java | 13 ---
.../org/apache/cassandra/db/CommitLogTest.java | 31 +++++++
8 files changed, 130 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 64b5987..f1e5576 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -113,6 +113,14 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
# ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
disk_failure_policy: stop
+# policy for commit disk failures:
+# stop: shut down gossip and Thrift, leaving the node effectively dead, but
+# can still be inspected via JMX.
+# stop_commit: shutdown the commit log, letting writes collect but
+# continuing to service reads, as in pre-2.0.5 Cassandra
+# ignore: ignore fatal errors and let the batches fail
+commit_failure_policy: stop
+
# Maximum size of the key cache in memory.
#
# Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index c5e2b76..4d4a95b 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -57,6 +57,7 @@ public class Config
public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
+ public CommitFailurePolicy commit_failure_policy = CommitFailurePolicy.stop;
/* initial token in the ring */
public String initial_token;
@@ -300,6 +301,13 @@ public class Config
stop_paranoid,
}
+ public static enum CommitFailurePolicy
+ {
+ stop,
+ stop_commit,
+ ignore,
+ }
+
public static enum RequestSchedulerId
{
keyspace
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 15c8c09..e8ce9df 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1192,6 +1192,16 @@ public class DatabaseDescriptor
return conf.disk_failure_policy;
}
+ public static void setCommitFailurePolicy(Config.CommitFailurePolicy policy)
+ {
+ conf.commit_failure_policy = policy;
+ }
+
+ public static Config.CommitFailurePolicy getCommitFailurePolicy()
+ {
+ return conf.commit_failure_policy;
+ }
+
public static boolean isSnapshotBeforeCompaction()
{
return conf.snapshot_before_compaction;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 94802bf..59bf691 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -122,7 +122,9 @@ public abstract class AbstractCommitLogService
}
catch (Throwable t)
{
- logger.error("Commit log sync failed", t);
+ if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+ break;
+
// sleep for full poll-interval after an error, so we don't spam the log file
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index eaa1b3c..41c01c3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -21,10 +21,12 @@ import java.io.*;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +37,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.DataOutputByteBuffer;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.PureJavaCrc32;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
@@ -343,4 +346,22 @@ public class CommitLog implements CommitLogMBean
{
return allocator.getActiveSegments().size();
}
+
+ static boolean handleCommitError(String message, Throwable t)
+ {
+ switch (DatabaseDescriptor.getCommitFailurePolicy())
+ {
+ case stop:
+ StorageService.instance.stopTransports();
+ case stop_commit:
+ logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t);
+ return false;
+ case ignore:
+ logger.error(message, t);
+ return true;
+ default:
+ throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 117b9d1..ed0a7ff 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.Iterables;
@@ -101,56 +102,65 @@ public class CommitLogSegmentManager
{
while (run)
{
- Callable<CommitLogSegment> task = segmentManagementTasks.poll();
- if (task == null)
+ try
{
- // if we have no more work to do, check if we should create a new segment
- if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+ Callable<CommitLogSegment> task = segmentManagementTasks.poll();
+ if (task == null)
{
- logger.debug("No segments in reserve; creating a fresh one");
- size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
- // TODO : some error handling in case we fail to create a new segment
- availableSegments.add(CommitLogSegment.freshSegment());
- hasAvailableSegments.signalAll();
- }
+ // if we have no more work to do, check if we should create a new segment
+ if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+ {
+ logger.debug("No segments in reserve; creating a fresh one");
+ size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
+ // TODO : some error handling in case we fail to create a new segment
+ availableSegments.add(CommitLogSegment.freshSegment());
+ hasAvailableSegments.signalAll();
+ }
- // flush old Cfs if we're full
- long unused = unusedCapacity();
- if (unused < 0)
- {
- List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
- long spaceToReclaim = 0;
- for (CommitLogSegment segment : activeSegments)
+ // flush old Cfs if we're full
+ long unused = unusedCapacity();
+ if (unused < 0)
{
- if (segment == allocatingFrom)
- break;
- segmentsToRecycle.add(segment);
- spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
- if (spaceToReclaim + unused >= 0)
- break;
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+ long spaceToReclaim = 0;
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment == allocatingFrom)
+ break;
+ segmentsToRecycle.add(segment);
+ spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
+ if (spaceToReclaim + unused >= 0)
+ break;
+ }
+ flushDataFrom(segmentsToRecycle, false);
}
- flushDataFrom(segmentsToRecycle, false);
- }
- try
- {
- // wait for new work to be provided
- task = segmentManagementTasks.take();
+ try
+ {
+ // wait for new work to be provided
+ task = segmentManagementTasks.take();
+ }
+ catch (InterruptedException e)
+ {
+ // shutdown signal; exit cleanly
+ continue;
+ }
}
- catch (InterruptedException e)
+
+ CommitLogSegment recycled = task.call();
+ if (recycled != null)
{
- // shutdown signal; exit cleanly
- continue;
+ // if the work resulted in a segment to recycle, publish it
+ availableSegments.add(recycled);
+ hasAvailableSegments.signalAll();
}
}
-
- // TODO : some error handling in case we fail on executing call (e.g. recycling)
- CommitLogSegment recycled = task.call();
- if (recycled != null)
+ catch (Throwable t)
{
- // if the work resulted in a segment to recycle, publish it
- availableSegments.add(recycled);
- hasAvailableSegments.signalAll();
+ if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+ return;
+ // sleep some arbitrary period to avoid spamming CL
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 875c9d5..41b7aa3 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -390,19 +390,6 @@ public class FileUtils
}
}
- public static void skipBytesFully(DataInput in, long bytes) throws IOException
- {
- long n = 0;
- while (n < bytes)
- {
- int m = (int) Math.min(Integer.MAX_VALUE, bytes - n);
- int skipped = in.skipBytes(m);
- if (skipped == 0)
- throw new EOFException("EOF after " + n + " bytes out of " + bytes);
- n += skipped;
- }
- }
-
public static void handleCorruptSSTable(CorruptSSTableException e)
{
if (DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.stop_paranoid)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 660e91e..7046536 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -22,9 +22,11 @@ package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
import org.junit.Test;
@@ -37,6 +39,7 @@ import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -264,4 +267,32 @@ public class CommitLogTest extends SchemaLoader
String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
}
+
+ @Test
+ public void testCommitFailurePolicy_stop()
+ {
+ File commitDir = new File(DatabaseDescriptor.getCommitLogLocation());
+
+ try
+ {
+
+ DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
+ commitDir.setWritable(false);
+ Mutation rm = new Mutation("Keyspace1", bytes("k"));
+ rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0);
+
+ // Adding it twice (won't change segment)
+ CommitLog.instance.add(rm);
+ Uninterruptibles.sleepUninterruptibly((int) DatabaseDescriptor.getCommitLogSyncBatchWindow(), TimeUnit.MILLISECONDS);
+ Assert.assertFalse(StorageService.instance.isRPCServerRunning());
+ Assert.assertFalse(StorageService.instance.isNativeTransportRunning());
+ Assert.assertFalse(StorageService.instance.isInitialized());
+
+ }
+ finally
+ {
+ commitDir.setWritable(true);
+ }
+ }
+
}
[6/6] git commit: Merge branch 'cassandra-2.1' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2472701e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2472701e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2472701e
Branch: refs/heads/trunk
Commit: 2472701e119416aa391fe76eaf56112f0cecbe4f
Parents: 6477286 5c85699
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jun 30 13:02:44 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jun 30 13:02:44 2014 +0100
----------------------------------------------------------------------
conf/cassandra.yaml | 8 ++
.../org/apache/cassandra/config/Config.java | 8 ++
.../cassandra/config/DatabaseDescriptor.java | 10 +++
.../db/commitlog/AbstractCommitLogService.java | 4 +-
.../cassandra/db/commitlog/CommitLog.java | 21 +++++
.../db/commitlog/CommitLogSegmentManager.java | 88 +++++++++++---------
.../org/apache/cassandra/io/util/FileUtils.java | 13 ---
.../org/apache/cassandra/db/CommitLogTest.java | 31 +++++++
8 files changed, 130 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2472701e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2472701e/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2472701e/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/CommitLogTest.java
index 7e30867,7046536..22b6e6b
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@@ -25,8 -26,8 +26,9 @@@ import java.util.concurrent.TimeUnit
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+ import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
@@@ -38,9 -38,8 +40,10 @@@ import org.apache.cassandra.db.commitlo
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.service.StorageService;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
[2/6] git commit: Merge commit failure policy into 2.1
Posted by be...@apache.org.
Merge commit failure policy into 2.1
patch by Benedict Elliott Smith; reviewed by Marcus Eriksson for CASSANDRA-7429
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/da3606e1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/da3606e1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/da3606e1
Branch: refs/heads/cassandra-2.1.0
Commit: da3606e180411e88774a9f24e7884147a202fea8
Parents: b8ee9da
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jun 30 13:00:42 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jun 30 13:00:42 2014 +0100
----------------------------------------------------------------------
conf/cassandra.yaml | 8 ++
.../org/apache/cassandra/config/Config.java | 8 ++
.../cassandra/config/DatabaseDescriptor.java | 10 +++
.../db/commitlog/AbstractCommitLogService.java | 4 +-
.../cassandra/db/commitlog/CommitLog.java | 21 +++++
.../db/commitlog/CommitLogSegmentManager.java | 88 +++++++++++---------
.../org/apache/cassandra/io/util/FileUtils.java | 13 ---
.../org/apache/cassandra/db/CommitLogTest.java | 31 +++++++
8 files changed, 130 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 64b5987..f1e5576 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -113,6 +113,14 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
# ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
disk_failure_policy: stop
+# policy for commit disk failures:
+# stop: shut down gossip and Thrift, leaving the node effectively dead, but
+# can still be inspected via JMX.
+# stop_commit: shutdown the commit log, letting writes collect but
+# continuing to service reads, as in pre-2.0.5 Cassandra
+# ignore: ignore fatal errors and let the batches fail
+commit_failure_policy: stop
+
# Maximum size of the key cache in memory.
#
# Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index c5e2b76..4d4a95b 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -57,6 +57,7 @@ public class Config
public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
+ public CommitFailurePolicy commit_failure_policy = CommitFailurePolicy.stop;
/* initial token in the ring */
public String initial_token;
@@ -300,6 +301,13 @@ public class Config
stop_paranoid,
}
+ public static enum CommitFailurePolicy
+ {
+ stop,
+ stop_commit,
+ ignore,
+ }
+
public static enum RequestSchedulerId
{
keyspace
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 15c8c09..e8ce9df 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1192,6 +1192,16 @@ public class DatabaseDescriptor
return conf.disk_failure_policy;
}
+ public static void setCommitFailurePolicy(Config.CommitFailurePolicy policy)
+ {
+ conf.commit_failure_policy = policy;
+ }
+
+ public static Config.CommitFailurePolicy getCommitFailurePolicy()
+ {
+ return conf.commit_failure_policy;
+ }
+
public static boolean isSnapshotBeforeCompaction()
{
return conf.snapshot_before_compaction;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 94802bf..59bf691 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -122,7 +122,9 @@ public abstract class AbstractCommitLogService
}
catch (Throwable t)
{
- logger.error("Commit log sync failed", t);
+ if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+ break;
+
// sleep for full poll-interval after an error, so we don't spam the log file
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index eaa1b3c..41c01c3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -21,10 +21,12 @@ import java.io.*;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +37,7 @@ import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.DataOutputByteBuffer;
import org.apache.cassandra.metrics.CommitLogMetrics;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.PureJavaCrc32;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
@@ -343,4 +346,22 @@ public class CommitLog implements CommitLogMBean
{
return allocator.getActiveSegments().size();
}
+
+ static boolean handleCommitError(String message, Throwable t)
+ {
+ switch (DatabaseDescriptor.getCommitFailurePolicy())
+ {
+ case stop:
+ StorageService.instance.stopTransports();
+ case stop_commit:
+ logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t);
+ return false;
+ case ignore:
+ logger.error(message, t);
+ return true;
+ default:
+ throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 117b9d1..ed0a7ff 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.Iterables;
@@ -101,56 +102,65 @@ public class CommitLogSegmentManager
{
while (run)
{
- Callable<CommitLogSegment> task = segmentManagementTasks.poll();
- if (task == null)
+ try
{
- // if we have no more work to do, check if we should create a new segment
- if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+ Callable<CommitLogSegment> task = segmentManagementTasks.poll();
+ if (task == null)
{
- logger.debug("No segments in reserve; creating a fresh one");
- size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
- // TODO : some error handling in case we fail to create a new segment
- availableSegments.add(CommitLogSegment.freshSegment());
- hasAvailableSegments.signalAll();
- }
+ // if we have no more work to do, check if we should create a new segment
+ if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+ {
+ logger.debug("No segments in reserve; creating a fresh one");
+ size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
+ // TODO : some error handling in case we fail to create a new segment
+ availableSegments.add(CommitLogSegment.freshSegment());
+ hasAvailableSegments.signalAll();
+ }
- // flush old Cfs if we're full
- long unused = unusedCapacity();
- if (unused < 0)
- {
- List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
- long spaceToReclaim = 0;
- for (CommitLogSegment segment : activeSegments)
+ // flush old Cfs if we're full
+ long unused = unusedCapacity();
+ if (unused < 0)
{
- if (segment == allocatingFrom)
- break;
- segmentsToRecycle.add(segment);
- spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
- if (spaceToReclaim + unused >= 0)
- break;
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+ long spaceToReclaim = 0;
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment == allocatingFrom)
+ break;
+ segmentsToRecycle.add(segment);
+ spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
+ if (spaceToReclaim + unused >= 0)
+ break;
+ }
+ flushDataFrom(segmentsToRecycle, false);
}
- flushDataFrom(segmentsToRecycle, false);
- }
- try
- {
- // wait for new work to be provided
- task = segmentManagementTasks.take();
+ try
+ {
+ // wait for new work to be provided
+ task = segmentManagementTasks.take();
+ }
+ catch (InterruptedException e)
+ {
+ // shutdown signal; exit cleanly
+ continue;
+ }
}
- catch (InterruptedException e)
+
+ CommitLogSegment recycled = task.call();
+ if (recycled != null)
{
- // shutdown signal; exit cleanly
- continue;
+ // if the work resulted in a segment to recycle, publish it
+ availableSegments.add(recycled);
+ hasAvailableSegments.signalAll();
}
}
-
- // TODO : some error handling in case we fail on executing call (e.g. recycling)
- CommitLogSegment recycled = task.call();
- if (recycled != null)
+ catch (Throwable t)
{
- // if the work resulted in a segment to recycle, publish it
- availableSegments.add(recycled);
- hasAvailableSegments.signalAll();
+ if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+ return;
+ // sleep some arbitrary period to avoid spamming CL
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 875c9d5..41b7aa3 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -390,19 +390,6 @@ public class FileUtils
}
}
- public static void skipBytesFully(DataInput in, long bytes) throws IOException
- {
- long n = 0;
- while (n < bytes)
- {
- int m = (int) Math.min(Integer.MAX_VALUE, bytes - n);
- int skipped = in.skipBytes(m);
- if (skipped == 0)
- throw new EOFException("EOF after " + n + " bytes out of " + bytes);
- n += skipped;
- }
- }
-
public static void handleCorruptSSTable(CorruptSSTableException e)
{
if (DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.stop_paranoid)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 660e91e..7046536 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -22,9 +22,11 @@ package org.apache.cassandra.db;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
import org.junit.Test;
@@ -37,6 +39,7 @@ import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -264,4 +267,32 @@ public class CommitLogTest extends SchemaLoader
String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
}
+
+ @Test
+ public void testCommitFailurePolicy_stop()
+ {
+ File commitDir = new File(DatabaseDescriptor.getCommitLogLocation());
+
+ try
+ {
+
+ DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
+ commitDir.setWritable(false);
+ Mutation rm = new Mutation("Keyspace1", bytes("k"));
+ rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0);
+
+ // Adding it twice (won't change segment)
+ CommitLog.instance.add(rm);
+ Uninterruptibles.sleepUninterruptibly((int) DatabaseDescriptor.getCommitLogSyncBatchWindow(), TimeUnit.MILLISECONDS);
+ Assert.assertFalse(StorageService.instance.isRPCServerRunning());
+ Assert.assertFalse(StorageService.instance.isNativeTransportRunning());
+ Assert.assertFalse(StorageService.instance.isInitialized());
+
+ }
+ finally
+ {
+ commitDir.setWritable(true);
+ }
+ }
+
}