You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/11 16:27:10 UTC
svn commit: r889654 -
/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/CommitLog.java
Author: jbellis
Date: Fri Dec 11 15:27:10 2009
New Revision: 889654
URL: http://svn.apache.org/viewvc?rev=889654&view=rev
Log:
add CRC32 to catch commitlog entries that did not get fsynced. patch by jbellis; reviewed by junrao for CASSANDRA-605
Modified:
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/CommitLog.java
Modified: incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/CommitLog.java?rev=889654&r1=889653&r2=889654&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/db/CommitLog.java Fri Dec 11 15:27:10 2009
@@ -31,6 +31,8 @@
import java.io.*;
import java.util.*;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -305,11 +307,13 @@
if (logger_.isDebugEnabled())
logger_.debug("Reading mutation at " + reader.getFilePointer());
+ long claimedCRC32;
byte[] bytes;
try
{
bytes = new byte[(int) reader.readLong()]; // readlong can throw EOFException too
reader.readFully(bytes);
+ claimedCRC32 = reader.readLong();
}
catch (EOFException e)
{
@@ -317,8 +321,16 @@
break;
}
bufIn.reset(bytes, bytes.length);
+ Checksum checksum = new CRC32();
+ checksum.update(bytes, 0, bytes.length);
+ if (claimedCRC32 != checksum.getValue())
+ {
+ // This part of the log must not have been fsynced. The rest is probably bad too,
+ // but there is no harm in trying the remaining entries just in case.
+ continue;
+ }
- /* read the commit log entry */
+ /* deserialize the commit log entry */
final RowMutation rm = RowMutation.serializer().deserialize(bufIn);
if (logger_.isDebugEnabled())
logger_.debug(String.format("replaying mutation for %s.%s: %s",
@@ -620,16 +632,16 @@
long currentPosition = -1L;
try
{
- /* serialize the row */
currentPosition = logWriter_.getFilePointer();
CommitLogContext cLogCtx = new CommitLogContext(logFile_, currentPosition);
- /* Update the header */
maybeUpdateHeader(rowMutation);
+ Checksum checkum = new CRC32();
if (serializedRow instanceof DataOutputBuffer)
{
DataOutputBuffer buffer = (DataOutputBuffer) serializedRow;
logWriter_.writeLong(buffer.getLength());
logWriter_.write(buffer.getData(), 0, buffer.getLength());
+ checkum.update(buffer.getData(), 0, buffer.getLength());
}
else
{
@@ -637,7 +649,9 @@
byte[] bytes = (byte[]) serializedRow;
logWriter_.writeLong(bytes.length);
logWriter_.write(bytes);
+ checkum.update(bytes, 0, bytes.length);
}
+ logWriter_.writeLong(checkum.getValue());
maybeRollLog();
return cLogCtx;
}