You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/29 05:19:01 UTC
svn commit: r819817 - in /incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db: CommitLog.java CommitLogHeader.java
Author: jbellis
Date: Tue Sep 29 03:18:59 2009
New Revision: 819817
URL: http://svn.apache.org/viewvc?rev=819817&view=rev
Log:
don't preserve dirty bits from older replay segments. clean up comments. patch by jbellis; reviewed by junrao for CASSANDRA-459
Modified:
incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLogHeader.java
Modified: incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLog.java?rev=819817&r1=819816&r2=819817&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLog.java Tue Sep 29 03:18:59 2009
@@ -35,8 +35,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.lang.StringUtils;
-
/*
* Commit Log tracks every write operation into the system. The aim
* of the commit log is to be able to successfully recover data that was
@@ -368,7 +366,7 @@
for (ColumnFamily columnFamily : row.getColumnFamilies())
{
int id = table.getColumnFamilyId(columnFamily.name());
- if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
+ if (!clHeader_.isDirty(id))
{
clHeader_.turnOn(id, logWriter_.getFilePointer());
seekAndWriteCommitLogHeader(clHeader_.toByteArray());
@@ -490,60 +488,48 @@
*/
assert cLogCtx.position >= commitLogHeader.getPosition(id);
- commitLogHeader.turnOff(id);
/* Sort the commit logs based on creation time */
List<String> oldFiles = new ArrayList<String>(clHeaders_.keySet());
Collections.sort(oldFiles, new CommitLogFileComparator());
- List<String> listOfDeletedFiles = new ArrayList<String>();
+
/*
* Loop through all the commit log files in the history. Now process
* all files that are older than the one in the context. For each of
- * these files the header needs to modified by performing a bitwise &
- * of the header with the header of the file in the context. If we
- * encounter the file in the context in our list of old commit log files
- * then we update the header and write it back to the commit log.
+ * these files the header needs to be modified by resetting the dirty
+ * bit corresponding to the flushed CF.
*/
for (String oldFile : oldFiles)
{
if (oldFile.equals(cLogCtx.file))
{
- /*
- * We need to turn on again. This is because we always keep
- * the bit turned on and the position indicates from where the
- * commit log needs to be read. When a flush occurs we turn off
- * perform & operation and then turn on with the new position.
- */
+ // we can't just mark the segment where the flush happened clean,
+ // since there may have been writes to it between when the flush
+ // started and when it finished. so mark the flush position as
+ // the replay point for this CF, instead.
if (logger_.isDebugEnabled())
logger_.debug("Marking replay position on current commit log " + oldFile);
commitLogHeader.turnOn(id, cLogCtx.position);
seekAndWriteCommitLogHeader(commitLogHeader.toByteArray());
break;
}
- else
- {
- CommitLogHeader oldCommitLogHeader = clHeaders_.get(oldFile);
- oldCommitLogHeader.and(commitLogHeader);
- if (oldCommitLogHeader.isSafeToDelete())
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Deleting commit log:" + oldFile);
- FileUtils.deleteAsync(oldFile);
- listOfDeletedFiles.add(oldFile);
- }
- else
- {
- if (logger_.isDebugEnabled())
- logger_.debug("Not safe to delete commit log " + oldFile + "; dirty is " + oldCommitLogHeader.dirtyString());
- RandomAccessFile logWriter = CommitLog.createWriter(oldFile);
- writeCommitLogHeader(logWriter, oldCommitLogHeader.toByteArray());
- logWriter.close();
- }
- }
- }
- for ( String deletedFile : listOfDeletedFiles)
- {
- clHeaders_.remove(deletedFile);
+ CommitLogHeader oldCommitLogHeader = clHeaders_.get(oldFile);
+ oldCommitLogHeader.turnOff(id);
+ if (oldCommitLogHeader.isSafeToDelete())
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Deleting commit log:" + oldFile);
+ FileUtils.deleteAsync(oldFile);
+ clHeaders_.remove(oldFile);
+ }
+ else
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Not safe to delete commit log " + oldFile + "; dirty is " + oldCommitLogHeader.dirtyString());
+ RandomAccessFile logWriter = CommitLog.createWriter(oldFile);
+ writeCommitLogHeader(logWriter, oldCommitLogHeader.toByteArray());
+ logWriter.close();
+ }
}
}
@@ -560,10 +546,7 @@
logWriter_ = CommitLog.createWriter(logFile_);
/* squirrel away the old commit log header */
clHeaders_.put(oldLogFile, new CommitLogHeader(clHeader_));
- // we leave the old 'dirty' bits alone, so we can test for
- // whether it's safe to remove a given log segment by and-ing its dirty
- // with the current one.
- clHeader_.zeroPositions();
+ clHeader_.clear();
writeCommitLogHeader(logWriter_, clHeader_.toByteArray());
return true;
}
Modified: incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLogHeader.java?rev=819817&r1=819816&r2=819817&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLogHeader.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLogHeader.java Tue Sep 29 03:18:59 2009
@@ -20,6 +20,7 @@
import java.io.*;
import java.util.BitSet;
+import java.util.Arrays;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.DataInputBuffer;
@@ -35,17 +36,6 @@
return serializer;
}
- public static BitSet and(byte[] bytes1, byte[] bytes2) throws IOException
- {
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(bytes1, 0, bytes1.length);
- CommitLogHeader header1 = serializer.deserialize(bufIn);
- bufIn.reset(bytes2, 0, bytes2.length);
- CommitLogHeader header2 = serializer.deserialize(bufIn);
- header1.and(header2);
- return header1.dirty;
- }
-
static int getLowestPosition(CommitLogHeader clHeader)
{
int minPosition = Integer.MAX_VALUE;
@@ -116,17 +106,12 @@
return dirty.isEmpty();
}
- void zeroPositions()
+ void clear()
{
- int size = lastFlushedAt.length;
- lastFlushedAt = new int[size];
+ dirty.clear();
+ Arrays.fill(lastFlushedAt, 0);
}
-
- void and(CommitLogHeader commitLogHeader)
- {
- dirty.and(commitLogHeader.dirty);
- }
-
+
byte[] toByteArray() throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();