You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/07/10 11:01:45 UTC

[1/3] git commit: Fail to start if commit log replay encounters an exception

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 109e600fa -> 581ce6310
  refs/heads/trunk ae03e1bab -> ac4a0263f


Fail to start if commit log replay encounters an exception

patch by Benedict; reviewed by Vijay


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/581ce631
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/581ce631
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/581ce631

Branch: refs/heads/cassandra-2.1
Commit: 581ce631026b98ee9438d54ef144df89bc91100b
Parents: 109e600
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Jul 10 09:55:00 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Jul 10 09:58:52 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/commitlog/CommitLog.java       |  17 ++-
 .../db/commitlog/CommitLogDescriptor.java       |   8 +-
 .../db/commitlog/CommitLogReplayer.java         |  76 ++++++++---
 .../commitlog/MalformedCommitLogException.java  |  16 +++
 .../cassandra/service/CassandraDaemon.java      |   2 +
 .../org/apache/cassandra/db/CommitLogTest.java  | 133 +++++++++++++++----
 7 files changed, 205 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ae06d92..02a2d52 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * Fail to start if commit log replay detects a problem (CASSANDRA-7125)
  * Improve schema merge performance (CASSANDRA-7444)
  * Fix NPE when unknown prepared statement ID is used (CASSANDRA-7454)
  * Adjust MT depth based on # of partition validating (CASSANDRA-5263)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index cf8a7f6..ac1d811 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -133,9 +133,20 @@ public class CommitLog implements CommitLogMBean
      */
     public int recover(File... clogs) throws IOException
     {
-        CommitLogReplayer recovery = new CommitLogReplayer();
-        recovery.recover(clogs);
-        return recovery.blockForWrites();
+        try
+        {
+            CommitLogReplayer recovery = new CommitLogReplayer();
+            recovery.recover(clogs);
+            return recovery.blockForWrites();
+        }
+        catch (IOException e)
+        {
+            if (e instanceof UnknownColumnFamilyException)
+                logger.error("Commit log replay failed due to replaying a mutation for a missing table. This error can be ignored by providing -Dcassandra.commitlog.stop_on_missing_tables=false on the command line");
+            if (e instanceof MalformedCommitLogException)
+                logger.error("Commit log replay failed due to a non-fatal exception. This error can be ignored by providing -Dcassandra.commitlog.stop_on_errors=false on the command line");
+            throw e;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 91c81e1..77c25d3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -28,6 +28,8 @@ import java.nio.ByteBuffer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -48,10 +50,11 @@ public class CommitLogDescriptor
      * Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes.
      * Note: make sure to handle {@link #getMessagingVersion()}
      */
+    @VisibleForTesting
     public static final int current_version = VERSION_21;
 
     // [version, id, checksum]
-    static final int HEADER_SIZE = 4 + 8 + 4;
+    public static final int HEADER_SIZE = 4 + 8 + 4;
 
     final int version;
     public final long id;
@@ -67,7 +70,8 @@ public class CommitLogDescriptor
         this(current_version, id);
     }
 
-    static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
+    @VisibleForTesting
+    public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
     {
         out.putInt(0, descriptor.version);
         out.putLong(4, descriptor.id);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 1012829..10d13b2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -23,6 +23,7 @@ import java.util.*;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
@@ -48,6 +49,8 @@ public class CommitLogReplayer
     private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
     private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
+    private static boolean IGNORE_ERRORS = System.getProperty("cassandra.commitlog.stop_on_errors", "true").equals("false");
+    private static boolean IGNORE_MISSING_TABLES = IGNORE_ERRORS || System.getProperty("cassandra.commitlog.stop_on_missing_tables", "true").equals("false");
 
     private final Set<Keyspace> keyspacesRecovered;
     private final List<Future<?>> futures;
@@ -60,16 +63,16 @@ public class CommitLogReplayer
 
     public CommitLogReplayer()
     {
-        this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
-        this.futures = new ArrayList<Future<?>>();
+        this.keyspacesRecovered = new NonBlockingHashSet<>();
+        this.futures = new ArrayList<>();
         this.buffer = new byte[4096];
-        this.invalidMutations = new HashMap<UUID, AtomicInteger>();
+        this.invalidMutations = new HashMap<>();
         // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
         this.replayedCount = new AtomicInteger();
         this.checksum = new PureJavaCrc32();
 
         // compute per-CF and global replay positions
-        cfPositions = new HashMap<UUID, ReplayPosition>();
+        cfPositions = new HashMap<>();
         Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
@@ -117,7 +120,12 @@ public class CommitLogReplayer
         if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
         {
             if (offset != reader.length() && offset != Integer.MAX_VALUE)
-                logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath());
+            {
+                String message = String.format("Encountered bad header at position %d of Commit log %s; not enough room for a header", offset, reader.getPath());
+                if (!IGNORE_ERRORS)
+                    throw new MalformedCommitLogException(message);
+                logger.warn(message);
+            }
             // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
             return -1;
         }
@@ -136,13 +144,19 @@ public class CommitLogReplayer
         {
             if (end != 0 || filecrc != 0)
             {
-                logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
+                String message = String.format("Encountered bad header at position %d of Commit log %s, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
+                if (!IGNORE_ERRORS)
+                    throw new MalformedCommitLogException(message);
+                logger.warn(message);
             }
             return -1;
         }
         else if (end < offset || end > reader.length())
         {
-            logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath());
+            String message = String.format("Encountered bad header at position %d of Commit log %s, with bad position but valid CRC.", offset, reader.getPath());
+            if (!IGNORE_ERRORS)
+                throw new MalformedCommitLogException(message);
+            logger.warn(message);
             return -1;
         }
         return end;
@@ -271,8 +285,9 @@ public class CommitLogReplayer
                  /* read the logs populate Mutation and apply */
                 while (reader.getPosition() < end && !reader.isEOF())
                 {
+                    long mutationStart = reader.getFilePointer();
                     if (logger.isDebugEnabled())
-                        logger.debug("Reading mutation at {}", reader.getFilePointer());
+                        logger.debug("Reading mutation at {}", mutationStart);
 
                     long claimedCRC32;
                     int serializedSize;
@@ -282,7 +297,7 @@ public class CommitLogReplayer
                         serializedSize = reader.readInt();
                         if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
                         {
-                            logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
+                            logger.debug("Encountered end of segment marker at {}", mutationStart);
                             break main;
                         }
 
@@ -291,7 +306,11 @@ public class CommitLogReplayer
                         // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
                         // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
                         if (serializedSize < 10)
+                        {
+                            if (!IGNORE_ERRORS)
+                                throw new MalformedCommitLogException("Too small mutation encountered at position " + mutationStart);
                             break main;
+                        }
 
                         long claimedSizeChecksum;
                         if (desc.version < CommitLogDescriptor.VERSION_21)
@@ -305,7 +324,11 @@ public class CommitLogReplayer
                             checksum.updateInt(serializedSize);
 
                         if (checksum.getValue() != claimedSizeChecksum)
+                        {
+                            if (!IGNORE_ERRORS)
+                                throw new IOException("Invalid size checksum for mutation at position " + mutationStart + " of " + file);
                             break main; // entry wasn't synced correctly/fully. that's
+                        }
                         // ok.
 
                         if (serializedSize > buffer.length)
@@ -318,12 +341,17 @@ public class CommitLogReplayer
                     }
                     catch (EOFException eof)
                     {
+                        if (!IGNORE_ERRORS)
+                            throw new MalformedCommitLogException("Encountered end-of-file unexpectedly", eof);
+
                         break main; // last CL entry didn't get completely written. that's ok.
                     }
 
                     checksum.update(buffer, 0, serializedSize);
                     if (claimedCRC32 != checksum.getValue())
                     {
+                        if (!IGNORE_ERRORS)
+                            throw new IOException("Invalid checksum for mutation at position " + mutationStart + " of " + file);
                         // this entry must not have been fsynced. probably the rest is bad too,
                         // but just in case there is no harm in trying them (since we still read on an entry boundary)
                         continue;
@@ -344,6 +372,9 @@ public class CommitLogReplayer
                     }
                     catch (UnknownColumnFamilyException ex)
                     {
+                        if (!IGNORE_MISSING_TABLES)
+                            throw ex;
+
                         if (ex.cfId == null)
                             continue;
                         AtomicInteger i = invalidMutations.get(ex.cfId);
@@ -358,16 +389,14 @@ public class CommitLogReplayer
                     }
                     catch (Throwable t)
                     {
+                        if (!IGNORE_ERRORS)
+                            throw new MalformedCommitLogException("Encountered bad mutation", t);
+
                         File f = File.createTempFile("mutation", "dat");
-                        DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
-                        try
+                        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
                         {
                             out.write(buffer, 0, serializedSize);
                         }
-                        finally
-                        {
-                            out.close();
-                        }
                         String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
                                                   f.getAbsolutePath());
                         logger.error(st, t);
@@ -383,7 +412,11 @@ public class CommitLogReplayer
                         public void runMayThrow() throws IOException
                         {
                             if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+                            {
+                                if (!IGNORE_MISSING_TABLES)
+                                    throw new UnknownColumnFamilyException("Keyspace for this table is missing", mutation.getColumnFamilyIds().iterator().next());
                                 return;
+                            }
                             if (pointInTimeExceeded(mutation))
                                 return;
 
@@ -398,7 +431,12 @@ public class CommitLogReplayer
                             for (ColumnFamily columnFamily : replayFilter.filter(mutation))
                             {
                                 if (Schema.instance.getCF(columnFamily.id()) == null)
+                                {
+                                    if (!IGNORE_MISSING_TABLES)
+                                        throw new UnknownColumnFamilyException("Missing table with cfid=" + columnFamily.id(),
+                                                                               mutation.getColumnFamilyIds().iterator().next());
                                     continue; // dropped
+                                }
 
                                 ReplayPosition rp = cfPositions.get(columnFamily.id());
 
@@ -415,7 +453,7 @@ public class CommitLogReplayer
                             if (newMutation != null)
                             {
                                 assert !newMutation.isEmpty();
-                                Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
+                                keyspace.apply(newMutation, false);
                                 keyspacesRecovered.add(keyspace);
                             }
                         }
@@ -453,4 +491,10 @@ public class CommitLogReplayer
         }
         return false;
     }
+
+    @VisibleForTesting
+    public static void setIgnoreErrors(boolean ignore)
+    {
+        IGNORE_ERRORS = IGNORE_MISSING_TABLES = ignore;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java b/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
new file mode 100644
index 0000000..84a5cb0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+
+// represents a non-fatal commit log replay exception (i.e. can be skipped with -Dcassandra.commitlog.stop_on_errors=false)
+public class MalformedCommitLogException extends IOException
+{
+    public MalformedCommitLogException(String message)
+    {
+        super(message);
+    }
+    public MalformedCommitLogException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index fbee7ce..07c6cc4 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -47,6 +47,8 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.commitlog.MalformedCommitLogException;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.exceptions.ConfigurationException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 7046536..dd05272 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -36,46 +36,53 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer;
 import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.commitlog.MalformedCommitLogException;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.PureJavaCrc32;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 public class CommitLogTest extends SchemaLoader
 {
+
+    static
+    {
+        System.setProperty("cassandra.commitlog.stop_on_errors", "true");
+    }
+
     @Test
     public void testRecoveryWithEmptyLog() throws Exception
     {
-        CommitLog.instance.recover(new File[]{ tmpFile() });
+        testMalformed(badLogFile(new byte[0]));
     }
 
     @Test
     public void testRecoveryWithShortLog() throws Exception
     {
         // force EOF while reading log
-        testRecoveryWithBadSizeArgument(100, 10);
+        testMalformed(badLogFile(100, 10));
     }
 
     @Test
     public void testRecoveryWithShortSize() throws Exception
     {
-        testRecovery(new byte[2]);
+        testMalformed(new byte[2]);
     }
 
     @Test
     public void testRecoveryWithShortCheckSum() throws Exception
     {
-        testRecovery(new byte[6]);
+        testMalformed(new byte[6]);
     }
 
     @Test
     public void testRecoveryWithGarbageLog() throws Exception
     {
-        byte[] garbage = new byte[100];
-        (new java.util.Random()).nextBytes(garbage);
-        testRecovery(garbage);
+        testMalformed(garbage(100));
     }
 
     @Test
@@ -83,21 +90,30 @@ public class CommitLogTest extends SchemaLoader
     {
         Checksum checksum = new CRC32();
         checksum.update(100);
-        testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
+        testMalformed(badLogFile(100, checksum.getValue(), new byte[100]));
+        testMalformed(badLogFile(100, checksum.getValue(), garbage(100)));
+    }
+
+    @Test
+    public void testRecoveryWithBadSize() throws Exception
+    {
+        Checksum checksum = new CRC32();
+        checksum.update(100);
+        testMalformed(badLogFile(120, checksum.getValue(), garbage(100)));
     }
 
     @Test
     public void testRecoveryWithZeroSegmentSizeArgument() throws Exception
     {
         // many different combinations of 4 bytes (garbage) will be read as zero by readInt()
-        testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF
+        testMalformed(badLogFile(0, -1L, 10)); // zero size, but no EOF
     }
 
     @Test
     public void testRecoveryWithNegativeSizeArgument() throws Exception
     {
         // garbage from a partial/bad flush could be read as a negative size even if there is no EOF
-        testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
+        testMalformed(badLogFile(-10, 10)); // negative size, but no EOF
     }
 
     @Test
@@ -174,8 +190,8 @@ public class CommitLogTest extends SchemaLoader
 
     private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String table, CellName column)
     {
-        Mutation rm = new Mutation("Keyspace1", bytes("k"));
-        rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(0), 0);
+        Mutation rm = new Mutation(keyspace, key);
+        rm.add(table, column, ByteBuffer.allocate(0), 0);
 
         int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
         max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
@@ -215,22 +231,73 @@ public class CommitLogTest extends SchemaLoader
         }
     }
 
-    protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
+    // construct log file with correct chunk checksum for the provided size/position
+    protected File badLogFile(int markerSize, int realSize) throws Exception
     {
-        Checksum checksum = new CRC32();
-        checksum.update(size);
-        testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue());
+        return badLogFile(markerSize, garbage(realSize));
     }
 
-    protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
+    protected File badLogFile(int markerSize, byte[] data) throws Exception
+    {
+        File logFile = tmpFile();
+        CommitLogDescriptor descriptor = CommitLogDescriptor.fromFileName(logFile.getName());
+        PureJavaCrc32 crc = new PureJavaCrc32();
+        crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL));
+        crc.updateInt((int) (descriptor.id >>> 32));
+        crc.updateInt(CommitLogDescriptor.HEADER_SIZE);
+        return badLogFile(markerSize, crc.getCrc(), data, logFile);
+    }
+
+    protected byte[] garbage(int size)
+    {
+        byte[] garbage = new byte[size];
+        (new java.util.Random()).nextBytes(garbage);
+        return garbage;
+    }
+
+    protected File badLogFile(int markerSize, long checksum, int realSize) throws Exception
+    {
+        return badLogFile(markerSize, checksum, realSize, tmpFile());
+    }
+
+    protected File badLogFile(int markerSize, long checksum, int realSize, File logFile) throws Exception
+    {
+        return badLogFile(markerSize, checksum, new byte[realSize], logFile);
+    }
+
+    protected File badLogFile(int markerSize, long checksum, byte[] chunk) throws Exception
+    {
+        return badLogFile(markerSize, checksum, chunk, tmpFile());
+    }
+
+    protected File badLogFile(int markerSize, long checksum, byte[] chunk, File logFile) throws Exception
     {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(out);
-        dout.writeInt(size);
+        ByteBuffer buffer = ByteBuffer.allocate(CommitLogDescriptor.HEADER_SIZE);
+        CommitLogDescriptor.writeHeader(buffer, CommitLogDescriptor.fromFileName(logFile.getName()));
+        out.write(buffer.array());
+        dout.writeInt(markerSize);
         dout.writeLong(checksum);
-        dout.write(new byte[dataSize]);
+        dout.write(chunk);
         dout.close();
-        testRecovery(out.toByteArray());
+        try (OutputStream lout = new FileOutputStream(logFile))
+        {
+            lout.write(out.toByteArray());
+            lout.close();
+        }
+        return logFile;
+    }
+
+    protected File badLogFile(byte[] contents) throws Exception
+    {
+        File logFile = tmpFile();
+        try (OutputStream lout = new FileOutputStream(logFile))
+        {
+            lout.write(contents);
+            lout.close();
+        }
+        return logFile;
     }
 
     protected File tmpFile() throws IOException
@@ -241,17 +308,29 @@ public class CommitLogTest extends SchemaLoader
         return logFile;
     }
 
-    protected void testRecovery(byte[] logData) throws Exception
+    private void testMalformed(byte[] contents) throws Exception
     {
-        File logFile = tmpFile();
-        try (OutputStream lout = new FileOutputStream(logFile))
+        testMalformed(badLogFile(contents));
+        testMalformed(badLogFile(contents.length, contents));
+    }
+
+    private void testMalformed(File logFile) throws Exception
+    {
+        CommitLogReplayer.setIgnoreErrors(true);
+        CommitLog.instance.recover(new File[]{ logFile });
+        CommitLogReplayer.setIgnoreErrors(false);
+        try
         {
-            lout.write(logData);
-            //statics make it annoying to test things correctly
-            CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
+            CommitLog.instance.recover(new File[]{ logFile });
+            Assert.assertFalse(true);
+        }
+        catch (Throwable t)
+        {
+            if (!(t instanceof MalformedCommitLogException))
+                throw t;
         }
     }
-    
+
     @Test
     public void testVersions()
     {


[2/3] git commit: Fail to start if commit log replay encounters an exception

Posted by be...@apache.org.
Fail to start if commit log replay encounters an exception

patch by Benedict; reviewed by Vijay


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/581ce631
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/581ce631
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/581ce631

Branch: refs/heads/trunk
Commit: 581ce631026b98ee9438d54ef144df89bc91100b
Parents: 109e600
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Jul 10 09:55:00 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Jul 10 09:58:52 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/commitlog/CommitLog.java       |  17 ++-
 .../db/commitlog/CommitLogDescriptor.java       |   8 +-
 .../db/commitlog/CommitLogReplayer.java         |  76 ++++++++---
 .../commitlog/MalformedCommitLogException.java  |  16 +++
 .../cassandra/service/CassandraDaemon.java      |   2 +
 .../org/apache/cassandra/db/CommitLogTest.java  | 133 +++++++++++++++----
 7 files changed, 205 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ae06d92..02a2d52 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.1
+ * Fail to start if commit log replay detects a problem (CASSANDRA-7125)
  * Improve schema merge performance (CASSANDRA-7444)
  * Fix NPE when unknown prepared statement ID is used (CASSANDRA-7454)
  * Adjust MT depth based on # of partition validating (CASSANDRA-5263)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index cf8a7f6..ac1d811 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -133,9 +133,20 @@ public class CommitLog implements CommitLogMBean
      */
     public int recover(File... clogs) throws IOException
     {
-        CommitLogReplayer recovery = new CommitLogReplayer();
-        recovery.recover(clogs);
-        return recovery.blockForWrites();
+        try
+        {
+            CommitLogReplayer recovery = new CommitLogReplayer();
+            recovery.recover(clogs);
+            return recovery.blockForWrites();
+        }
+        catch (IOException e)
+        {
+            if (e instanceof UnknownColumnFamilyException)
+                logger.error("Commit log replay failed due to replaying a mutation for a missing table. This error can be ignored by providing -Dcassandra.commitlog.stop_on_missing_tables=false on the command line");
+            if (e instanceof MalformedCommitLogException)
+                logger.error("Commit log replay failed due to a non-fatal exception. This error can be ignored by providing -Dcassandra.commitlog.stop_on_errors=false on the command line");
+            throw e;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index 91c81e1..77c25d3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@ -28,6 +28,8 @@ import java.nio.ByteBuffer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -48,10 +50,11 @@ public class CommitLogDescriptor
      * Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes.
      * Note: make sure to handle {@link #getMessagingVersion()}
      */
+    @VisibleForTesting
     public static final int current_version = VERSION_21;
 
     // [version, id, checksum]
-    static final int HEADER_SIZE = 4 + 8 + 4;
+    public static final int HEADER_SIZE = 4 + 8 + 4;
 
     final int version;
     public final long id;
@@ -67,7 +70,8 @@ public class CommitLogDescriptor
         this(current_version, id);
     }
 
-    static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
+    @VisibleForTesting
+    public static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor)
     {
         out.putInt(0, descriptor.version);
         out.putLong(4, descriptor.id);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 1012829..10d13b2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -23,6 +23,7 @@ import java.util.*;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
@@ -48,6 +49,8 @@ public class CommitLogReplayer
     private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class);
     private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
     private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
+    private static boolean IGNORE_ERRORS = System.getProperty("cassandra.commitlog.stop_on_errors", "true").equals("false");
+    private static boolean IGNORE_MISSING_TABLES = IGNORE_ERRORS || System.getProperty("cassandra.commitlog.stop_on_missing_tables", "true").equals("false");
 
     private final Set<Keyspace> keyspacesRecovered;
     private final List<Future<?>> futures;
@@ -60,16 +63,16 @@ public class CommitLogReplayer
 
     public CommitLogReplayer()
     {
-        this.keyspacesRecovered = new NonBlockingHashSet<Keyspace>();
-        this.futures = new ArrayList<Future<?>>();
+        this.keyspacesRecovered = new NonBlockingHashSet<>();
+        this.futures = new ArrayList<>();
         this.buffer = new byte[4096];
-        this.invalidMutations = new HashMap<UUID, AtomicInteger>();
+        this.invalidMutations = new HashMap<>();
         // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference.
         this.replayedCount = new AtomicInteger();
         this.checksum = new PureJavaCrc32();
 
         // compute per-CF and global replay positions
-        cfPositions = new HashMap<UUID, ReplayPosition>();
+        cfPositions = new HashMap<>();
         Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator);
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
@@ -117,7 +120,12 @@ public class CommitLogReplayer
         if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE)
         {
             if (offset != reader.length() && offset != Integer.MAX_VALUE)
-                logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath());
+            {
+                String message = String.format("Encountered bad header at position %d of Commit log %s; not enough room for a header", offset, reader.getPath());
+                if (!IGNORE_ERRORS)
+                    throw new MalformedCommitLogException(message);
+                logger.warn(message);
+            }
             // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment
             return -1;
         }
@@ -136,13 +144,19 @@ public class CommitLogReplayer
         {
             if (end != 0 || filecrc != 0)
             {
-                logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
+                String message = String.format("Encountered bad header at position %d of Commit log %s, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath());
+                if (!IGNORE_ERRORS)
+                    throw new MalformedCommitLogException(message);
+                logger.warn(message);
             }
             return -1;
         }
         else if (end < offset || end > reader.length())
         {
-            logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath());
+            String message = String.format("Encountered bad header at position %d of Commit log %s, with bad position but valid CRC.", offset, reader.getPath());
+            if (!IGNORE_ERRORS)
+                throw new MalformedCommitLogException(message);
+            logger.warn(message);
             return -1;
         }
         return end;
@@ -271,8 +285,9 @@ public class CommitLogReplayer
                  /* read the logs populate Mutation and apply */
                 while (reader.getPosition() < end && !reader.isEOF())
                 {
+                    long mutationStart = reader.getFilePointer();
                     if (logger.isDebugEnabled())
-                        logger.debug("Reading mutation at {}", reader.getFilePointer());
+                        logger.debug("Reading mutation at {}", mutationStart);
 
                     long claimedCRC32;
                     int serializedSize;
@@ -282,7 +297,7 @@ public class CommitLogReplayer
                         serializedSize = reader.readInt();
                         if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
                         {
-                            logger.debug("Encountered end of segment marker at {}", reader.getFilePointer());
+                            logger.debug("Encountered end of segment marker at {}", mutationStart);
                             break main;
                         }
 
@@ -291,7 +306,11 @@ public class CommitLogReplayer
                         // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
                         // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
                         if (serializedSize < 10)
+                        {
+                            if (!IGNORE_ERRORS)
+                                throw new MalformedCommitLogException("Too small mutation encountered at position " + mutationStart);
                             break main;
+                        }
 
                         long claimedSizeChecksum;
                         if (desc.version < CommitLogDescriptor.VERSION_21)
@@ -305,7 +324,11 @@ public class CommitLogReplayer
                             checksum.updateInt(serializedSize);
 
                         if (checksum.getValue() != claimedSizeChecksum)
+                        {
+                            if (!IGNORE_ERRORS)
+                                throw new IOException("Invalid size checksum for mutation at position " + mutationStart + " of " + file);
                             break main; // entry wasn't synced correctly/fully. that's
+                        }
                         // ok.
 
                         if (serializedSize > buffer.length)
@@ -318,12 +341,17 @@ public class CommitLogReplayer
                     }
                     catch (EOFException eof)
                     {
+                        if (!IGNORE_ERRORS)
+                            throw new MalformedCommitLogException("Encountered end-of-file unexpectedly", eof);
+
                         break main; // last CL entry didn't get completely written. that's ok.
                     }
 
                     checksum.update(buffer, 0, serializedSize);
                     if (claimedCRC32 != checksum.getValue())
                     {
+                        if (!IGNORE_ERRORS)
+                            throw new IOException("Invalid checksum for mutation at position " + mutationStart + " of " + file);
                         // this entry must not have been fsynced. probably the rest is bad too,
                         // but just in case there is no harm in trying them (since we still read on an entry boundary)
                         continue;
@@ -344,6 +372,9 @@ public class CommitLogReplayer
                     }
                     catch (UnknownColumnFamilyException ex)
                     {
+                        if (!IGNORE_MISSING_TABLES)
+                            throw ex;
+
                         if (ex.cfId == null)
                             continue;
                         AtomicInteger i = invalidMutations.get(ex.cfId);
@@ -358,16 +389,14 @@ public class CommitLogReplayer
                     }
                     catch (Throwable t)
                     {
+                        if (!IGNORE_ERRORS)
+                            throw new MalformedCommitLogException("Encountered bad mutation", t);
+
                         File f = File.createTempFile("mutation", "dat");
-                        DataOutputStream out = new DataOutputStream(new FileOutputStream(f));
-                        try
+                        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
                         {
                             out.write(buffer, 0, serializedSize);
                         }
-                        finally
-                        {
-                            out.close();
-                        }
                         String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored.  This may be caused by replaying a mutation against a table with the same name but incompatible schema.  Exception follows: ",
                                                   f.getAbsolutePath());
                         logger.error(st, t);
@@ -383,7 +412,11 @@ public class CommitLogReplayer
                         public void runMayThrow() throws IOException
                         {
                             if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
+                            {
+                                if (!IGNORE_MISSING_TABLES)
+                                    throw new UnknownColumnFamilyException("Keyspace for this table is missing", mutation.getColumnFamilyIds().iterator().next());
                                 return;
+                            }
                             if (pointInTimeExceeded(mutation))
                                 return;
 
@@ -398,7 +431,12 @@ public class CommitLogReplayer
                             for (ColumnFamily columnFamily : replayFilter.filter(mutation))
                             {
                                 if (Schema.instance.getCF(columnFamily.id()) == null)
+                                {
+                                    if (!IGNORE_MISSING_TABLES)
+                                        throw new UnknownColumnFamilyException("Missing table with cfid=" + columnFamily.id(),
+                                                                               mutation.getColumnFamilyIds().iterator().next());
                                     continue; // dropped
+                                }
 
                                 ReplayPosition rp = cfPositions.get(columnFamily.id());
 
@@ -415,7 +453,7 @@ public class CommitLogReplayer
                             if (newMutation != null)
                             {
                                 assert !newMutation.isEmpty();
-                                Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
+                                keyspace.apply(newMutation, false);
                                 keyspacesRecovered.add(keyspace);
                             }
                         }
@@ -453,4 +491,10 @@ public class CommitLogReplayer
         }
         return false;
     }
+
+    @VisibleForTesting
+    public static void setIgnoreErrors(boolean ignore)
+    {
+        IGNORE_ERRORS = IGNORE_MISSING_TABLES = ignore;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java b/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
new file mode 100644
index 0000000..84a5cb0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/commitlog/MalformedCommitLogException.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+
+// represents a non-fatal commit log replay exception (i.e. can be skipped with -Dcassandra.commitlog.stop_on_errors=false)
+public class MalformedCommitLogException extends IOException
+{
+    public MalformedCommitLogException(String message)
+    {
+        super(message);
+    }
+    public MalformedCommitLogException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index fbee7ce..07c6cc4 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -47,6 +47,8 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.db.commitlog.MalformedCommitLogException;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.exceptions.ConfigurationException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/581ce631/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index 7046536..dd05272 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -36,46 +36,53 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer;
 import org.apache.cassandra.db.commitlog.CommitLogSegment;
+import org.apache.cassandra.db.commitlog.MalformedCommitLogException;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.PureJavaCrc32;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 public class CommitLogTest extends SchemaLoader
 {
+
+    static
+    {
+        System.setProperty("cassandra.commitlog.stop_on_errors", "true");
+    }
+
     @Test
     public void testRecoveryWithEmptyLog() throws Exception
     {
-        CommitLog.instance.recover(new File[]{ tmpFile() });
+        testMalformed(badLogFile(new byte[0]));
     }
 
     @Test
     public void testRecoveryWithShortLog() throws Exception
     {
         // force EOF while reading log
-        testRecoveryWithBadSizeArgument(100, 10);
+        testMalformed(badLogFile(100, 10));
     }
 
     @Test
     public void testRecoveryWithShortSize() throws Exception
     {
-        testRecovery(new byte[2]);
+        testMalformed(new byte[2]);
     }
 
     @Test
     public void testRecoveryWithShortCheckSum() throws Exception
     {
-        testRecovery(new byte[6]);
+        testMalformed(new byte[6]);
     }
 
     @Test
     public void testRecoveryWithGarbageLog() throws Exception
     {
-        byte[] garbage = new byte[100];
-        (new java.util.Random()).nextBytes(garbage);
-        testRecovery(garbage);
+        testMalformed(garbage(100));
     }
 
     @Test
@@ -83,21 +90,30 @@ public class CommitLogTest extends SchemaLoader
     {
         Checksum checksum = new CRC32();
         checksum.update(100);
-        testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue());
+        testMalformed(badLogFile(100, checksum.getValue(), new byte[100]));
+        testMalformed(badLogFile(100, checksum.getValue(), garbage(100)));
+    }
+
+    @Test
+    public void testRecoveryWithBadSize() throws Exception
+    {
+        Checksum checksum = new CRC32();
+        checksum.update(100);
+        testMalformed(badLogFile(120, checksum.getValue(), garbage(100)));
     }
 
     @Test
     public void testRecoveryWithZeroSegmentSizeArgument() throws Exception
     {
         // many different combinations of 4 bytes (garbage) will be read as zero by readInt()
-        testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF
+        testMalformed(badLogFile(0, -1L, 10)); // zero size, but no EOF
     }
 
     @Test
     public void testRecoveryWithNegativeSizeArgument() throws Exception
     {
         // garbage from a partial/bad flush could be read as a negative size even if there is no EOF
-        testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF
+        testMalformed(badLogFile(-10, 10)); // negative size, but no EOF
     }
 
     @Test
@@ -174,8 +190,8 @@ public class CommitLogTest extends SchemaLoader
 
     private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String table, CellName column)
     {
-        Mutation rm = new Mutation("Keyspace1", bytes("k"));
-        rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(0), 0);
+        Mutation rm = new Mutation(keyspace, key);
+        rm.add(table, column, ByteBuffer.allocate(0), 0);
 
         int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2);
         max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead
@@ -215,22 +231,73 @@ public class CommitLogTest extends SchemaLoader
         }
     }
 
-    protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
+    // construct log file with correct chunk checksum for the provided size/position
+    protected File badLogFile(int markerSize, int realSize) throws Exception
     {
-        Checksum checksum = new CRC32();
-        checksum.update(size);
-        testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue());
+        return badLogFile(markerSize, garbage(realSize));
     }
 
-    protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception
+    protected File badLogFile(int markerSize, byte[] data) throws Exception
+    {
+        File logFile = tmpFile();
+        CommitLogDescriptor descriptor = CommitLogDescriptor.fromFileName(logFile.getName());
+        PureJavaCrc32 crc = new PureJavaCrc32();
+        crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL));
+        crc.updateInt((int) (descriptor.id >>> 32));
+        crc.updateInt(CommitLogDescriptor.HEADER_SIZE);
+        return badLogFile(markerSize, crc.getCrc(), data, logFile);
+    }
+
+    protected byte[] garbage(int size)
+    {
+        byte[] garbage = new byte[size];
+        (new java.util.Random()).nextBytes(garbage);
+        return garbage;
+    }
+
+    protected File badLogFile(int markerSize, long checksum, int realSize) throws Exception
+    {
+        return badLogFile(markerSize, checksum, realSize, tmpFile());
+    }
+
+    protected File badLogFile(int markerSize, long checksum, int realSize, File logFile) throws Exception
+    {
+        return badLogFile(markerSize, checksum, new byte[realSize], logFile);
+    }
+
+    protected File badLogFile(int markerSize, long checksum, byte[] chunk) throws Exception
+    {
+        return badLogFile(markerSize, checksum, chunk, tmpFile());
+    }
+
+    protected File badLogFile(int markerSize, long checksum, byte[] chunk, File logFile) throws Exception
     {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(out);
-        dout.writeInt(size);
+        ByteBuffer buffer = ByteBuffer.allocate(CommitLogDescriptor.HEADER_SIZE);
+        CommitLogDescriptor.writeHeader(buffer, CommitLogDescriptor.fromFileName(logFile.getName()));
+        out.write(buffer.array());
+        dout.writeInt(markerSize);
         dout.writeLong(checksum);
-        dout.write(new byte[dataSize]);
+        dout.write(chunk);
         dout.close();
-        testRecovery(out.toByteArray());
+        try (OutputStream lout = new FileOutputStream(logFile))
+        {
+            lout.write(out.toByteArray());
+            lout.close();
+        }
+        return logFile;
+    }
+
+    protected File badLogFile(byte[] contents) throws Exception
+    {
+        File logFile = tmpFile();
+        try (OutputStream lout = new FileOutputStream(logFile))
+        {
+            lout.write(contents);
+            lout.close();
+        }
+        return logFile;
     }
 
     protected File tmpFile() throws IOException
@@ -241,17 +308,29 @@ public class CommitLogTest extends SchemaLoader
         return logFile;
     }
 
-    protected void testRecovery(byte[] logData) throws Exception
+    private void testMalformed(byte[] contents) throws Exception
     {
-        File logFile = tmpFile();
-        try (OutputStream lout = new FileOutputStream(logFile))
+        testMalformed(badLogFile(contents));
+        testMalformed(badLogFile(contents.length, contents));
+    }
+
+    private void testMalformed(File logFile) throws Exception
+    {
+        CommitLogReplayer.setIgnoreErrors(true);
+        CommitLog.instance.recover(new File[]{ logFile });
+        CommitLogReplayer.setIgnoreErrors(false);
+        try
         {
-            lout.write(logData);
-            //statics make it annoying to test things correctly
-            CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
+            CommitLog.instance.recover(new File[]{ logFile });
+            Assert.assertFalse(true);
+        }
+        catch (Throwable t)
+        {
+            if (!(t instanceof MalformedCommitLogException))
+                throw t;
         }
     }
-    
+
     @Test
     public void testVersions()
     {


[3/3] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	test/unit/org/apache/cassandra/db/CommitLogTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ac4a0263
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ac4a0263
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ac4a0263

Branch: refs/heads/trunk
Commit: ac4a0263fe3a22b7096c3aefa70f07ea23d691b3
Parents: ae03e1b 581ce63
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Thu Jul 10 10:01:19 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Thu Jul 10 10:01:19 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/commitlog/CommitLog.java       |  17 ++-
 .../db/commitlog/CommitLogDescriptor.java       |   8 +-
 .../db/commitlog/CommitLogReplayer.java         |  76 ++++++++---
 .../commitlog/MalformedCommitLogException.java  |  16 +++
 .../cassandra/service/CassandraDaemon.java      |   2 +
 .../org/apache/cassandra/db/CommitLogTest.java  | 128 +++++++++++++++----
 7 files changed, 200 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac4a0263/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f2cd844,02a2d52..b5e7ec7
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,5 +1,21 @@@
 +3.0
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7208)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 +
 +
  2.1.1
+  * Fail to start if commit log replay detects a problem (CASSANDRA-7125)
   * Improve schema merge performance (CASSANDRA-7444)
   * Fix NPE when unknown prepared statement ID is used (CASSANDRA-7454)
   * Adjust MT depth based on # of partition validating (CASSANDRA-5263)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac4a0263/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac4a0263/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
index e50a585,77c25d3..3f1b7b5
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogDescriptor.java
@@@ -28,8 -28,11 +28,10 @@@ import java.nio.ByteBuffer
  import java.util.regex.Matcher;
  import java.util.regex.Pattern;
  
+ import com.google.common.annotations.VisibleForTesting;
+ 
  import org.apache.cassandra.io.FSReadError;
  import org.apache.cassandra.net.MessagingService;
 -import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.PureJavaCrc32;
  
  public class CommitLogDescriptor

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ac4a0263/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/CommitLogTest.java
index b875987,dd05272..762d2d0
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@@ -35,33 -34,24 +35,37 @@@ import org.apache.cassandra.SchemaLoade
  import org.apache.cassandra.Util;
  import org.apache.cassandra.config.Config;
  import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.KSMetaData;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
+ import org.apache.cassandra.db.commitlog.CommitLogReplayer;
  import org.apache.cassandra.db.commitlog.CommitLogSegment;
+ import org.apache.cassandra.db.commitlog.MalformedCommitLogException;
  import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.PureJavaCrc32;
  
  import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
  
 -public class CommitLogTest extends SchemaLoader
 +public class CommitLogTest
  {
 +    private static final String KEYSPACE1 = "CommitLogTest";
 +    private static final String CF1 = "Standard1";
 +    private static final String CF2 = "Standard2";
  
 -    static
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
      {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE1,
 +                                    SimpleStrategy.class,
 +                                    KSMetaData.optsWithRF(1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF1),
 +                                    SchemaLoader.standardCFMD(KEYSPACE1, CF2));
+         System.setProperty("cassandra.commitlog.stop_on_errors", "true");
      }
  
      @Test