You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/06/30 14:03:13 UTC

[1/6] git commit: Merge commit failure policy into 2.1

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 73a07daf9 -> 5c8569976
  refs/heads/cassandra-2.1.0 b8ee9da1e -> da3606e18
  refs/heads/trunk 64772865c -> 2472701e1


Merge commit failure policy into 2.1

patch by Benedict Elliott Smith; reviewed by Marcus Eriksson for CASSANDRA-7429


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/da3606e1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/da3606e1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/da3606e1

Branch: refs/heads/cassandra-2.1
Commit: da3606e180411e88774a9f24e7884147a202fea8
Parents: b8ee9da
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jun 30 13:00:42 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jun 30 13:00:42 2014 +0100

----------------------------------------------------------------------
 conf/cassandra.yaml                             |  8 ++
 .../org/apache/cassandra/config/Config.java     |  8 ++
 .../cassandra/config/DatabaseDescriptor.java    | 10 +++
 .../db/commitlog/AbstractCommitLogService.java  |  4 +-
 .../cassandra/db/commitlog/CommitLog.java       | 21 +++++
 .../db/commitlog/CommitLogSegmentManager.java   | 88 +++++++++++---------
 .../org/apache/cassandra/io/util/FileUtils.java | 13 ---
 .../org/apache/cassandra/db/CommitLogTest.java  | 31 +++++++
 8 files changed, 130 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 64b5987..f1e5576 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -113,6 +113,14 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 # ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
 disk_failure_policy: stop
 
+# policy for commit disk failures:
+# stop: shut down gossip and Thrift, leaving the node effectively dead, but
+#       can still be inspected via JMX.
+# stop_commit: shut down the commit log, letting writes collect but
+#              continuing to service reads, as in pre-2.0.5 Cassandra
+# ignore: ignore fatal errors and let the batches fail
+commit_failure_policy: stop
+
 # Maximum size of the key cache in memory.
 #
 # Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index c5e2b76..4d4a95b 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -57,6 +57,7 @@ public class Config
     public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
 
     public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
+    public CommitFailurePolicy commit_failure_policy = CommitFailurePolicy.stop;
 
     /* initial token in the ring */
     public String initial_token;
@@ -300,6 +301,13 @@ public class Config
         stop_paranoid,
     }
 
+    public static enum CommitFailurePolicy
+    {
+        stop,
+        stop_commit,
+        ignore,
+    }
+
     public static enum RequestSchedulerId
     {
         keyspace

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 15c8c09..e8ce9df 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1192,6 +1192,16 @@ public class DatabaseDescriptor
         return conf.disk_failure_policy;
     }
 
+    public static void setCommitFailurePolicy(Config.CommitFailurePolicy policy)
+    {
+        conf.commit_failure_policy = policy;
+    }
+
+    public static Config.CommitFailurePolicy getCommitFailurePolicy()
+    {
+        return conf.commit_failure_policy;
+    }
+
     public static boolean isSnapshotBeforeCompaction()
     {
         return conf.snapshot_before_compaction;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 94802bf..59bf691 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -122,7 +122,9 @@ public abstract class AbstractCommitLogService
                     }
                     catch (Throwable t)
                     {
-                        logger.error("Commit log sync failed", t);
+                        if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+                            break;
+
                         // sleep for full poll-interval after an error, so we don't spam the log file
                         try
                         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index eaa1b3c..41c01c3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -21,10 +21,12 @@ import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +37,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.DataOutputByteBuffer;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.PureJavaCrc32;
 
 import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
@@ -343,4 +346,22 @@ public class CommitLog implements CommitLogMBean
     {
         return allocator.getActiveSegments().size();
     }
+
+    static boolean handleCommitError(String message, Throwable t)
+    {
+        switch (DatabaseDescriptor.getCommitFailurePolicy())
+        {
+            case stop:
+                StorageService.instance.stopTransports();
+            case stop_commit:
+                logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t);
+                return false;
+            case ignore:
+                logger.error(message, t);
+                return true;
+            default:
+                throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 117b9d1..ed0a7ff 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Iterables;
@@ -101,56 +102,65 @@ public class CommitLogSegmentManager
             {
                 while (run)
                 {
-                    Callable<CommitLogSegment> task = segmentManagementTasks.poll();
-                    if (task == null)
+                    try
                     {
-                        // if we have no more work to do, check if we should create a new segment
-                        if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+                        Callable<CommitLogSegment> task = segmentManagementTasks.poll();
+                        if (task == null)
                         {
-                            logger.debug("No segments in reserve; creating a fresh one");
-                            size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
-                            // TODO : some error handling in case we fail to create a new segment
-                            availableSegments.add(CommitLogSegment.freshSegment());
-                            hasAvailableSegments.signalAll();
-                        }
+                            // if we have no more work to do, check if we should create a new segment
+                            if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+                            {
+                                logger.debug("No segments in reserve; creating a fresh one");
+                                size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
+                                // TODO : some error handling in case we fail to create a new segment
+                                availableSegments.add(CommitLogSegment.freshSegment());
+                                hasAvailableSegments.signalAll();
+                            }
 
-                        // flush old Cfs if we're full
-                        long unused = unusedCapacity();
-                        if (unused < 0)
-                        {
-                            List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
-                            long spaceToReclaim = 0;
-                            for (CommitLogSegment segment : activeSegments)
+                            // flush old Cfs if we're full
+                            long unused = unusedCapacity();
+                            if (unused < 0)
                             {
-                                if (segment == allocatingFrom)
-                                    break;
-                                segmentsToRecycle.add(segment);
-                                spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
-                                if (spaceToReclaim + unused >= 0)
-                                    break;
+                                List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+                                long spaceToReclaim = 0;
+                                for (CommitLogSegment segment : activeSegments)
+                                {
+                                    if (segment == allocatingFrom)
+                                        break;
+                                    segmentsToRecycle.add(segment);
+                                    spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
+                                    if (spaceToReclaim + unused >= 0)
+                                        break;
+                                }
+                                flushDataFrom(segmentsToRecycle, false);
                             }
-                            flushDataFrom(segmentsToRecycle, false);
-                        }
 
-                        try
-                        {
-                            // wait for new work to be provided
-                            task = segmentManagementTasks.take();
+                            try
+                            {
+                                // wait for new work to be provided
+                                task = segmentManagementTasks.take();
+                            }
+                            catch (InterruptedException e)
+                            {
+                                // shutdown signal; exit cleanly
+                                continue;
+                            }
                         }
-                        catch (InterruptedException e)
+
+                        CommitLogSegment recycled = task.call();
+                        if (recycled != null)
                         {
-                            // shutdown signal; exit cleanly
-                            continue;
+                            // if the work resulted in a segment to recycle, publish it
+                            availableSegments.add(recycled);
+                            hasAvailableSegments.signalAll();
                         }
                     }
-
-                    // TODO : some error handling in case we fail on executing call (e.g. recycling)
-                    CommitLogSegment recycled = task.call();
-                    if (recycled != null)
+                    catch (Throwable t)
                     {
-                        // if the work resulted in a segment to recycle, publish it
-                        availableSegments.add(recycled);
-                        hasAvailableSegments.signalAll();
+                        if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+                            return;
+                        // sleep some arbitrary period to avoid spamming CL
+                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 875c9d5..41b7aa3 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -390,19 +390,6 @@ public class FileUtils
         }
     }
 
-    public static void skipBytesFully(DataInput in, long bytes) throws IOException
-    {
-        long n = 0;
-        while (n < bytes)
-        {
-            int m = (int) Math.min(Integer.MAX_VALUE, bytes - n);
-            int skipped = in.skipBytes(m);
-            if (skipped == 0)
-                throw new EOFException("EOF after " + n + " bytes out of " + bytes);
-            n += skipped;
-        }
-    }
-
     public static void handleCorruptSSTable(CorruptSSTableException e)
     {
         if (DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.stop_paranoid)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 660e91e..7046536 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -22,9 +22,11 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -37,6 +39,7 @@ import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLogSegment;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
@@ -264,4 +267,32 @@ public class CommitLogTest extends SchemaLoader
         String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
         Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
     }
+
+    @Test
+    public void testCommitFailurePolicy_stop()
+    {
+        File commitDir = new File(DatabaseDescriptor.getCommitLogLocation());
+
+        try
+        {
+
+            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
+            commitDir.setWritable(false);
+            Mutation rm = new Mutation("Keyspace1", bytes("k"));
+            rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0);
+
+            // Add the mutation once; the commit log directory was made read-only above, so the stop policy is expected to kick in
+            CommitLog.instance.add(rm);
+            Uninterruptibles.sleepUninterruptibly((int) DatabaseDescriptor.getCommitLogSyncBatchWindow(), TimeUnit.MILLISECONDS);
+            Assert.assertFalse(StorageService.instance.isRPCServerRunning());
+            Assert.assertFalse(StorageService.instance.isNativeTransportRunning());
+            Assert.assertFalse(StorageService.instance.isInitialized());
+
+        }
+        finally
+        {
+            commitDir.setWritable(true);
+        }
+    }
+
 }


[4/6] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1

Posted by be...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5c856997
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5c856997
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5c856997

Branch: refs/heads/trunk
Commit: 5c8569976e4ff8892860191644a82a55c4db2253
Parents: 73a07da da3606e
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jun 30 13:02:34 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jun 30 13:02:34 2014 +0100

----------------------------------------------------------------------
 conf/cassandra.yaml                             |  8 ++
 .../org/apache/cassandra/config/Config.java     |  8 ++
 .../cassandra/config/DatabaseDescriptor.java    | 10 +++
 .../db/commitlog/AbstractCommitLogService.java  |  4 +-
 .../cassandra/db/commitlog/CommitLog.java       | 21 +++++
 .../db/commitlog/CommitLogSegmentManager.java   | 88 +++++++++++---------
 .../org/apache/cassandra/io/util/FileUtils.java | 13 ---
 .../org/apache/cassandra/db/CommitLogTest.java  | 31 +++++++
 8 files changed, 130 insertions(+), 53 deletions(-)
----------------------------------------------------------------------



[5/6] git commit: Merge branch 'cassandra-2.1.0' into cassandra-2.1

Posted by be...@apache.org.
Merge branch 'cassandra-2.1.0' into cassandra-2.1


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5c856997
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5c856997
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5c856997

Branch: refs/heads/cassandra-2.1
Commit: 5c8569976e4ff8892860191644a82a55c4db2253
Parents: 73a07da da3606e
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jun 30 13:02:34 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jun 30 13:02:34 2014 +0100

----------------------------------------------------------------------
 conf/cassandra.yaml                             |  8 ++
 .../org/apache/cassandra/config/Config.java     |  8 ++
 .../cassandra/config/DatabaseDescriptor.java    | 10 +++
 .../db/commitlog/AbstractCommitLogService.java  |  4 +-
 .../cassandra/db/commitlog/CommitLog.java       | 21 +++++
 .../db/commitlog/CommitLogSegmentManager.java   | 88 +++++++++++---------
 .../org/apache/cassandra/io/util/FileUtils.java | 13 ---
 .../org/apache/cassandra/db/CommitLogTest.java  | 31 +++++++
 8 files changed, 130 insertions(+), 53 deletions(-)
----------------------------------------------------------------------



[3/6] git commit: Merge commit failure policy into 2.1

Posted by be...@apache.org.
Merge commit failure policy into 2.1

patch by Benedict Elliott Smith; reviewed by Marcus Eriksson for CASSANDRA-7429


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/da3606e1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/da3606e1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/da3606e1

Branch: refs/heads/trunk
Commit: da3606e180411e88774a9f24e7884147a202fea8
Parents: b8ee9da
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jun 30 13:00:42 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jun 30 13:00:42 2014 +0100

----------------------------------------------------------------------
 conf/cassandra.yaml                             |  8 ++
 .../org/apache/cassandra/config/Config.java     |  8 ++
 .../cassandra/config/DatabaseDescriptor.java    | 10 +++
 .../db/commitlog/AbstractCommitLogService.java  |  4 +-
 .../cassandra/db/commitlog/CommitLog.java       | 21 +++++
 .../db/commitlog/CommitLogSegmentManager.java   | 88 +++++++++++---------
 .../org/apache/cassandra/io/util/FileUtils.java | 13 ---
 .../org/apache/cassandra/db/CommitLogTest.java  | 31 +++++++
 8 files changed, 130 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 64b5987..f1e5576 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -113,6 +113,14 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 # ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
 disk_failure_policy: stop
 
+# policy for commit disk failures:
+# stop: shut down gossip and Thrift, leaving the node effectively dead, but
+#       can still be inspected via JMX.
+# stop_commit: shut down the commit log, letting writes collect but
+#              continuing to service reads, as in pre-2.0.5 Cassandra
+# ignore: ignore fatal errors and let the batches fail
+commit_failure_policy: stop
+
 # Maximum size of the key cache in memory.
 #
 # Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index c5e2b76..4d4a95b 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -57,6 +57,7 @@ public class Config
     public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
 
     public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
+    public CommitFailurePolicy commit_failure_policy = CommitFailurePolicy.stop;
 
     /* initial token in the ring */
     public String initial_token;
@@ -300,6 +301,13 @@ public class Config
         stop_paranoid,
     }
 
+    public static enum CommitFailurePolicy
+    {
+        stop,
+        stop_commit,
+        ignore,
+    }
+
     public static enum RequestSchedulerId
     {
         keyspace

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 15c8c09..e8ce9df 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1192,6 +1192,16 @@ public class DatabaseDescriptor
         return conf.disk_failure_policy;
     }
 
+    public static void setCommitFailurePolicy(Config.CommitFailurePolicy policy)
+    {
+        conf.commit_failure_policy = policy;
+    }
+
+    public static Config.CommitFailurePolicy getCommitFailurePolicy()
+    {
+        return conf.commit_failure_policy;
+    }
+
     public static boolean isSnapshotBeforeCompaction()
     {
         return conf.snapshot_before_compaction;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 94802bf..59bf691 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -122,7 +122,9 @@ public abstract class AbstractCommitLogService
                     }
                     catch (Throwable t)
                     {
-                        logger.error("Commit log sync failed", t);
+                        if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+                            break;
+
                         // sleep for full poll-interval after an error, so we don't spam the log file
                         try
                         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index eaa1b3c..41c01c3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -21,10 +21,12 @@ import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +37,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.DataOutputByteBuffer;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.PureJavaCrc32;
 
 import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
@@ -343,4 +346,22 @@ public class CommitLog implements CommitLogMBean
     {
         return allocator.getActiveSegments().size();
     }
+
+    static boolean handleCommitError(String message, Throwable t)
+    {
+        switch (DatabaseDescriptor.getCommitFailurePolicy())
+        {
+            case stop:
+                StorageService.instance.stopTransports();
+            case stop_commit:
+                logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t);
+                return false;
+            case ignore:
+                logger.error(message, t);
+                return true;
+            default:
+                throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 117b9d1..ed0a7ff 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Iterables;
@@ -101,56 +102,65 @@ public class CommitLogSegmentManager
             {
                 while (run)
                 {
-                    Callable<CommitLogSegment> task = segmentManagementTasks.poll();
-                    if (task == null)
+                    try
                     {
-                        // if we have no more work to do, check if we should create a new segment
-                        if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+                        Callable<CommitLogSegment> task = segmentManagementTasks.poll();
+                        if (task == null)
                         {
-                            logger.debug("No segments in reserve; creating a fresh one");
-                            size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
-                            // TODO : some error handling in case we fail to create a new segment
-                            availableSegments.add(CommitLogSegment.freshSegment());
-                            hasAvailableSegments.signalAll();
-                        }
+                            // if we have no more work to do, check if we should create a new segment
+                            if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+                            {
+                                logger.debug("No segments in reserve; creating a fresh one");
+                                size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
+                                // TODO : some error handling in case we fail to create a new segment
+                                availableSegments.add(CommitLogSegment.freshSegment());
+                                hasAvailableSegments.signalAll();
+                            }
 
-                        // flush old Cfs if we're full
-                        long unused = unusedCapacity();
-                        if (unused < 0)
-                        {
-                            List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
-                            long spaceToReclaim = 0;
-                            for (CommitLogSegment segment : activeSegments)
+                            // flush old Cfs if we're full
+                            long unused = unusedCapacity();
+                            if (unused < 0)
                             {
-                                if (segment == allocatingFrom)
-                                    break;
-                                segmentsToRecycle.add(segment);
-                                spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
-                                if (spaceToReclaim + unused >= 0)
-                                    break;
+                                List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+                                long spaceToReclaim = 0;
+                                for (CommitLogSegment segment : activeSegments)
+                                {
+                                    if (segment == allocatingFrom)
+                                        break;
+                                    segmentsToRecycle.add(segment);
+                                    spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
+                                    if (spaceToReclaim + unused >= 0)
+                                        break;
+                                }
+                                flushDataFrom(segmentsToRecycle, false);
                             }
-                            flushDataFrom(segmentsToRecycle, false);
-                        }
 
-                        try
-                        {
-                            // wait for new work to be provided
-                            task = segmentManagementTasks.take();
+                            try
+                            {
+                                // wait for new work to be provided
+                                task = segmentManagementTasks.take();
+                            }
+                            catch (InterruptedException e)
+                            {
+                                // shutdown signal; exit cleanly
+                                continue;
+                            }
                         }
-                        catch (InterruptedException e)
+
+                        CommitLogSegment recycled = task.call();
+                        if (recycled != null)
                         {
-                            // shutdown signal; exit cleanly
-                            continue;
+                            // if the work resulted in a segment to recycle, publish it
+                            availableSegments.add(recycled);
+                            hasAvailableSegments.signalAll();
                         }
                     }
-
-                    // TODO : some error handling in case we fail on executing call (e.g. recycling)
-                    CommitLogSegment recycled = task.call();
-                    if (recycled != null)
+                    catch (Throwable t)
                     {
-                        // if the work resulted in a segment to recycle, publish it
-                        availableSegments.add(recycled);
-                        hasAvailableSegments.signalAll();
+                        if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+                            return;
+                        // sleep some arbitrary period to avoid spamming CL
+                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 875c9d5..41b7aa3 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -390,19 +390,6 @@ public class FileUtils
         }
     }
 
-    public static void skipBytesFully(DataInput in, long bytes) throws IOException
-    {
-        long n = 0;
-        while (n < bytes)
-        {
-            int m = (int) Math.min(Integer.MAX_VALUE, bytes - n);
-            int skipped = in.skipBytes(m);
-            if (skipped == 0)
-                throw new EOFException("EOF after " + n + " bytes out of " + bytes);
-            n += skipped;
-        }
-    }
-
     public static void handleCorruptSSTable(CorruptSSTableException e)
     {
         if (DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.stop_paranoid)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 660e91e..7046536 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -22,9 +22,11 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -37,6 +39,7 @@ import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLogSegment;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
@@ -264,4 +267,32 @@ public class CommitLogTest extends SchemaLoader
         String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
         Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
     }
+
+    @Test
+    public void testCommitFailurePolicy_stop()
+    {
+        File commitDir = new File(DatabaseDescriptor.getCommitLogLocation());
+
+        try
+        {
+
+            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
+            commitDir.setWritable(false);
+            Mutation rm = new Mutation("Keyspace1", bytes("k"));
+            rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0);
+
+            // Add the mutation once; the commit log directory was made read-only above
+            CommitLog.instance.add(rm);
+            Uninterruptibles.sleepUninterruptibly((int) DatabaseDescriptor.getCommitLogSyncBatchWindow(), TimeUnit.MILLISECONDS);
+            Assert.assertFalse(StorageService.instance.isRPCServerRunning());
+            Assert.assertFalse(StorageService.instance.isNativeTransportRunning());
+            Assert.assertFalse(StorageService.instance.isInitialized());
+
+        }
+        finally
+        {
+            commitDir.setWritable(true);
+        }
+    }
+
 }


[6/6] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2472701e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2472701e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2472701e

Branch: refs/heads/trunk
Commit: 2472701e119416aa391fe76eaf56112f0cecbe4f
Parents: 6477286 5c85699
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jun 30 13:02:44 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jun 30 13:02:44 2014 +0100

----------------------------------------------------------------------
 conf/cassandra.yaml                             |  8 ++
 .../org/apache/cassandra/config/Config.java     |  8 ++
 .../cassandra/config/DatabaseDescriptor.java    | 10 +++
 .../db/commitlog/AbstractCommitLogService.java  |  4 +-
 .../cassandra/db/commitlog/CommitLog.java       | 21 +++++
 .../db/commitlog/CommitLogSegmentManager.java   | 88 +++++++++++---------
 .../org/apache/cassandra/io/util/FileUtils.java | 13 ---
 .../org/apache/cassandra/db/CommitLogTest.java  | 31 +++++++
 8 files changed, 130 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2472701e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2472701e/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2472701e/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/CommitLogTest.java
index 7e30867,7046536..22b6e6b
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@@ -25,8 -26,8 +26,9 @@@ import java.util.concurrent.TimeUnit
  import java.util.zip.CRC32;
  import java.util.zip.Checksum;
  
+ import com.google.common.util.concurrent.Uninterruptibles;
  import org.junit.Assert;
 +import org.junit.BeforeClass;
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
@@@ -38,9 -38,8 +40,10 @@@ import org.apache.cassandra.db.commitlo
  import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
  import org.apache.cassandra.db.commitlog.CommitLogSegment;
  import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.service.StorageService;
  
  import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
  


[2/6] git commit: Merge commit failure policy into 2.1

Posted by be...@apache.org.
Merge commit failure policy into 2.1

patch by Benedict Elliott Smith; reviewed by Marcus Eriksson for CASSANDRA-7429


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/da3606e1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/da3606e1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/da3606e1

Branch: refs/heads/cassandra-2.1.0
Commit: da3606e180411e88774a9f24e7884147a202fea8
Parents: b8ee9da
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Mon Jun 30 13:00:42 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Mon Jun 30 13:00:42 2014 +0100

----------------------------------------------------------------------
 conf/cassandra.yaml                             |  8 ++
 .../org/apache/cassandra/config/Config.java     |  8 ++
 .../cassandra/config/DatabaseDescriptor.java    | 10 +++
 .../db/commitlog/AbstractCommitLogService.java  |  4 +-
 .../cassandra/db/commitlog/CommitLog.java       | 21 +++++
 .../db/commitlog/CommitLogSegmentManager.java   | 88 +++++++++++---------
 .../org/apache/cassandra/io/util/FileUtils.java | 13 ---
 .../org/apache/cassandra/db/CommitLogTest.java  | 31 +++++++
 8 files changed, 130 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 64b5987..f1e5576 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -113,6 +113,14 @@ partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 # ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
 disk_failure_policy: stop
 
+# policy for commit disk failures:
+# stop: shut down gossip and Thrift, leaving the node effectively dead, but
+#       can still be inspected via JMX.
+# stop_commit: shut down the commit log, letting writes collect but
+#              continuing to service reads, as in pre-2.0.5 Cassandra
+# ignore: ignore fatal errors and let the batches fail
+commit_failure_policy: stop
+
 # Maximum size of the key cache in memory.
 #
 # Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index c5e2b76..4d4a95b 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -57,6 +57,7 @@ public class Config
     public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
 
     public DiskFailurePolicy disk_failure_policy = DiskFailurePolicy.ignore;
+    public CommitFailurePolicy commit_failure_policy = CommitFailurePolicy.stop;
 
     /* initial token in the ring */
     public String initial_token;
@@ -300,6 +301,13 @@ public class Config
         stop_paranoid,
     }
 
+    public static enum CommitFailurePolicy
+    {
+        stop,
+        stop_commit,
+        ignore,
+    }
+
     public static enum RequestSchedulerId
     {
         keyspace

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 15c8c09..e8ce9df 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1192,6 +1192,16 @@ public class DatabaseDescriptor
         return conf.disk_failure_policy;
     }
 
+    public static void setCommitFailurePolicy(Config.CommitFailurePolicy policy)
+    {
+        conf.commit_failure_policy = policy;
+    }
+
+    public static Config.CommitFailurePolicy getCommitFailurePolicy()
+    {
+        return conf.commit_failure_policy;
+    }
+
     public static boolean isSnapshotBeforeCompaction()
     {
         return conf.snapshot_before_compaction;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 94802bf..59bf691 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -122,7 +122,9 @@ public abstract class AbstractCommitLogService
                     }
                     catch (Throwable t)
                     {
-                        logger.error("Commit log sync failed", t);
+                        if (!CommitLog.handleCommitError("Failed to persist commits to disk", t))
+                            break;
+
                         // sleep for full poll-interval after an error, so we don't spam the log file
                         try
                         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index eaa1b3c..41c01c3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -21,10 +21,12 @@ import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +37,7 @@ import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.DataOutputByteBuffer;
 import org.apache.cassandra.metrics.CommitLogMetrics;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.PureJavaCrc32;
 
 import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
@@ -343,4 +346,22 @@ public class CommitLog implements CommitLogMBean
     {
         return allocator.getActiveSegments().size();
     }
+
+    static boolean handleCommitError(String message, Throwable t)
+    {
+        switch (DatabaseDescriptor.getCommitFailurePolicy())
+        {
+            case stop:
+                StorageService.instance.stopTransports();
+            case stop_commit:
+                logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t);
+                return false;
+            case ignore:
+                logger.error(message, t);
+                return true;
+            default:
+                throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 117b9d1..ed0a7ff 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Iterables;
@@ -101,56 +102,65 @@ public class CommitLogSegmentManager
             {
                 while (run)
                 {
-                    Callable<CommitLogSegment> task = segmentManagementTasks.poll();
-                    if (task == null)
+                    try
                     {
-                        // if we have no more work to do, check if we should create a new segment
-                        if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+                        Callable<CommitLogSegment> task = segmentManagementTasks.poll();
+                        if (task == null)
                         {
-                            logger.debug("No segments in reserve; creating a fresh one");
-                            size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
-                            // TODO : some error handling in case we fail to create a new segment
-                            availableSegments.add(CommitLogSegment.freshSegment());
-                            hasAvailableSegments.signalAll();
-                        }
+                            // if we have no more work to do, check if we should create a new segment
+                            if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
+                            {
+                                logger.debug("No segments in reserve; creating a fresh one");
+                                size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
+                                // TODO : some error handling in case we fail to create a new segment
+                                availableSegments.add(CommitLogSegment.freshSegment());
+                                hasAvailableSegments.signalAll();
+                            }
 
-                        // flush old Cfs if we're full
-                        long unused = unusedCapacity();
-                        if (unused < 0)
-                        {
-                            List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
-                            long spaceToReclaim = 0;
-                            for (CommitLogSegment segment : activeSegments)
+                            // flush old Cfs if we're full
+                            long unused = unusedCapacity();
+                            if (unused < 0)
                             {
-                                if (segment == allocatingFrom)
-                                    break;
-                                segmentsToRecycle.add(segment);
-                                spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
-                                if (spaceToReclaim + unused >= 0)
-                                    break;
+                                List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+                                long spaceToReclaim = 0;
+                                for (CommitLogSegment segment : activeSegments)
+                                {
+                                    if (segment == allocatingFrom)
+                                        break;
+                                    segmentsToRecycle.add(segment);
+                                    spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize();
+                                    if (spaceToReclaim + unused >= 0)
+                                        break;
+                                }
+                                flushDataFrom(segmentsToRecycle, false);
                             }
-                            flushDataFrom(segmentsToRecycle, false);
-                        }
 
-                        try
-                        {
-                            // wait for new work to be provided
-                            task = segmentManagementTasks.take();
+                            try
+                            {
+                                // wait for new work to be provided
+                                task = segmentManagementTasks.take();
+                            }
+                            catch (InterruptedException e)
+                            {
+                                // shutdown signal; exit cleanly
+                                continue;
+                            }
                         }
-                        catch (InterruptedException e)
+
+                        CommitLogSegment recycled = task.call();
+                        if (recycled != null)
                         {
-                            // shutdown signal; exit cleanly
-                            continue;
+                            // if the work resulted in a segment to recycle, publish it
+                            availableSegments.add(recycled);
+                            hasAvailableSegments.signalAll();
                         }
                     }
-
-                    // TODO : some error handling in case we fail on executing call (e.g. recycling)
-                    CommitLogSegment recycled = task.call();
-                    if (recycled != null)
+                    catch (Throwable t)
                     {
-                        // if the work resulted in a segment to recycle, publish it
-                        availableSegments.add(recycled);
-                        hasAvailableSegments.signalAll();
+                        if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+                            return;
+                        // sleep some arbitrary period to avoid spamming CL
+                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 875c9d5..41b7aa3 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -390,19 +390,6 @@ public class FileUtils
         }
     }
 
-    public static void skipBytesFully(DataInput in, long bytes) throws IOException
-    {
-        long n = 0;
-        while (n < bytes)
-        {
-            int m = (int) Math.min(Integer.MAX_VALUE, bytes - n);
-            int skipped = in.skipBytes(m);
-            if (skipped == 0)
-                throw new EOFException("EOF after " + n + " bytes out of " + bytes);
-            n += skipped;
-        }
-    }
-
     public static void handleCorruptSSTable(CorruptSSTableException e)
     {
         if (DatabaseDescriptor.getDiskFailurePolicy() == Config.DiskFailurePolicy.stop_paranoid)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/da3606e1/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 660e91e..7046536 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -22,9 +22,11 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -37,6 +39,7 @@ import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLogSegment;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
@@ -264,4 +267,32 @@ public class CommitLogTest extends SchemaLoader
         String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
         Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
     }
+
+    @Test
+    public void testCommitFailurePolicy_stop()
+    {
+        File commitDir = new File(DatabaseDescriptor.getCommitLogLocation());
+
+        try
+        {
+
+            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
+            commitDir.setWritable(false);
+            Mutation rm = new Mutation("Keyspace1", bytes("k"));
+            rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0);
+
+            // Add the mutation once; the commit log directory was made read-only above
+            CommitLog.instance.add(rm);
+            Uninterruptibles.sleepUninterruptibly((int) DatabaseDescriptor.getCommitLogSyncBatchWindow(), TimeUnit.MILLISECONDS);
+            Assert.assertFalse(StorageService.instance.isRPCServerRunning());
+            Assert.assertFalse(StorageService.instance.isNativeTransportRunning());
+            Assert.assertFalse(StorageService.instance.isInitialized());
+
+        }
+        finally
+        {
+            commitDir.setWritable(true);
+        }
+    }
+
 }