You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/11 16:27:10 UTC

svn commit: r889654 - /incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/CommitLog.java

Author: jbellis
Date: Fri Dec 11 15:27:10 2009
New Revision: 889654

URL: http://svn.apache.org/viewvc?rev=889654&view=rev
Log:
Add a CRC32 checksum after each commitlog entry so that entries that did not get fsynced are detected (and skipped) on replay.  Patch by jbellis; reviewed by junrao for CASSANDRA-605.

Modified:
    incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/CommitLog.java

Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/CommitLog.java?rev=889654&r1=889653&r2=889654&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/CommitLog.java Fri Dec 11 15:27:10 2009
@@ -31,6 +31,8 @@
 
 import java.io.*;
 import java.util.*;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -305,11 +307,13 @@
                 if (logger_.isDebugEnabled())
                     logger_.debug("Reading mutation at " + reader.getFilePointer());
 
+                long claimedCRC32;
                 byte[] bytes;
                 try
                 {
                     bytes = new byte[(int) reader.readLong()]; // readlong can throw EOFException too
                     reader.readFully(bytes);
+                    claimedCRC32 = reader.readLong();
                 }
                 catch (EOFException e)
                 {
@@ -317,8 +321,16 @@
                     break;
                 }
                 bufIn.reset(bytes, bytes.length);
+                Checksum checksum = new CRC32();
+                checksum.update(bytes, 0, bytes.length);
+                if (claimedCRC32 != checksum.getValue())
+                {
+                    // checksum mismatch: this entry must not have been fsynced.  the rest of the log is
+                    // probably bad too, but there is no harm in trying the remaining entries.
+                    continue;
+                }
 
-                /* read the commit log entry */
+                /* deserialize the commit log entry */
                 final RowMutation rm = RowMutation.serializer().deserialize(bufIn);
                 if (logger_.isDebugEnabled())
                     logger_.debug(String.format("replaying mutation for %s.%s: %s",
@@ -620,16 +632,16 @@
             long currentPosition = -1L;
             try
             {
-                /* serialize the row */
                 currentPosition = logWriter_.getFilePointer();
                 CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
-                /* Update the header */
                 maybeUpdateHeader(rowMutation);
+                Checksum checkum = new CRC32();
                 if (serializedRow instanceof DataOutputBuffer)
                 {
                     DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
                     logWriter_.writeLong(buffer.getLength());
                     logWriter_.write(buffer.getData(), 0, buffer.getLength());
+                    checkum.update(buffer.getData(), 0, buffer.getLength());
                 }
                 else
                 {
@@ -637,7 +649,9 @@
                     byte[] bytes = (byte[]) serializedRow;
                     logWriter_.writeLong(bytes.length);
                     logWriter_.write(bytes);
+                    checkum.update(bytes, 0, bytes.length);
                 }
+                logWriter_.writeLong(checkum.getValue());
                 maybeRollLog();
                 return cLogCtx;
             }