You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/07/29 18:58:37 UTC

[2/3] git commit: Revert "Fail to start if commit log replay encounters an exception"

Revert "Fail to start if commit log replay encounters an exception"

This reverts commit 581ce631026b98ee9438d54ef144df89bc91100b.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a5bc52ee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a5bc52ee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a5bc52ee

Branch: refs/heads/trunk
Commit: a5bc52eee90e342efcdc53282612008d3dbaeaeb
Parents: db96239
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Jul 29 11:57:34 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Jul 29 11:57:34 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 -
 .../cassandra/db/commitlog/CommitLog.java       |  17 +--
 .../db/commitlog/CommitLogDescriptor.java       |   8 +-
 .../db/commitlog/CommitLogReplayer.java         |  76 +++--------
 .../commitlog/MalformedCommitLogException.java  |  16 ---
 .../cassandra/service/CassandraDaemon.java      |   2 -
 .../org/apache/cassandra/db/CommitLogTest.java  | 133 ++++---------------
 7 files changed, 48 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64f9793..1a2dc57 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,7 +1,6 @@
 2.1.1
  * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
  * Add listen_interface and rpc_interface options (CASSANDRA-7417)
- * Fail to start if commit log replay detects a problem (CASSANDRA-7125)
  * Improve schema merge performance (CASSANDRA-7444)
  * Adjust MT depth based on # of partition validating (CASSANDRA-5263)
  * Optimise NativeCell comparisons (CASSANDRA-6755)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index a1be25d..d2a5fa7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -131,20 +131,9 @@ public class CommitLog implements CommitLogMBean
      */
     public int recover(File... clogs) throws IOException
     {
-        try
-        {
-            CommitLogReplayer recovery = new CommitLogReplayer();
-            recovery.recover(clogs);
-            return recovery.blockForWrites();
-        }
-        catch (IOException e)
-        {
-            if (e instanceof UnknownColumnFamilyException)
-                logger.error("Commit log replay failed due to replaying a mutation for a missing table. This error can be ignored by providing -Dcassandra.commitlog.stop_on_missing_tables=false on the command line");
-            if (e instanceof MalformedCommitLogException)
-                logger.error("Commit log replay failed due to a non-fatal exception. This error can be ignored by providing -Dcassandra.commitlog.stop_on_errors=false on the command line");
-            throw e;
-        }
+        CommitLogReplayer recovery = new CommitLogReplayer();
+        recovery.recover(clogs);
+        return recovery.blockForWrites();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 77c25d3..91c81e1 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -28,8 +28,6 @@ import java.nio.ByteBuffer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -50,11 +48,10 @@ public class CommitLogDescriptor
      * Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes.
      * Note: make sure to handle {@link #getMessagingVersion()}
      */
-    @VisibleForTesting
     public static final int current_version = VERSION_21;
 
     // [version, id, checksum]
-    public static final int HEADER_SIZE = 4 + 8 + 4;
+    static final int HEADER_SIZE = 4 + 8 + 4;
 
     final int version;
     public final long id;
@@ -70,8 +67,7 @@ public class CommitLogDescriptor
         this(current_version, id);
     }
 
-    @VisibleForTesting
-    public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
+    static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
     {
         out.putInt(0, descriptor.version);
         out.putLong(4, descriptor.id);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 10d13b2..1012829 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -23,7 +23,6 @@ import java.util.*;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
@@ -49,8 +48,6 @@ public class CommitLogReplayer
     private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
     private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
-    private static boolean IGNORE_ERRORS = System.getProperty("cassandra.commitlog.stop_on_errors", "true").equals("false");
-    private static boolean IGNORE_MISSING_TABLES = IGNORE_ERRORS || System.getProperty("cassandra.commitlog.stop_on_missing_tables", "true").equals("false");
 
     private final Set<Keyspace> keyspacesRecovered;
     private final List<Future<?>> futures;
@@ -63,16 +60,16 @@ public class CommitLogReplayer
 
     public CommitLogReplayer()
     {
-        this.keyspacesRecovered = new NonBlockingHashSet<>();
-        this.futures = new ArrayList<>();
+        this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
+        this.futures = new ArrayList<Future<?>>();
         this.buffer = new byte[4096];
-        this.invalidMutations = new HashMap<>();
+        this.invalidMutations = new HashMap<UUID, AtomicInteger>();
         // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
         this.replayedCount = new AtomicInteger();
         this.checksum = new PureJavaCrc32();
 
         // compute per-CF and global replay positions
-        cfPositions = new HashMap<>();
+        cfPositions = new HashMap<UUID, ReplayPosition>();
         Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
@@ -120,12 +117,7 @@ public class CommitLogReplayer
         if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
         {
             if (offset != reader.length() && offset != Integer.MAX_VALUE)
-            {
-                String message = String.format("Encountered bad header at position %d of Commit log %s; not enough room for a header", offset, reader.getPath());
-                if (!IGNORE_ERRORS)
-                    throw new MalformedCommitLogException(message);
-                logger.warn(message);
-            }
+                logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath());
             // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
             return -1;
         }
@@ -144,19 +136,13 @@ public class CommitLogReplayer
         {
             if (end != 0 || filecrc != 0)
             {
-                String message = String.format("Encountered bad header at position %d of Commit log %s, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
-                if (!IGNORE_ERRORS)
-                    throw new MalformedCommitLogException(message);
-                logger.warn(message);
+                logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
             }
             return -1;
         }
         else if (end < offset || end > reader.length())
         {
-            String message = String.format("Encountered bad header at position %d of Commit log %s, with bad position but valid CRC.", offset, reader.getPath());
-            if (!IGNORE_ERRORS)
-                throw new MalformedCommitLogException(message);
-            logger.warn(message);
+            logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath());
             return -1;
         }
         return end;
@@ -285,9 +271,8 @@ public class CommitLogReplayer
                  /* read the logs populate Mutation and apply */
                 while (reader.getPosition() < end && !reader.isEOF())
                 {
-                    long mutationStart = reader.getFilePointer();
                     if (logger.isDebugEnabled())
-                        logger.debug("Reading mutation at {}", mutationStart);
+                        logger.debug("Reading mutation at {}", reader.getFilePointer());
 
                     long claimedCRC32;
                     int serializedSize;
@@ -297,7 +282,7 @@ public class CommitLogReplayer
                         serializedSize = reader.readInt();
                         if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
                         {
-                            logger.debug("Encountered end of segment marker at {}", mutationStart);
+                            logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
                             break main;
                         }
 
@@ -306,11 +291,7 @@ public class CommitLogReplayer
                         // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
                         // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
                         if (serializedSize < 10)
-                        {
-                            if (!IGNORE_ERRORS)
-                                throw new MalformedCommitLogException("Too small mutation encountered at position " + mutationStart);
                             break main;
-                        }
 
                         long claimedSizeChecksum;
                         if (desc.version < CommitLogDescriptor.VERSION_21)
@@ -324,11 +305,7 @@ public class CommitLogReplayer
                             checksum.updateInt(serializedSize);
 
                         if (checksum.getValue() != claimedSizeChecksum)
-                        {
-                            if (!IGNORE_ERRORS)
-                                throw new IOException("Invalid size checksum for mutation at position " + mutationStart + " of " + file);
                             break main; // entry wasn't synced correctly/fully. that's
-                        }
                         // ok.
 
                         if (serializedSize > buffer.length)
@@ -341,17 +318,12 @@ public class CommitLogReplayer
                     }
                     catch (EOFException eof)
                     {
-                        if (!IGNORE_ERRORS)
-                            throw new MalformedCommitLogException("Encountered end-of-file unexpectedly", eof);
-
                         break main; // last CL entry didn't get completely written. that's ok.
                     }
 
                     checksum.update(buffer, 0, serializedSize);
                     if (claimedCRC32 != checksum.getValue())
                     {
-                        if (!IGNORE_ERRORS)
-                            throw new IOException("Invalid checksum for mutation at position " + mutationStart + " of " + file);
                         // this entry must not have been fsynced. probably the rest is bad too,
                         // but just in case there is no harm in trying them (since we still read on an entry boundary)
                         continue;
@@ -372,9 +344,6 @@ public class CommitLogReplayer
                     }
                     catch (UnknownColumnFamilyException ex)
                     {
-                        if (!IGNORE_MISSING_TABLES)
-                            throw ex;
-
                         if (ex.cfId == null)
                             continue;
                         AtomicInteger i = invalidMutations.get(ex.cfId);
@@ -389,14 +358,16 @@ public class CommitLogReplayer
                     }
                     catch (Throwable t)
                     {
-                        if (!IGNORE_ERRORS)
-                            throw new MalformedCommitLogException("Encountered bad mutation", t);
-
                         File f = File.createTempFile("mutation", "dat");
-                        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
+                        DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
+                        try
                         {
                             out.write(buffer, 0, serializedSize);
                         }
+                        finally
+                        {
+                            out.close();
+                        }
                         String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
                                                   f.getAbsolutePath());
                         logger.error(st, t);
@@ -412,11 +383,7 @@ public class CommitLogReplayer
                         public void runMayThrow() throws IOException
                         {
                             if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
-                            {
-                                if (!IGNORE_MISSING_TABLES)
-                                    throw new UnknownColumnFamilyException("Keyspace for this table is missing", mutation.getColumnFamilyIds().iterator().next());
                                 return;
-                            }
                             if (pointInTimeExceeded(mutation))
                                 return;
 
@@ -431,12 +398,7 @@ public class CommitLogReplayer
                             for (ColumnFamily columnFamily : replayFilter.filter(mutation))
                             {
                                 if (Schema.instance.getCF(columnFamily.id()) == null)
-                                {
-                                    if (!IGNORE_MISSING_TABLES)
-                                        throw new UnknownColumnFamilyException("Missing table with cfid=" + columnFamily.id(),
-                                                                               mutation.getColumnFamilyIds().iterator().next());
                                     continue; // dropped
-                                }
 
                                 ReplayPosition rp = cfPositions.get(columnFamily.id());
 
@@ -453,7 +415,7 @@ public class CommitLogReplayer
                             if (newMutation != null)
                             {
                                 assert !newMutation.isEmpty();
-                                keyspace.apply(newMutation, false);
+                                Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
                                 keyspacesRecovered.add(keyspace);
                             }
                         }
@@ -491,10 +453,4 @@ public class CommitLogReplayer
         }
         return false;
     }
-
-    @VisibleForTesting
-    public static void setIgnoreErrors(boolean ignore)
-    {
-        IGNORE_ERRORS = IGNORE_MISSING_TABLES = ignore;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java b/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
deleted file mode 100644
index 84a5cb0..0000000
--- a/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.cassandra.db.commitlog;
-
-import java.io.IOException;
-
-// represents a non-fatal commit log replay exception (i.e. can be skipped with -Dcassandra.commitlog.ignoreerrors=true)
-public class MalformedCommitLogException extends IOException
-{
-    public MalformedCommitLogException(String message)
-    {
-        super(message);
-    }
-    public MalformedCommitLogException(String message, Throwable cause)
-    {
-        super(message, cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 07c6cc4..fbee7ce 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -47,8 +47,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.UnknownColumnFamilyException;
-import org.apache.cassandra.db.commitlog.MalformedCommitLogException;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.exceptions.ConfigurationException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a5bc52ee/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index dd05272..7046536 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -36,53 +36,46 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
-import org.apache.cassandra.db.commitlog.CommitLogReplayer;
 import org.apache.cassandra.db.commitlog.CommitLogSegment;
-import org.apache.cassandra.db.commitlog.MalformedCommitLogException;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.PureJavaCrc32;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 public class CommitLogTest extends SchemaLoader
 {
-
-    static
-    {
-        System.setProperty("cassandra.commitlog.stop_on_errors", "true");
-    }
-
     @Test
     public void testRecoveryWithEmptyLog() throws Exception
     {
-        testMalformed(badLogFile(new byte[0]));
+        CommitLog.instance.recover(new File[]{ tmpFile() });
     }
 
     @Test
     public void testRecoveryWithShortLog() throws Exception
     {
         // force EOF while reading log
-        testMalformed(badLogFile(100, 10));
+        testRecoveryWithBadSizeArgument(100, 10);
     }
 
     @Test
     public void testRecoveryWithShortSize() throws Exception
     {
-        testMalformed(new byte[2]);
+        testRecovery(new byte[2]);
     }
 
     @Test
     public void testRecoveryWithShortCheckSum() throws Exception
     {
-        testMalformed(new byte[6]);
+        testRecovery(new byte[6]);
     }
 
     @Test
     public void testRecoveryWithGarbageLog() throws Exception
     {
-        testMalformed(garbage(100));
+        byte[] garbage = new byte[100];
+        (new java.util.Random()).nextBytes(garbage);
+        testRecovery(garbage);
     }
 
     @Test
@@ -90,30 +83,21 @@ public class CommitLogTest extends SchemaLoader
     {
         Checksum checksum = new CRC32();
         checksum.update(100);
-        testMalformed(badLogFile(100, checksum.getValue(), new byte[100]));
-        testMalformed(badLogFile(100, checksum.getValue(), garbage(100)));
-    }
-
-    @Test
-    public void testRecoveryWithBadSize() throws Exception
-    {
-        Checksum checksum = new CRC32();
-        checksum.update(100);
-        testMalformed(badLogFile(120, checksum.getValue(), garbage(100)));
+        testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
     }
 
     @Test
     public void testRecoveryWithZeroSegmentSizeArgument() throws Exception
     {
         // many different combinations of 4 bytes (garbage) will be read as zero by readInt()
-        testMalformed(badLogFile(0, -1L, 10)); // zero size, but no EOF
+        testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF
     }
 
     @Test
     public void testRecoveryWithNegativeSizeArgument() throws Exception
     {
         // garbage from a partial/bad flush could be read as a negative size even if there is no EOF
-        testMalformed(badLogFile(-10, 10)); // zero size, but no EOF
+        testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
     }
 
     @Test
@@ -190,8 +174,8 @@ public class CommitLogTest extends SchemaLoader
 
     private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String table, CellName column)
     {
-        Mutation rm = new Mutation(keyspace, key);
-        rm.add(table, column, ByteBuffer.allocate(0), 0);
+        Mutation rm = new Mutation("Keyspace1", bytes("k"));
+        rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(0), 0);
 
         int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
         max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
@@ -231,73 +215,22 @@ public class CommitLogTest extends SchemaLoader
         }
     }
 
-    // construct log file with correct chunk checksum for the provided size/position
-    protected File badLogFile(int markerSize, int realSize) throws Exception
-    {
-        return badLogFile(markerSize, garbage(realSize));
-    }
-
-    protected File badLogFile(int markerSize, byte[] data) throws Exception
-    {
-        File logFile = tmpFile();
-        CommitLogDescriptor descriptor = CommitLogDescriptor.fromFileName(logFile.getName());
-        PureJavaCrc32 crc = new PureJavaCrc32();
-        crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL));
-        crc.updateInt((int) (descriptor.id >>> 32));
-        crc.updateInt(CommitLogDescriptor.HEADER_SIZE);
-        return badLogFile(markerSize, crc.getCrc(), data, logFile);
-    }
-
-    protected byte[] garbage(int size)
-    {
-        byte[] garbage = new byte[size];
-        (new java.util.Random()).nextBytes(garbage);
-        return garbage;
-    }
-
-    protected File badLogFile(int markerSize, long checksum, int realSize) throws Exception
-    {
-        return badLogFile(markerSize, checksum, realSize, tmpFile());
-    }
-
-    protected File badLogFile(int markerSize, long checksum, int realSize, File logFile) throws Exception
+    protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
     {
-        return badLogFile(markerSize, checksum, new byte[realSize], logFile);
-    }
-
-    protected File badLogFile(int markerSize, long checksum, byte[] chunk) throws Exception
-    {
-        return badLogFile(markerSize, checksum, chunk, tmpFile());
+        Checksum checksum = new CRC32();
+        checksum.update(size);
+        testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue());
     }
 
-    protected File badLogFile(int markerSize, long checksum, byte[] chunk, File logFile) throws Exception
+    protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
     {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(out);
-        ByteBuffer buffer = ByteBuffer.allocate(CommitLogDescriptor.HEADER_SIZE);
-        CommitLogDescriptor.writeHeader(buffer, CommitLogDescriptor.fromFileName(logFile.getName()));
-        out.write(buffer.array());
-        dout.writeInt(markerSize);
+        dout.writeInt(size);
         dout.writeLong(checksum);
-        dout.write(chunk);
+        dout.write(new byte[dataSize]);
         dout.close();
-        try (OutputStream lout = new FileOutputStream(logFile))
-        {
-            lout.write(out.toByteArray());
-            lout.close();
-        }
-        return logFile;
-    }
-
-    protected File badLogFile(byte[] contents) throws Exception
-    {
-        File logFile = tmpFile();
-        try (OutputStream lout = new FileOutputStream(logFile))
-        {
-            lout.write(contents);
-            lout.close();
-        }
-        return logFile;
+        testRecovery(out.toByteArray());
     }
 
     protected File tmpFile() throws IOException
@@ -308,29 +241,17 @@ public class CommitLogTest extends SchemaLoader
         return logFile;
     }
 
-    private void testMalformed(byte[] contents) throws Exception
-    {
-        testMalformed(badLogFile(contents));
-        testMalformed(badLogFile(contents.length, contents));
-    }
-
-    private void testMalformed(File logFile) throws Exception
+    protected void testRecovery(byte[] logData) throws Exception
     {
-        CommitLogReplayer.setIgnoreErrors(true);
-        CommitLog.instance.recover(new File[]{ logFile });
-        CommitLogReplayer.setIgnoreErrors(false);
-        try
-        {
-            CommitLog.instance.recover(new File[]{ logFile });
-            Assert.assertFalse(true);
-        }
-        catch (Throwable t)
+        File logFile = tmpFile();
+        try (OutputStream lout = new FileOutputStream(logFile))
         {
-            if (!(t instanceof MalformedCommitLogException))
-                throw t;
+            lout.write(logData);
+            //statics make it annoying to test things correctly
+            CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
         }
     }
-
+    
     @Test
     public void testVersions()
     {