You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2015/08/19 03:16:58 UTC

[7/8] cassandra git commit: Honors commit log policy when replaying; treats errors in the last segment (section for compressed files) as permissible due to incomplete flush. Reviewed by aweisberg for CASSANDRA-9749

Honors commit log policy when replaying; treats errors in the last segment (section for compressed files) as permissible due to incomplete flush.
Reviewed by aweisberg for CASSANDRA-9749


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d12d2d49
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d12d2d49
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d12d2d49

Branch: refs/heads/cassandra-3.0
Commit: d12d2d496540c698f30e9b528b66e8f6636842d3
Parents: 7eed4b6
Author: Branimir Lambov <br...@datastax.com>
Authored: Thu Jul 30 20:59:16 2015 +0300
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Aug 18 20:16:31 2015 -0500

----------------------------------------------------------------------
 .../cassandra/db/commitlog/CommitLog.java       |   4 +-
 .../db/commitlog/CommitLogReplayer.java         | 117 +++-
 .../db/commitlog/CommitLogSegmentManager.java   |   8 +-
 .../CommitLog-5-1438186885380.log               | Bin 0 -> 839051 bytes
 .../legacy-commitlog/2.2-lz4-bitrot/hash.txt    |   6 +
 .../CommitLog-5-1438186885380.log               | Bin 0 -> 839051 bytes
 .../legacy-commitlog/2.2-lz4-bitrot2/hash.txt   |   6 +
 .../CommitLog-5-1438186885380.log               | Bin 0 -> 839001 bytes
 .../legacy-commitlog/2.2-lz4-truncated/hash.txt |   5 +
 .../db/CommitLogFailurePolicyTest.java          | 141 -----
 .../org/apache/cassandra/db/CommitLogTest.java  | 467 ---------------
 .../commitlog/CommitLogFailurePolicyTest.java   | 141 +++++
 .../cassandra/db/commitlog/CommitLogTest.java   | 578 +++++++++++++++++++
 .../db/commitlog/CommitLogUpgradeTest.java      |  65 +++
 14 files changed, 899 insertions(+), 639 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 8d74677..ff27225 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -193,7 +193,9 @@ public class CommitLog implements CommitLogMBean
      */
     public void recover(String path) throws IOException
     {
-        recover(new File(path));
+        CommitLogReplayer recovery = CommitLogReplayer.construct(this);
+        recovery.recover(new File(path), false);
+        recovery.blockForWrites();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 358661a..93c3026 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -64,6 +64,7 @@ import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 
 public class CommitLogReplayer
 {
+    static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors";
     private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024);
     private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
@@ -144,14 +145,15 @@ public class CommitLogReplayer
 
     public void recover(File[] clogs) throws IOException
     {
-        for (final File file : clogs)
-            recover(file);
+        int i;
+        for (i = 0; i < clogs.length; ++i)
+            recover(clogs[i], i + 1 == clogs.length);
     }
 
     public int blockForWrites()
     {
         for (Map.Entry<UUID, AtomicInteger> entry : invalidMutations.entrySet())
-            logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey()));
+            logger.warn(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey()));
 
         // wait for all the writes to finish on the mutation stage
         FBUtilities.waitOnFutures(futures);
@@ -165,7 +167,7 @@ public class CommitLogReplayer
         return replayedCount.get();
     }
 
-    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException
+    private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException
     {
         if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
         {
@@ -183,13 +185,17 @@ public class CommitLogReplayer
         {
             if (end != 0 || filecrc != 0)
             {
-                logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
+                handleReplayError(false,
+                                  "Encountered bad header at position %d of commit log %s, with invalid CRC. " +
+                                  "The end of segment marker should be zero.",
+                                  offset, reader.getPath());
             }
             return -1;
         }
         else if (end < offset || end > reader.length())
         {
-            logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath());
+            handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC",
+                              offset, reader.getPath());
             return -1;
         }
         return end;
@@ -270,8 +276,7 @@ public class CommitLogReplayer
         }
     }
 
-    @SuppressWarnings("resource")
-    public void recover(File file) throws IOException
+    public void recover(File file, boolean tolerateTruncation) throws IOException
     {
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
         RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
@@ -283,7 +288,7 @@ public class CommitLogReplayer
                     return;
                 if (globalPosition.segment == desc.id)
                     reader.seek(globalPosition.position);
-                replaySyncSection(reader, (int) reader.getPositionLimit(), desc);
+                replaySyncSection(reader, (int) reader.getPositionLimit(), desc, desc.fileName(), tolerateTruncation);
                 return;
             }
 
@@ -297,10 +302,15 @@ public class CommitLogReplayer
                 desc = null;
             }
             if (desc == null) {
-                logger.warn("Could not read commit log descriptor in file {}", file);
+                handleReplayError(false, "Could not read commit log descriptor in file %s", file);
                 return;
             }
-            assert segmentId == desc.id;
+            if (segmentId != desc.id)
+            {
+                handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file);
+                // continue processing if ignored.
+            }
+
             if (logAndCheckIfShouldSkip(file, desc))
                 return;
 
@@ -313,7 +323,7 @@ public class CommitLogReplayer
                 }
                 catch (ConfigurationException e)
                 {
-                    logger.warn("Unknown compression: {}", e.getMessage());
+                    handleReplayError(false, "Unknown compression: %s", e.getMessage());
                     return;
                 }
             }
@@ -322,7 +332,7 @@ public class CommitLogReplayer
             int end = (int) reader.getFilePointer();
             int replayEnd = end;
 
-            while ((end = readSyncMarker(desc, end, reader)) >= 0)
+            while ((end = readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0)
             {
                 int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE;
 
@@ -343,11 +353,17 @@ public class CommitLogReplayer
                     continue;
 
                 FileDataInput sectionReader = reader;
+                String errorContext = desc.fileName();
+                // In the uncompressed case the last non-fully-flushed section can be anywhere in the file.
+                boolean tolerateErrorsInSection = tolerateTruncation;
                 if (compressor != null)
                 {
+                    // In the compressed case we know if this is the last section.
+                    tolerateErrorsInSection &= end == reader.length() || end < 0;
+
+                    int start = (int) reader.getFilePointer();
                     try
                     {
-                        int start = (int) reader.getFilePointer();
                         int compressedLength = end - start;
                         if (logger.isDebugEnabled())
                             logger.trace("Decompressing {} between replay positions {} and {}",
@@ -362,15 +378,18 @@ public class CommitLogReplayer
                             uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
                         compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
                         sectionReader = new ByteBufferDataInput(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos, 0);
+                        errorContext = "compressed section at " + start + " in " + errorContext;
                     }
-                    catch (IOException e)
+                    catch (IOException | ArrayIndexOutOfBoundsException e)
                     {
-                        logger.error("Unexpected exception decompressing section {}", e);
+                        handleReplayError(tolerateErrorsInSection,
+                                          "Unexpected exception decompressing section at %d: %s",
+                                          start, e);
                         continue;
                     }
                 }
 
-                if (!replaySyncSection(sectionReader, replayEnd, desc))
+                if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
                     break;
             }
         }
@@ -402,13 +421,14 @@ public class CommitLogReplayer
      *
      * @return Whether replay should continue with the next section.
      */
-    private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc) throws IOException
+    private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, String errorContext, boolean tolerateErrors) throws IOException
     {
          /* read the logs populate Mutation and apply */
         while (reader.getFilePointer() < end && !reader.isEOF())
         {
+            long mutationStart = reader.getFilePointer();
             if (logger.isDebugEnabled())
-                logger.trace("Reading mutation at {}", reader.getFilePointer());
+                logger.trace("Reading mutation at {}", mutationStart);
 
             long claimedCRC32;
             int serializedSize;
@@ -427,7 +447,12 @@ public class CommitLogReplayer
                 // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
                 // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
                 if (serializedSize < 10)
+                {
+                    handleReplayError(tolerateErrors,
+                                      "Invalid mutation size %d at %d in %s",
+                                      serializedSize, mutationStart, errorContext);
                     return false;
+                }
 
                 long claimedSizeChecksum;
                 if (desc.version < CommitLogDescriptor.VERSION_21)
@@ -441,7 +466,12 @@ public class CommitLogReplayer
                     updateChecksumInt(checksum, serializedSize);
 
                 if (checksum.getValue() != claimedSizeChecksum)
+                {
+                    handleReplayError(tolerateErrors,
+                                      "Mutation size checksum failure at %d in %s",
+                                      mutationStart, errorContext);
                     return false;
+                }
                 // ok.
 
                 if (serializedSize > buffer.length)
@@ -454,14 +484,18 @@ public class CommitLogReplayer
             }
             catch (EOFException eof)
             {
+                handleReplayError(tolerateErrors,
+                                  "Unexpected end of segment at %d in %s",
+                                  mutationStart, errorContext);
                 return false; // last CL entry didn't get completely written. that's ok.
             }
 
             checksum.update(buffer, 0, serializedSize);
             if (claimedCRC32 != checksum.getValue())
             {
-                // this entry must not have been fsynced. probably the rest is bad too,
-                // but just in case there is no harm in trying them (since we still read on an entry boundary)
+                handleReplayError(tolerateErrors,
+                                  "Mutation checksum failure at %d in %s",
+                                  mutationStart, errorContext);
                 continue;
             }
             replayMutation(buffer, serializedSize, reader.getFilePointer(), desc);
@@ -510,9 +544,13 @@ public class CommitLogReplayer
                 out.write(inputBuffer, 0, size);
             }
 
-            String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
-                                      f.getAbsolutePath());
-            logger.error(st, t);
+            // Checksum passed so this error can't be permissible.
+            handleReplayError(false,
+                              "Unexpected error deserializing mutation; saved to %s.  " +
+                              "This may be caused by replaying a mutation against a table with the same name but incompatible schema.  " +
+                              "Exception follows: %s",
+                              f.getAbsolutePath(),
+                              t);
             return;
         }
 
@@ -580,4 +618,35 @@ public class CommitLogReplayer
         }
         return false;
     }
+
+    static void handleReplayError(boolean permissible, String message, Object... messageArgs) throws IOException
+    {
+        String msg = String.format(message, messageArgs);
+        IOException e = new CommitLogReplayException(msg);
+        if (permissible)
+            logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", e);
+        else if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY))
+            logger.error("Ignoring commit log replay error", e);
+        else if (!CommitLog.handleCommitError("Failed commit log replay", e))
+        {
+            logger.error("Replay stopped. If you wish to override this error and continue starting the node ignoring " +
+                         "commit log replay problems, specify -D" + IGNORE_REPLAY_ERRORS_PROPERTY + "=true " +
+                         "on the command line");
+            throw e;
+        }
+    }
+
+    @SuppressWarnings("serial")
+    public static class CommitLogReplayException extends IOException
+    {
+        public CommitLogReplayException(String message, Throwable cause)
+        {
+            super(message, cause);
+        }
+
+        public CommitLogReplayException(String message)
+        {
+            super(message);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 19b850f..9849350 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -34,10 +34,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.*;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,14 +94,12 @@ public class CommitLogSegmentManager
     private volatile boolean run = true;
     private final CommitLog commitLog;
 
-    @VisibleForTesting
-    public CommitLogSegmentManager(final CommitLog commitLog)
+    CommitLogSegmentManager(final CommitLog commitLog)
     {
         this.commitLog = commitLog;
     }
 
-    @VisibleForTesting
-    public void start()
+    void start()
     {
         // The run loop for the manager thread
         Runnable runnable = new WrappedRunnable()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log
new file mode 100644
index 0000000..d248d59
Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt b/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt
new file mode 100644
index 0000000..c4d8fe7
--- /dev/null
+++ b/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt
@@ -0,0 +1,6 @@
+#CommitLog bitrot test, version 2.2.0-SNAPSHOT
+#This is a copy of 2.2-lz4 with some overwritten bytes.
+#Replaying this should result in an error which can be overridden.
+cells=6051
+hash=-170208326
+cfid=dc32ce20-360d-11e5-826c-afadad37221d

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log
new file mode 100644
index 0000000..083d65c
Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt b/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt
new file mode 100644
index 0000000..c49dda0
--- /dev/null
+++ b/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt
@@ -0,0 +1,6 @@
+#CommitLog upgrade test, version 2.2.0-SNAPSHOT
+#This is a copy of 2.2-lz4 with some overwritten bytes.
+#Replaying this should result in an error which can be overridden.
+cells=6037
+hash=-1312748407
+cfid=dc32ce20-360d-11e5-826c-afadad37221d

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log
new file mode 100644
index 0000000..939d408
Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt
----------------------------------------------------------------------
diff --git a/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt b/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt
new file mode 100644
index 0000000..ce7f600
--- /dev/null
+++ b/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt
@@ -0,0 +1,5 @@
+#Truncated CommitLog test.
+#This is a copy of 2.2-lz4 with the last 50 bytes deleted.
+cells=6037
+hash=-889057729
+cfid=dc32ce20-360d-11e5-826c-afadad37221d

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java b/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java
deleted file mode 100644
index 0ecab3c..0000000
--- a/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.cassandra.db;
-
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.service.CassandraDaemon;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.KillerForTests;
-
-public class CommitLogFailurePolicyTest
-{
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        SchemaLoader.prepareServer();
-        System.setProperty("cassandra.commitlog.stop_on_errors", "true");
-    }
-
-    @Test
-    public void testCommitFailurePolicy_stop() throws ConfigurationException
-    {
-        CassandraDaemon daemon = new CassandraDaemon();
-        daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
-        StorageService.instance.registerDaemon(daemon);
-
-        // Need storage service active so stop policy can shutdown gossip
-        StorageService.instance.initServer();
-        Assert.assertTrue(Gossiper.instance.isEnabled());
-
-        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
-        try
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
-            CommitLog.handleCommitError("Test stop error", new Throwable());
-            Assert.assertFalse(Gossiper.instance.isEnabled());
-        }
-        finally
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
-        }
-    }
-
-    @Test
-    public void testCommitFailurePolicy_die()
-    {
-        CassandraDaemon daemon = new CassandraDaemon();
-        daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
-        StorageService.instance.registerDaemon(daemon);
-
-        KillerForTests killerForTests = new KillerForTests();
-        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
-        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
-        try
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
-            CommitLog.handleCommitError("Testing die policy", new Throwable());
-            Assert.assertTrue(killerForTests.wasKilled());
-            Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure
-        }
-        finally
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
-            JVMStabilityInspector.replaceKiller(originalKiller);
-        }
-    }
-
-    @Test
-    public void testCommitFailurePolicy_ignore_beforeStartup()
-    {
-        //startup was not completed successfuly (since method completeSetup() was not called)
-        CassandraDaemon daemon = new CassandraDaemon();
-        StorageService.instance.registerDaemon(daemon);
-
-        KillerForTests killerForTests = new KillerForTests();
-        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
-        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
-        try
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
-            CommitLog.handleCommitError("Testing ignore policy", new Throwable());
-            //even though policy is ignore, JVM must die because Daemon has not finished initializing
-            Assert.assertTrue(killerForTests.wasKilled());
-            Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure
-        }
-        finally
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
-            JVMStabilityInspector.replaceKiller(originalKiller);
-        }
-    }
-
-    @Test
-    public void testCommitFailurePolicy_ignore_afterStartup() throws Exception
-    {
-        CassandraDaemon daemon = new CassandraDaemon();
-        daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
-        StorageService.instance.registerDaemon(daemon);
-
-        KillerForTests killerForTests = new KillerForTests();
-        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
-        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
-        try
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
-            CommitLog.handleCommitError("Testing ignore policy", new Throwable());
-            //error policy is set to IGNORE, so JVM must not be killed if error ocurs after startup
-            Assert.assertFalse(killerForTests.wasKilled());
-        }
-        finally
-        {
-            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
-            JVMStabilityInspector.replaceKiller(originalKiller);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
deleted file mode 100644
index 6db29a8..0000000
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ /dev/null
@@ -1,467 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.db;
-
-import static junit.framework.Assert.assertTrue;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.zip.CRC32;
-import java.util.zip.Checksum;
-
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
-import org.apache.cassandra.db.commitlog.CommitLogSegmentManager;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.commitlog.CommitLogSegment;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.util.ByteBufferDataInput;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.vint.VIntCoding;
-import org.apache.cassandra.service.CassandraDaemon;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.KillerForTests;
-
-public class CommitLogTest
-{
-    private static final Logger logger = LoggerFactory.getLogger(CommitLogTest.class);
-
-    private static final String KEYSPACE1 = "CommitLogTest";
-    private static final String KEYSPACE2 = "CommitLogTestNonDurable";
-    private static final String STANDARD1 = "Standard1";
-    private static final String STANDARD2 = "Standard2";
-
-    @BeforeClass
-    public static void defineSchema() throws ConfigurationException
-    {
-        SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1,
-                                    KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
-        SchemaLoader.createKeyspace(KEYSPACE2,
-                                    KeyspaceParams.simpleTransient(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
-        System.setProperty("cassandra.commitlog.stop_on_errors", "true");
-        CompactionManager.instance.disableAutoCompaction();
-    }
-
-    @Test
-    public void testRecoveryWithEmptyLog() throws Exception
-    {
-        CommitLog.instance.recover(new File[]{ tmpFile() });
-    }
-
-    @Test
-    public void testRecoveryWithShortLog() throws Exception
-    {
-        // force EOF while reading log
-        testRecoveryWithBadSizeArgument(100, 10);
-    }
-
-    @Test
-    public void testRecoveryWithShortSize() throws Exception
-    {
-        testRecovery(new byte[2]);
-    }
-
-    @Test
-    public void testRecoveryWithShortCheckSum() throws Exception
-    {
-        testRecovery(new byte[6]);
-    }
-
-    @Test
-    public void testRecoveryWithGarbageLog() throws Exception
-    {
-        byte[] garbage = new byte[100];
-        (new java.util.Random()).nextBytes(garbage);
-        testRecovery(garbage);
-    }
-
-    @Test
-    public void testRecoveryWithBadSizeChecksum() throws Exception
-    {
-        Checksum checksum = new CRC32();
-        checksum.update(100);
-        testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
-    }
-
-    @Test
-    public void testRecoveryWithZeroSegmentSizeArgument() throws Exception
-    {
-        // many different combinations of 4 bytes (garbage) will be read as zero by readInt()
-        testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF
-    }
-
-    @Test
-    public void testRecoveryWithNegativeSizeArgument() throws Exception
-    {
-        // garbage from a partial/bad flush could be read as a negative size even if there is no EOF
-        testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
-    }
-
-    @Test
-    public void testDontDeleteIfDirty() throws Exception
-    {
-        CommitLog.instance.resetUnsafe(true);
-        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
-        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
-
-        // Roughly 32 MB mutation
-        Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
-                     .clustering("bytes")
-                     .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4))
-                     .build();
-
-        // Adding it 5 times
-        CommitLog.instance.add(m);
-        CommitLog.instance.add(m);
-        CommitLog.instance.add(m);
-        CommitLog.instance.add(m);
-        CommitLog.instance.add(m);
-
-        // Adding new mutation on another CF
-        Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
-                      .clustering("bytes")
-                      .add("val", ByteBuffer.allocate(4))
-                      .build();
-        CommitLog.instance.add(m2);
-
-        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
-
-        UUID cfid2 = m2.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
-
-        // Assert we still have both our segment
-        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
-    }
-
-    @Test
-    public void testDeleteIfNotDirty() throws Exception
-    {
-        DatabaseDescriptor.getCommitLogSegmentSize();
-        CommitLog.instance.resetUnsafe(true);
-        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
-        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
-
-        // Roughly 32 MB mutation
-        Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k")
-                      .clustering("bytes")
-                      .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1))
-                      .build();
-
-        // Adding it twice (won't change segment)
-        CommitLog.instance.add(rm);
-        CommitLog.instance.add(rm);
-
-        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
-
-        // "Flush": this won't delete anything
-        UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.sync(true);
-        CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext());
-
-        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
-
-        // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
-        Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
-                       .clustering("bytes")
-                       .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 200))
-                       .build();
-        CommitLog.instance.add(rm2);
-        // also forces a new segment, since each entry-with-overhead is just under half the CL size
-        CommitLog.instance.add(rm2);
-        CommitLog.instance.add(rm2);
-
-        assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments();
-
-
-        // "Flush" second cf: The first segment should be deleted since we
-        // didn't write anything on cf1 since last flush (and we flush cf2)
-
-        UUID cfid2 = rm2.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
-
-        // Assert we still have both our segment
-        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
-    }
-
-    private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName)
-    {
-        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(cfName);
-        // We don't want to allocate a size of 0 as this is optimized under the hood and our computation would
-        // break testEqualRecordLimit
-        int allocSize = 1;
-        Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, key)
-                      .clustering(colName)
-                      .add("val", ByteBuffer.allocate(allocSize)).build();
-
-        int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
-        max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
-
-        // Note that the size of the value if vint encoded. So we first compute the ovehead of the mutation without the value and it's size
-        int mutationOverhead = (int)Mutation.serializer.serializedSize(rm, MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize);
-        max -= mutationOverhead;
-
-        // Now, max is the max for both the value and it's size. But we want to know how much we can allocate, i.e. the size of the value.
-        int sizeOfMax = VIntCoding.computeVIntSize(max);
-        max -= sizeOfMax;
-        assert VIntCoding.computeVIntSize(max) == sizeOfMax; // sanity check that we're still encoded with the size we though we would
-        return max;
-    }
-
-    private static int getMaxRecordDataSize()
-    {
-        return getMaxRecordDataSize(KEYSPACE1, bytes("k"), STANDARD1, "bytes");
-    }
-
-    // CASSANDRA-3615
-    @Test
-    public void testEqualRecordLimit() throws Exception
-    {
-        CommitLog.instance.resetUnsafe(true);
-        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
-        Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
-                      .clustering("bytes")
-                      .add("val", ByteBuffer.allocate(getMaxRecordDataSize()))
-                      .build();
-        CommitLog.instance.add(rm);
-    }
-
-    @Test
-    public void testExceedRecordLimit() throws Exception
-    {
-        CommitLog.instance.resetUnsafe(true);
-        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
-        try
-        {
-            Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
-                          .clustering("bytes")
-                          .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
-                          .build();
-            CommitLog.instance.add(rm);
-            throw new AssertionError("mutation larger than limit was accepted");
-        }
-        catch (IllegalArgumentException e)
-        {
-            // IAE is thrown on too-large mutations
-        }
-    }
-
-    @Test
-    public void testVersions()
-    {
-        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
-        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
-        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
-        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
-        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
-
-        assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
-
-        assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
-        String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
-        assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
-    }
-
-    @Test
-    public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException
-    {
-        boolean originalState = DatabaseDescriptor.isAutoSnapshot();
-        try
-        {
-            CommitLog.instance.resetUnsafe(true);
-            boolean prev = DatabaseDescriptor.isAutoSnapshot();
-            DatabaseDescriptor.setAutoSnapshot(false);
-            ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
-            ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
-
-            new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes").add("val", ByteBuffer.allocate(100)).build().applyUnsafe();
-            cfs1.truncateBlocking();
-            DatabaseDescriptor.setAutoSnapshot(prev);
-            Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
-                          .clustering("bytes")
-                          .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
-                          .build();
-
-            for (int i = 0 ; i < 5 ; i++)
-                CommitLog.instance.add(m2);
-
-            assertEquals(2, CommitLog.instance.activeSegments());
-            ReplayPosition position = CommitLog.instance.getContext();
-            for (Keyspace ks : Keyspace.system())
-                for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
-                    CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
-            CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
-            assertEquals(1, CommitLog.instance.activeSegments());
-        }
-        finally
-        {
-            DatabaseDescriptor.setAutoSnapshot(originalState);
-        }
-    }
-
-    @Test
-    public void testTruncateWithoutSnapshotNonDurable() throws IOException
-    {
-        CommitLog.instance.resetUnsafe(true);
-        boolean originalState = DatabaseDescriptor.getAutoSnapshot();
-        try
-        {
-            DatabaseDescriptor.setAutoSnapshot(false);
-            Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
-            Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites);
-
-            ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
-            new RowUpdateBuilder(cfs.metadata, 0, "key1")
-                .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd"))
-                .build()
-                .applyUnsafe();
-
-            assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build())
-                            .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd")));
-
-            cfs.truncateBlocking();
-
-            Util.assertEmpty(Util.cmd(cfs).columns("val").build());
-        }
-        finally
-        {
-            DatabaseDescriptor.setAutoSnapshot(originalState);
-        }
-    }
-
-    private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
-    {
-        ByteBuffer buf = ByteBuffer.allocate(1024);
-        CommitLogDescriptor.writeHeader(buf, desc);
-        long length = buf.position();
-        // Put some extra data in the stream.
-        buf.putDouble(0.1);
-        buf.flip();
-        FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0);
-        CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
-        Assert.assertEquals("Descriptor length", length, input.getFilePointer());
-        Assert.assertEquals("Descriptors", desc, read);
-    }
-
-    @Test
-    public void testDescriptorPersistence() throws IOException
-    {
-        testDescriptorPersistence(new CommitLogDescriptor(11, null));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null)));
-        testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
-                new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
-    }
-
-    @Test
-    public void testDescriptorInvalidParametersSize() throws IOException
-    {
-        Map<String, String> params = new HashMap<>();
-        for (int i=0; i<65535; ++i)
-            params.put("key"+i, Integer.toString(i, 16));
-        try {
-            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
-                                                               21,
-                                                               new ParameterizedClass("LZ4Compressor", params));
-            ByteBuffer buf = ByteBuffer.allocate(1024000);
-            CommitLogDescriptor.writeHeader(buf, desc);
-            Assert.fail("Parameter object too long should fail on writing descriptor.");
-        } catch (ConfigurationException e)
-        {
-            // correct path
-        }
-    }
-
-    protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
-    {
-        Checksum checksum = new CRC32();
-        checksum.update(size);
-        testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue());
-    }
-
-    protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
-    {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        DataOutputStream dout = new DataOutputStream(out);
-        dout.writeInt(size);
-        dout.writeLong(checksum);
-        dout.write(new byte[dataSize]);
-        dout.close();
-        testRecovery(out.toByteArray());
-    }
-
-    protected File tmpFile() throws IOException
-    {
-        File logFile = File.createTempFile("CommitLog-" + CommitLogDescriptor.current_version + "-", ".log");
-        logFile.deleteOnExit();
-        assert logFile.length() == 0;
-        return logFile;
-    }
-
-    protected void testRecovery(byte[] logData) throws Exception
-    {
-        File logFile = tmpFile();
-        try (OutputStream lout = new FileOutputStream(logFile))
-        {
-            lout.write(logData);
-            //statics make it annoying to test things correctly
-            CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java
new file mode 100644
index 0000000..79f83fe
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java
@@ -0,0 +1,141 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.cassandra.db.commitlog;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.service.CassandraDaemon;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+
+public class CommitLogFailurePolicyTest
+{
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        System.setProperty("cassandra.commitlog.stop_on_errors", "true");
+    }
+
+    @Test
+    public void testCommitFailurePolicy_stop() throws ConfigurationException
+    {
+        CassandraDaemon daemon = new CassandraDaemon();
+        daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
+        StorageService.instance.registerDaemon(daemon);
+
+        // Need storage service active so stop policy can shutdown gossip
+        StorageService.instance.initServer();
+        Assert.assertTrue(Gossiper.instance.isEnabled());
+
+        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop);
+            CommitLog.handleCommitError("Test stop error", new Throwable());
+            Assert.assertFalse(Gossiper.instance.isEnabled());
+        }
+        finally
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+        }
+    }
+
+    @Test
+    public void testCommitFailurePolicy_die()
+    {
+        CassandraDaemon daemon = new CassandraDaemon();
+        daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
+        StorageService.instance.registerDaemon(daemon);
+
+        KillerForTests killerForTests = new KillerForTests();
+        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die);
+            CommitLog.handleCommitError("Testing die policy", new Throwable());
+            Assert.assertTrue(killerForTests.wasKilled());
+            Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure
+        }
+        finally
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+            JVMStabilityInspector.replaceKiller(originalKiller);
+        }
+    }
+
+    @Test
+    public void testCommitFailurePolicy_ignore_beforeStartup()
+    {
+        //startup was not completed successfully (since method completeSetup() was not called)
+        CassandraDaemon daemon = new CassandraDaemon();
+        StorageService.instance.registerDaemon(daemon);
+
+        KillerForTests killerForTests = new KillerForTests();
+        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
+            CommitLog.handleCommitError("Testing ignore policy", new Throwable());
+            //even though policy is ignore, JVM must die because Daemon has not finished initializing
+            Assert.assertTrue(killerForTests.wasKilled());
+            Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure
+        }
+        finally
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+            JVMStabilityInspector.replaceKiller(originalKiller);
+        }
+    }
+
+    @Test
+    public void testCommitFailurePolicy_ignore_afterStartup() throws Exception
+    {
+        CassandraDaemon daemon = new CassandraDaemon();
+        daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy
+        StorageService.instance.registerDaemon(daemon);
+
+        KillerForTests killerForTests = new KillerForTests();
+        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+        Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy();
+        try
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore);
+            CommitLog.handleCommitError("Testing ignore policy", new Throwable());
+            //error policy is set to IGNORE, so JVM must not be killed if an error occurs after startup
+            Assert.assertFalse(killerForTests.wasKilled());
+        }
+        finally
+        {
+            DatabaseDescriptor.setCommitFailurePolicy(oldPolicy);
+            JVMStabilityInspector.replaceKiller(originalKiller);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
new file mode 100644
index 0000000..b41b7b3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -0,0 +1,578 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.commitlog;
+
+import static junit.framework.Assert.assertTrue;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BytesType;
+
+import org.junit.*;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.Config.CommitFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.ByteBufferDataInput;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+import org.apache.cassandra.utils.vint.VIntCoding;
+
+public class CommitLogTest
+{
+    // Keyspace that defineSchema() creates with KeyspaceParams.simple(1).
+    private static final String KEYSPACE1 = "CommitLogTest";
+    // Keyspace that defineSchema() creates with KeyspaceParams.simpleTransient(1).
+    private static final String KEYSPACE2 = "CommitLogTestNonDurable";
+    // Table names passed to SchemaLoader.standardCFMD in defineSchema().
+    private static final String STANDARD1 = "Standard1";
+    private static final String STANDARD2 = "Standard2";
+
+    /**
+     * Prepares the test server and creates the two keyspaces used by this class, each holding
+     * the {@code Standard1} and {@code Standard2} tables, then disables auto compaction.
+     *
+     * @throws ConfigurationException declared for the schema setup calls
+     */
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD1, 0, AsciiType.instance, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, STANDARD2, 0, AsciiType.instance, BytesType.instance));
+        // The table metadata must name the keyspace it is registered under; the tables of the
+        // transient keyspace were previously built with KEYSPACE1.
+        SchemaLoader.createKeyspace(KEYSPACE2,
+                                    KeyspaceParams.simpleTransient(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE2, STANDARD1, 0, AsciiType.instance, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE2, STANDARD2, 0, AsciiType.instance, BytesType.instance));
+        CompactionManager.instance.disableAutoCompaction();
+    }
+
+    /**
+     * Recovering a zero-length file named with the current commit log version is expected to
+     * fail with a {@link CommitLogReplayException}.
+     */
+    @Test
+    public void testRecoveryWithEmptyLog() throws Exception
+    {
+        Callable<Void> recoverEmptySegment = () -> {
+            File emptySegment = tmpFile(CommitLogDescriptor.current_version);
+            CommitLog.instance.recover(new File[]{ emptySegment });
+            return null;
+        };
+        runExpecting(recoverEmptySegment, CommitLogReplayException.class);
+    }
+
+    @Test
+    public void testRecoveryWithEmptyLog20() throws Exception
+    {
+        CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.VERSION_20) });
+    }
+
+    @Test
+    public void testRecoveryWithZeroLog() throws Exception
+    {
+        testRecovery(new byte[10], null);
+    }
+
+    @Test
+    public void testRecoveryWithShortLog() throws Exception
+    {
+        // force EOF while reading log
+        testRecoveryWithBadSizeArgument(100, 10);
+    }
+
+    /**
+     * A {@code VERSION_20} log holding only two bytes is expected to fail replay with a
+     * {@link CommitLogReplayException}.
+     */
+    @Test
+    public void testRecoveryWithShortSize() throws Exception
+    {
+        Callable<Void> recoverTwoBytes = () -> testRecovery(new byte[2], CommitLogDescriptor.VERSION_20);
+        runExpecting(recoverTwoBytes, CommitLogReplayException.class);
+    }
+
+    /**
+     * An eight-byte log with a non-zero byte at index 3 is expected to fail replay with a
+     * {@link CommitLogReplayException}.
+     */
+    @Test
+    public void testRecoveryWithShortCheckSum() throws Exception
+    {
+        byte[] truncatedEntry = new byte[8];
+        // make sure this is not a legacy end marker.
+        truncatedEntry[3] = 10;
+        testRecovery(truncatedEntry, CommitLogReplayException.class);
+    }
+
+    @Test
+    public void testRecoveryWithShortMutationSize() throws Exception
+    {
+        testRecoveryWithBadSizeArgument(9, 10);
+    }
+
+    /**
+     * Writes 100 random bytes as a current-version log and attempts to recover it.
+     */
+    private void testRecoveryWithGarbageLog() throws Exception
+    {
+        java.util.Random rng = new java.util.Random();
+        byte[] noise = new byte[100];
+        rng.nextBytes(noise);
+        testRecovery(noise, CommitLogDescriptor.current_version);
+    }
+
+    /**
+     * Recovering a log of random bytes is expected to raise a {@link CommitLogReplayException}.
+     */
+    @Test
+    public void testRecoveryWithGarbageLog_fail() throws Exception
+    {
+        Callable<Void> recoverGarbage = () -> {
+            testRecoveryWithGarbageLog();
+            return null;
+        };
+        runExpecting(recoverGarbage, CommitLogReplayException.class);
+    }
+
+    /**
+     * With the ignore-replay-errors system property set to {@code "true"}, recovering a log of
+     * random bytes must not throw. The property is cleared afterwards.
+     */
+    @Test
+    public void testRecoveryWithGarbageLog_ignoredByProperty() throws Exception
+    {
+        final String ignoreErrorsKey = CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY;
+        try
+        {
+            System.setProperty(ignoreErrorsKey, "true");
+            testRecoveryWithGarbageLog();
+        }
+        finally
+        {
+            System.clearProperty(ignoreErrorsKey);
+        }
+    }
+
+    @Test
+    public void testRecoveryWithBadSizeChecksum() throws Exception
+    {
+        Checksum checksum = new CRC32();
+        checksum.update(100);
+        testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
+    }
+
+    @Test
+    public void testRecoveryWithNegativeSizeArgument() throws Exception
+    {
+        // garbage from a partial/bad flush could be read as a negative size even if there is no EOF
+        testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
+    }
+
+    /**
+     * Fills the commit log with writes to the first table, adds one small write to the second,
+     * then discards completed segments for the second table only. Both segments must survive.
+     */
+    @Test
+    public void testDontDeleteIfDirty() throws Exception
+    {
+        CommitLog.instance.resetUnsafe(true);
+        Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore firstTable = keyspace1.getColumnFamilyStore(STANDARD1);
+        ColumnFamilyStore secondTable = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
+        // mutation whose value is a quarter of the commit log segment size
+        ByteBuffer quarterSegment = ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4);
+        Mutation large = new RowUpdateBuilder(firstTable.metadata, 0, "k")
+                         .clustering("bytes")
+                         .add("val", quarterSegment)
+                         .build();
+
+        // added five times
+        for (int i = 0; i < 5; i++)
+            CommitLog.instance.add(large);
+
+        // one small mutation on the other table
+        Mutation small = new RowUpdateBuilder(secondTable.metadata, 0, "k")
+                         .clustering("bytes")
+                         .add("val", ByteBuffer.allocate(4))
+                         .build();
+        CommitLog.instance.add(small);
+
+        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
+
+        UUID secondTableId = small.getColumnFamilyIds().iterator().next();
+        CommitLog.instance.discardCompletedSegments(secondTableId, CommitLog.instance.getContext());
+
+        // both segments must still be active
+        assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
+    }
+
+    /**
+     * Writes to the first table, discards its completed segments, then writes enough to the
+     * second table to roll onto new segments. After discarding completed segments for the
+     * second table, only one segment is expected to remain active.
+     */
+    @Test
+    public void testDeleteIfNotDirty() throws Exception
+    {
+        // NOTE(review): return value is unused; this call looks like a no-op - confirm before removing
+        DatabaseDescriptor.getCommitLogSegmentSize();
+        CommitLog.instance.resetUnsafe(true);
+        ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+        ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
+        // Mutation whose value is just under a quarter of the commit log segment size
+        Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+                      .clustering("bytes")
+                      .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1))
+                      .build();
+
+        // Adding it twice (won't change segment)
+        CommitLog.instance.add(rm);
+        CommitLog.instance.add(rm);
+
+        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+
+        // "Flush": this won't delete anything
+        UUID cfid1 = rm.getColumnFamilyIds().iterator().next();
+        CommitLog.instance.sync(true);
+        CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext());
+
+        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+
+        // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created
+        Mutation rm2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
+                       .clustering("bytes")
+                       .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 200))
+                       .build();
+        CommitLog.instance.add(rm2);
+        // also forces a new segment, since each entry-with-overhead is just under half the CL size
+        CommitLog.instance.add(rm2);
+        CommitLog.instance.add(rm2);
+
+        assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments();
+
+
+        // "Flush" second cf: The first segment should be deleted since we
+        // didn't write anything on cf1 since last flush (and we flush cf2)
+
+        UUID cfid2 = rm2.getColumnFamilyIds().iterator().next();
+        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
+
+        // Assert that only a single segment is still active
+        assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
+    }
+
+    /**
+     * Computes the largest value size, in bytes, for a single-cell mutation such that the
+     * serialized mutation plus the commit log entry overhead fits in half a segment.
+     *
+     * @param keyspace keyspace holding the table
+     * @param key      partition key used for the probe mutation
+     * @param cfName   table name
+     * @param colName  clustering value used for the probe mutation
+     * @return the value size computed from the probe mutation's serialized overhead
+     */
+    private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String cfName, String colName)
+    {
+        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(cfName);
+        // We don't want to allocate a size of 0 as this is optimized under the hood and our computation would
+        // break testEqualRecordLimit
+        int allocSize = 1;
+        Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, key)
+                      .clustering(colName)
+                      .add("val", ByteBuffer.allocate(allocSize)).build();
+
+        int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
+        max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
+
+        // Note that the size of the value is vint encoded. So we first compute the overhead of the mutation without the value and its size
+        int mutationOverhead = (int)Mutation.serializer.serializedSize(rm, MessagingService.current_version) - (VIntCoding.computeVIntSize(allocSize) + allocSize);
+        max -= mutationOverhead;
+
+        // Now, max is the max for both the value and its size. But we want to know how much we can allocate, i.e. the size of the value.
+        int sizeOfMax = VIntCoding.computeVIntSize(max);
+        max -= sizeOfMax;
+        assert VIntCoding.computeVIntSize(max) == sizeOfMax; // sanity check that we're still encoded with the size we thought we would
+        return max;
+    }
+
+    /**
+     * Computes the maximum record data size for key {@code "k"} and clustering {@code "bytes"}
+     * in the first table of the first keyspace.
+     */
+    private static int getMaxRecordDataSize()
+    {
+        ByteBuffer partitionKey = bytes("k");
+        int maxSize = getMaxRecordDataSize(KEYSPACE1, partitionKey, STANDARD1, "bytes");
+        return maxSize;
+    }
+
+    /**
+     * CASSANDRA-3615: a mutation whose value is exactly the computed maximum record size must
+     * be accepted by the commit log.
+     */
+    @Test
+    public void testEqualRecordLimit() throws Exception
+    {
+        CommitLog.instance.resetUnsafe(true);
+        ColumnFamilyStore table = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+        ByteBuffer maxSizedValue = ByteBuffer.allocate(getMaxRecordDataSize());
+        Mutation atLimit = new RowUpdateBuilder(table.metadata, 0, "k")
+                           .clustering("bytes")
+                           .add("val", maxSizedValue)
+                           .build();
+        CommitLog.instance.add(atLimit);
+    }
+
+    /**
+     * A mutation one byte larger than the computed maximum record size must be rejected with
+     * an {@link IllegalArgumentException}.
+     */
+    @Test
+    public void testExceedRecordLimit() throws Exception
+    {
+        CommitLog.instance.resetUnsafe(true);
+        ColumnFamilyStore table = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+        boolean rejected = false;
+        try
+        {
+            Mutation overLimit = new RowUpdateBuilder(table.metadata, 0, "k")
+                                 .clustering("bytes")
+                                 .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
+                                 .build();
+            CommitLog.instance.add(overLimit);
+        }
+        catch (IllegalArgumentException e)
+        {
+            // IAE is thrown on too-large mutations
+            rejected = true;
+        }
+        if (!rejected)
+            throw new AssertionError("mutation larger than limit was accepted");
+    }
+
+    /**
+     * Runs the bad-size recovery check with a CRC32 checksum derived from {@code size}.
+     * {@code Checksum.update(int)} takes a single byte, so only the low-order byte of
+     * {@code size} is folded into the checksum.
+     *
+     * @param size     entry size written to the log
+     * @param dataSize number of zero bytes written after the header
+     */
+    protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
+    {
+        Checksum sizeChecksum = new CRC32();
+        sizeChecksum.update(size);
+        long checksumValue = sizeChecksum.getValue();
+        testRecoveryWithBadSizeArgument(size, dataSize, checksumValue);
+    }
+
+    /**
+     * Builds a log entry from an int size, a long checksum and {@code dataSize} zero bytes,
+     * then expects recovery of it to raise a {@link CommitLogReplayException}.
+     *
+     * @param size     entry size written to the log
+     * @param dataSize number of zero bytes written after the header
+     * @param checksum checksum value written after the size
+     */
+    protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
+    {
+        ByteArrayOutputStream entryBytes = new ByteArrayOutputStream();
+        try (DataOutputStream entryOut = new DataOutputStream(entryBytes))
+        {
+            entryOut.writeInt(size);
+            entryOut.writeLong(checksum);
+            entryOut.write(new byte[dataSize]);
+        }
+        byte[] logData = entryBytes.toByteArray();
+        testRecovery(logData, CommitLogReplayException.class);
+    }
+
+    /**
+     * Creates an empty temporary file named {@code CommitLog-<version>-*.log} that is deleted
+     * when the JVM exits.
+     *
+     * @param version commit log version embedded in the file name
+     * @return the newly created, zero-length file
+     */
+    protected File tmpFile(int version) throws IOException
+    {
+        String prefix = "CommitLog-" + version + "-";
+        File segment = File.createTempFile(prefix, ".log");
+        segment.deleteOnExit();
+        assert segment.length() == 0;
+        return segment;
+    }
+
+    /**
+     * Writes {@code logData} to a temporary log file with no descriptor header and recovers it.
+     *
+     * @param logData raw bytes written to the file
+     * @param version commit log version used in the file name
+     * @return always {@code null}, so the method can be used as a {@code Callable<Void>} body
+     */
+    protected Void testRecovery(byte[] logData, int version) throws Exception
+    {
+        File segment = tmpFile(version);
+        try (OutputStream segmentOut = new FileOutputStream(segment))
+        {
+            segmentOut.write(logData);
+            // statics make it annoying to test things correctly
+            String segmentPath = segment.getPath();
+            // CASSANDRA-1119 / CASSANDRA-1179 throw on failure
+            CommitLog.instance.recover(segmentPath);
+        }
+        return null;
+    }
+
+    /**
+     * Writes a descriptor header followed by {@code logData} to a temporary log file and
+     * recovers it. The descriptor is rebuilt with the id parsed from the file name.
+     *
+     * @param desc    descriptor supplying the version and compression
+     * @param logData raw bytes written after the header
+     * @return always {@code null}, so the method can be used as a {@code Callable<Void>} body
+     */
+    protected Void testRecovery(CommitLogDescriptor desc, byte[] logData) throws Exception
+    {
+        File segment = tmpFile(desc.version);
+        // Change id to match file.
+        CommitLogDescriptor parsedFromName = CommitLogDescriptor.fromFileName(segment.getName());
+        desc = new CommitLogDescriptor(desc.version, parsedFromName.id, desc.compression);
+        ByteBuffer header = ByteBuffer.allocate(1024);
+        CommitLogDescriptor.writeHeader(header, desc);
+        try (OutputStream segmentOut = new FileOutputStream(segment))
+        {
+            segmentOut.write(header.array(), 0, header.position());
+            segmentOut.write(logData);
+            // statics make it annoying to test things correctly
+            String segmentPath = segment.getPath();
+            // CASSANDRA-1119 / CASSANDRA-1179 throw on failure
+            CommitLog.instance.recover(segmentPath);
+        }
+        return null;
+    }
+
+    /**
+     * Writes a header whose descriptor id (4) was not taken from the file name and expects
+     * recovery to raise a {@link CommitLogReplayException}.
+     */
+    @Test
+    public void testRecoveryWithIdMismatch() throws Exception
+    {
+        CommitLogDescriptor mismatched = new CommitLogDescriptor(4, null);
+        File segment = tmpFile(mismatched.version);
+        ByteBuffer header = ByteBuffer.allocate(1024);
+        CommitLogDescriptor.writeHeader(header, mismatched);
+        try (OutputStream segmentOut = new FileOutputStream(segment))
+        {
+            segmentOut.write(header.array(), 0, header.position());
+
+            Callable<Void> recoverSegment = () -> {
+                // CASSANDRA-1119 / CASSANDRA-1179 throw on failure
+                CommitLog.instance.recover(segment.getPath());
+                return null;
+            };
+            runExpecting(recoverSegment, CommitLogReplayException.class);
+        }
+    }
+
+    /**
+     * A header naming the compressor class {@code UnknownCompressor} is expected to make
+     * recovery raise a {@link CommitLogReplayException}.
+     */
+    @Test
+    public void testRecoveryWithBadCompressor() throws Exception
+    {
+        ParameterizedClass unknownCompressor = new ParameterizedClass("UnknownCompressor", null);
+        CommitLogDescriptor badDescriptor = new CommitLogDescriptor(4, unknownCompressor);
+        Callable<Void> recoverEmptyBody = () -> testRecovery(badDescriptor, new byte[0]);
+        runExpecting(recoverEmptyBody, CommitLogReplayException.class);
+    }
+
+    /**
+     * Runs {@code r} with a {@link KillerForTests} installed and verifies both the thrown
+     * exception and whether the JVM killer was invoked.
+     *
+     * The original killer is restored in a {@code finally} block, so a failed expectation in
+     * one test cannot leave the test killer installed for later tests.
+     *
+     * @param r        the action to run
+     * @param expected exact class of the exception that must be thrown, or {@code null} if the
+     *                 action must complete normally; a kill is expected exactly when this is non-null
+     * @throws AssertionError if a different exception (or none, when one was expected) is seen,
+     *                        or if the killer state does not match the expectation
+     */
+    protected void runExpecting(Callable<Void> r, Class<?> expected)
+    {
+        KillerForTests killerForTests = new KillerForTests();
+        JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
+        try
+        {
+            Throwable caught = null;
+            try
+            {
+                r.call();
+            }
+            catch (Throwable t)
+            {
+                // exact class match: subclasses of the expected exception are rejected as well
+                if (expected != t.getClass())
+                    throw new AssertionError("Expected exception " + expected + ", got " + t, t);
+                caught = t;
+            }
+            if (expected != null && caught == null)
+                Assert.fail("Expected exception " + expected + " but call completed successfully.");
+        }
+        finally
+        {
+            // always reinstate the original killer, including on assertion failures above
+            JVMStabilityInspector.replaceKiller(originalKiller);
+        }
+        assertEquals("JVM killed", expected != null, killerForTests.wasKilled());
+    }
+
+    /**
+     * Recovers {@code logData} twice, first as a header-less {@code VERSION_20} log and then
+     * behind an uncompressed descriptor header, checking the same expectation each time.
+     *
+     * @param logData  raw log bytes
+     * @param expected exception class expected from recovery, or {@code null} for none
+     */
+    protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception
+    {
+        Callable<Void> legacyFormat = () -> testRecovery(logData, CommitLogDescriptor.VERSION_20);
+        runExpecting(legacyFormat, expected);
+
+        Callable<Void> withHeader = () -> testRecovery(new CommitLogDescriptor(4, null), logData);
+        runExpecting(withHeader, expected);
+    }
+
+    @Test
+    public void testVersions()
+    {
+        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
+        Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
+        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
+        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
+        Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
+
+        assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+
+        assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
+        String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
+        assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+    }
+
+    /**
+     * Truncates the first table with auto snapshot disabled, fills the log through the second
+     * table, then discards completed segments for the system tables and the second table.
+     * One active segment is expected to remain.
+     */
+    @Test
+    public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException
+    {
+        boolean snapshotAtEntry = DatabaseDescriptor.isAutoSnapshot();
+        try
+        {
+            CommitLog.instance.resetUnsafe(true);
+            boolean snapshotBeforeTruncate = DatabaseDescriptor.isAutoSnapshot();
+            DatabaseDescriptor.setAutoSnapshot(false);
+            ColumnFamilyStore firstTable = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+            ColumnFamilyStore secondTable = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
+            Mutation seed = new RowUpdateBuilder(firstTable.metadata, 0, "k")
+                            .clustering("bytes")
+                            .add("val", ByteBuffer.allocate(100))
+                            .build();
+            seed.applyUnsafe();
+            firstTable.truncateBlocking();
+            DatabaseDescriptor.setAutoSnapshot(snapshotBeforeTruncate);
+
+            ByteBuffer quarterSegment = ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4);
+            Mutation filler = new RowUpdateBuilder(secondTable.metadata, 0, "k")
+                              .clustering("bytes")
+                              .add("val", quarterSegment)
+                              .build();
+
+            int remaining = 5;
+            while (remaining-- > 0)
+                CommitLog.instance.add(filler);
+
+            assertEquals(2, CommitLog.instance.activeSegments());
+            ReplayPosition flushedUpTo = CommitLog.instance.getContext();
+            for (Keyspace systemKeyspace : Keyspace.system())
+            {
+                for (ColumnFamilyStore systemTable : systemKeyspace.getColumnFamilyStores())
+                    CommitLog.instance.discardCompletedSegments(systemTable.metadata.cfId, flushedUpTo);
+            }
+            CommitLog.instance.discardCompletedSegments(secondTable.metadata.cfId, flushedUpTo);
+            assertEquals(1, CommitLog.instance.activeSegments());
+        }
+        finally
+        {
+            DatabaseDescriptor.setAutoSnapshot(snapshotAtEntry);
+        }
+    }
+
+    /**
+     * Writes one row to a table in the keyspace without durable writes, truncates the table
+     * with auto snapshot disabled, and checks that the row is gone.
+     */
+    @Test
+    public void testTruncateWithoutSnapshotNonDurable() throws IOException
+    {
+        CommitLog.instance.resetUnsafe(true);
+        boolean snapshotAtEntry = DatabaseDescriptor.getAutoSnapshot();
+        try
+        {
+            DatabaseDescriptor.setAutoSnapshot(false);
+            Keyspace transientKeyspace = Keyspace.open(KEYSPACE2);
+            Assert.assertFalse(transientKeyspace.getMetadata().params.durableWrites);
+
+            ColumnFamilyStore table = transientKeyspace.getColumnFamilyStore("Standard1");
+            Mutation write = new RowUpdateBuilder(table.metadata, 0, "key1")
+                             .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd"))
+                             .build();
+            write.applyUnsafe();
+
+            // the row must be readable before the truncate
+            assertTrue(Util.getOnlyRow(Util.cmd(table).columns("val").build())
+                            .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd")));
+
+            table.truncateBlocking();
+
+            Util.assertEmpty(Util.cmd(table).columns("val").build());
+        }
+        finally
+        {
+            DatabaseDescriptor.setAutoSnapshot(snapshotAtEntry);
+        }
+    }
+
+    /**
+     * Writes {@code desc} as a header, appends extra bytes, reads the header back and checks
+     * that the reader stops at the end of the header and returns an equal descriptor.
+     *
+     * @param desc descriptor to round-trip
+     */
+    private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
+    {
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        CommitLogDescriptor.writeHeader(buffer, desc);
+        long headerLength = buffer.position();
+        // Put some extra data in the stream.
+        buffer.putDouble(0.1);
+        buffer.flip();
+        FileDataInput in = new ByteBufferDataInput(buffer, "input", 0, 0);
+        CommitLogDescriptor roundTripped = CommitLogDescriptor.readHeader(in);
+        Assert.assertEquals("Descriptor length", headerLength, in.getFilePointer());
+        Assert.assertEquals("Descriptors", desc, roundTripped);
+    }
+
+    /**
+     * Round-trips descriptors with different versions, ids and compression settings through
+     * the header writer and reader.
+     */
+    @Test
+    public void testDescriptorPersistence() throws IOException
+    {
+        CommitLogDescriptor defaultVersion = new CommitLogDescriptor(11, null);
+        testDescriptorPersistence(defaultVersion);
+
+        CommitLogDescriptor version21 = new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null);
+        testDescriptorPersistence(version21);
+
+        CommitLogDescriptor version30 = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null);
+        testDescriptorPersistence(version30);
+
+        CommitLogDescriptor withLz4 = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17,
+                new ParameterizedClass("LZ4Compressor", null));
+        testDescriptorPersistence(withLz4);
+
+        CommitLogDescriptor withParameters = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
+                new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null")));
+        testDescriptorPersistence(withParameters);
+    }
+
+    /**
+     * Writing a header whose compressor has 65535 parameters must fail with a
+     * {@link ConfigurationException}.
+     */
+    @Test
+    public void testDescriptorInvalidParametersSize() throws IOException
+    {
+        Map<String, String> oversized = new HashMap<>();
+        int index = 0;
+        while (index < 65535)
+        {
+            oversized.put("key" + index, Integer.toHexString(index));
+            ++index;
+        }
+        try
+        {
+            ParameterizedClass compressor = new ParameterizedClass("LZ4Compressor", oversized);
+            CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 21, compressor);
+            ByteBuffer buffer = ByteBuffer.allocate(1024000);
+            CommitLogDescriptor.writeHeader(buffer, desc);
+            Assert.fail("Parameter object too long should fail on writing descriptor.");
+        }
+        catch (ConfigurationException e)
+        {
+            // correct path
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d12d2d49/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
index 7b0ab06..00a143b 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java
@@ -30,6 +30,8 @@ import junit.framework.Assert;
 
 import com.google.common.base.Predicate;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -43,6 +45,9 @@ import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
 
 public class CommitLogUpgradeTest
 {
@@ -56,6 +61,24 @@ public class CommitLogUpgradeTest
     static final String KEYSPACE = "Keyspace1";
     static final String CELLNAME = "name";
 
+    private JVMStabilityInspector.Killer originalKiller;
+    private KillerForTests killerForTests;
+    private boolean shouldBeKilled = false;
+
+    /**
+     * Installs a fresh {@link KillerForTests} before each test and remembers the killer it
+     * replaced.
+     */
+    @Before
+    public void prepareToBeKilled()
+    {
+        KillerForTests replacement = new KillerForTests();
+        originalKiller = JVMStabilityInspector.replaceKiller(replacement);
+        killerForTests = replacement;
+    }
+
+    @After
+    public void cleanUp()
+    {
+        JVMStabilityInspector.replaceKiller(originalKiller);
+        Assert.assertEquals("JVM killed", shouldBeKilled, killerForTests.wasKilled());
+    }
+
     @Test
     public void test20() throws Exception
     {
@@ -69,6 +92,7 @@ public class CommitLogUpgradeTest
     }
 
     @Test
+
     public void test22() throws Exception
     {
         testRestore(DATA_DIR + "2.2");
@@ -86,6 +110,47 @@ public class CommitLogUpgradeTest
         testRestore(DATA_DIR + "2.2-snappy");
     }
 
+    /**
+     * Restores the truncated LZ4-compressed 2.2 commit log. No exception is expected, and
+     * because {@code shouldBeKilled} is left {@code false}, {@code cleanUp()} also checks that
+     * the JVM killer was not invoked.
+     */
+    // Without @Test, JUnit 4 never runs this method and the truncated-log case goes unchecked.
+    @Test
+    public void test22_truncated() throws Exception
+    {
+        testRestore(DATA_DIR + "2.2-lz4-truncated");
+    }
+
+    /**
+     * Restoring the first bit-rotted LZ4 2.2 commit log must raise a
+     * {@link CommitLogReplayException} and invoke the JVM killer.
+     */
+    @Test(expected = CommitLogReplayException.class)
+    public void test22_bitrot() throws Exception
+    {
+        this.shouldBeKilled = true;
+        String corruptedLogDir = DATA_DIR + "2.2-lz4-bitrot";
+        testRestore(corruptedLogDir);
+    }
+
+    /**
+     * With the ignore-replay-errors system property set to {@code "true"}, restoring the first
+     * bit-rotted log must not throw. The property is cleared afterwards.
+     */
+    @Test
+    public void test22_bitrot_ignored() throws Exception
+    {
+        final String ignoreErrorsKey = CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY;
+        try
+        {
+            System.setProperty(ignoreErrorsKey, "true");
+            testRestore(DATA_DIR + "2.2-lz4-bitrot");
+        }
+        finally
+        {
+            System.clearProperty(ignoreErrorsKey);
+        }
+    }
+
+    /**
+     * Restoring the second bit-rotted LZ4 2.2 commit log must raise a
+     * {@link CommitLogReplayException} and invoke the JVM killer.
+     */
+    @Test(expected = CommitLogReplayException.class)
+    public void test22_bitrot2() throws Exception
+    {
+        this.shouldBeKilled = true;
+        String corruptedLogDir = DATA_DIR + "2.2-lz4-bitrot2";
+        testRestore(corruptedLogDir);
+    }
+
+    /**
+     * With the ignore-replay-errors system property set to {@code "true"}, restoring the
+     * second bit-rotted log must not throw. The property is cleared afterwards.
+     */
+    @Test
+    public void test22_bitrot2_ignored() throws Exception
+    {
+        final String ignoreErrorsKey = CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY;
+        try
+        {
+            System.setProperty(ignoreErrorsKey, "true");
+            testRestore(DATA_DIR + "2.2-lz4-bitrot2");
+        }
+        finally
+        {
+            System.clearProperty(ignoreErrorsKey);
+        }
+    }
+
     @BeforeClass
     static public void initialize() throws FileNotFoundException, IOException, InterruptedException
     {