You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/29 05:19:01 UTC

svn commit: r819817 - in /incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db: CommitLog.java CommitLogHeader.java

Author: jbellis
Date: Tue Sep 29 03:18:59 2009
New Revision: 819817

URL: http://svn.apache.org/viewvc?rev=819817&view=rev
Log:
don't preserve dirty bits from older replay segments. clean up comments.  patch by jbellis; reviewed by junrao for CASSANDRA-459

Modified:
    incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLogHeader.java

Modified: incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLog.java?rev=819817&r1=819816&r2=819817&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLog.java Tue Sep 29 03:18:59 2009
@@ -35,8 +35,6 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.commons.lang.StringUtils;
-
 /*
  * Commit Log tracks every write operation into the system. The aim
  * of the commit log is to be able to successfully recover data that was
@@ -368,7 +366,7 @@
         for (ColumnFamily columnFamily : row.getColumnFamilies())
         {
             int id = table.getColumnFamilyId(columnFamily.name());
-            if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
+            if (!clHeader_.isDirty(id))
             {
                 clHeader_.turnOn(id, logWriter_.getFilePointer());
                 seekAndWriteCommitLogHeader(clHeader_.toByteArray());
@@ -490,60 +488,48 @@
         */
         assert cLogCtx.position >= commitLogHeader.getPosition(id);
 
-        commitLogHeader.turnOff(id);
         /* Sort the commit logs based on creation time */
         List<String> oldFiles = new ArrayList<String>(clHeaders_.keySet());
         Collections.sort(oldFiles, new CommitLogFileComparator());
-        List<String> listOfDeletedFiles = new ArrayList<String>();
+
         /*
          * Loop through all the commit log files in the history. Now process
          * all files that are older than the one in the context. For each of
-         * these files the header needs to modified by performing a bitwise &
-         * of the header with the header of the file in the context. If we
-         * encounter the file in the context in our list of old commit log files
-         * then we update the header and write it back to the commit log.
+         * these files the header needs to be modified by resetting the dirty
+         * bit corresponding to the flushed CF.
         */
         for (String oldFile : oldFiles)
         {
             if (oldFile.equals(cLogCtx.file))
             {
-                /*
-                 * We need to turn on again. This is because we always keep
-                 * the bit turned on and the position indicates from where the
-                 * commit log needs to be read. When a flush occurs we turn off
-                 * perform & operation and then turn on with the new position.
-                */
+                // we can't just mark the segment where the flush happened clean,
+                // since there may have been writes to it between when the flush
+                // started and when it finished. so mark the flush position as
+                // the replay point for this CF, instead.
                 if (logger_.isDebugEnabled())
                     logger_.debug("Marking replay position on current commit log " + oldFile);
                 commitLogHeader.turnOn(id, cLogCtx.position);
                 seekAndWriteCommitLogHeader(commitLogHeader.toByteArray());
                 break;
             }
-            else
-            {
-                CommitLogHeader oldCommitLogHeader = clHeaders_.get(oldFile);
-                oldCommitLogHeader.and(commitLogHeader);
-                if (oldCommitLogHeader.isSafeToDelete())
-                {
-                    if (logger_.isDebugEnabled())
-                      logger_.debug("Deleting commit log:" + oldFile);
-                    FileUtils.deleteAsync(oldFile);
-                    listOfDeletedFiles.add(oldFile);
-                }
-                else
-                {
-                    if (logger_.isDebugEnabled())
-                        logger_.debug("Not safe to delete commit log " + oldFile + "; dirty is " + oldCommitLogHeader.dirtyString());
-                    RandomAccessFile logWriter = CommitLog.createWriter(oldFile);
-                    writeCommitLogHeader(logWriter, oldCommitLogHeader.toByteArray());
-                    logWriter.close();
-                }
-            }
-        }
 
-        for ( String deletedFile : listOfDeletedFiles)
-        {
-            clHeaders_.remove(deletedFile);
+	    CommitLogHeader oldCommitLogHeader = clHeaders_.get(oldFile);
+	    oldCommitLogHeader.turnOff(id);
+	    if (oldCommitLogHeader.isSafeToDelete())
+	    {
+		if (logger_.isDebugEnabled())
+		  logger_.debug("Deleting commit log:" + oldFile);
+		FileUtils.deleteAsync(oldFile);
+		clHeaders_.remove(oldFile);
+	    }
+	    else
+	    {
+		if (logger_.isDebugEnabled())
+		    logger_.debug("Not safe to delete commit log " + oldFile + "; dirty is " + oldCommitLogHeader.dirtyString());
+		RandomAccessFile logWriter = CommitLog.createWriter(oldFile);
+		writeCommitLogHeader(logWriter, oldCommitLogHeader.toByteArray());
+		logWriter.close();
+	    }
         }
     }
 
@@ -560,10 +546,7 @@
             logWriter_ = CommitLog.createWriter(logFile_);
             /* squirrel away the old commit log header */
             clHeaders_.put(oldLogFile, new CommitLogHeader(clHeader_));
-            // we leave the old 'dirty' bits alone, so we can test for
-            // whether it's safe to remove a given log segment by and-ing its dirty
-            // with the current one.
-            clHeader_.zeroPositions();
+            clHeader_.clear();
             writeCommitLogHeader(logWriter_, clHeader_.toByteArray());
             return true;
         }

Modified: incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLogHeader.java?rev=819817&r1=819816&r2=819817&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLogHeader.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/CommitLogHeader.java Tue Sep 29 03:18:59 2009
@@ -20,6 +20,7 @@
 
 import java.io.*;
 import java.util.BitSet;
+import java.util.Arrays;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.DataInputBuffer;
@@ -35,17 +36,6 @@
         return serializer;
     }
         
-    public static BitSet and(byte[] bytes1, byte[] bytes2) throws IOException
-    {
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(bytes1, 0, bytes1.length);
-        CommitLogHeader header1 = serializer.deserialize(bufIn);
-        bufIn.reset(bytes2, 0, bytes2.length);
-        CommitLogHeader header2 = serializer.deserialize(bufIn);
-        header1.and(header2);
-        return header1.dirty;
-    }
-
     static int getLowestPosition(CommitLogHeader clHeader)
     {
         int minPosition = Integer.MAX_VALUE;
@@ -116,17 +106,12 @@
         return dirty.isEmpty();
     }
 
-    void zeroPositions()
+    void clear()
     {
-        int size = lastFlushedAt.length;
-        lastFlushedAt = new int[size];
+        dirty.clear();
+        Arrays.fill(lastFlushedAt, 0);
     }
-    
-    void and(CommitLogHeader commitLogHeader)
-    {
-        dirty.and(commitLogHeader.dirty);
-    }
-    
+        
     byte[] toByteArray() throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();