You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/06/30 00:07:49 UTC

svn commit: r789465 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/CommitLog.java test/unit/org/apache/cassandra/db/RecoveryManager2Test.java test/unit/org/apache/cassandra/db/RecoveryManagerTest.java

Author: jbellis
Date: Mon Jun 29 22:07:49 2009
New Revision: 789465

URL: http://svn.apache.org/viewvc?rev=789465&view=rev
Log:
add test catching buggy update of header on flush; refactor so there is only one version of code doing those writes (the correct one).
patch by jbellis; reviewed by Jun Rao for CASSANDRA-264

Added:
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=789465&r1=789464&r2=789465&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Mon Jun 29 22:07:49 2009
@@ -123,18 +123,6 @@
         return Long.parseLong(entries[entries.length - 2]);
     }
 
-    /*
-     * Write the serialized commit log header into the specified commit log.
-    */
-    private static void writeCommitLogHeader(String commitLogFileName, byte[] bytes) throws IOException
-    {     
-        IFileWriter logWriter = CommitLog.createWriter(commitLogFileName);
-        logWriter.seek(0L);
-        /* write the commit log header */
-        logWriter.writeDirect(bytes);
-        logWriter.close();
-    }
-
     private static IFileWriter createWriter(String file) throws IOException
     {        
         return SequenceFile.writer(file);
@@ -221,31 +209,40 @@
     }
 
     /*
+     * Write the serialized commit log header into the specified commit log.
+    */
+    private static void writeCommitLogHeader(String commitLogFileName, byte[] bytes) throws IOException
+    {
+        IFileWriter logWriter = CommitLog.createWriter(commitLogFileName);
+        writeCommitLogHeader(logWriter, bytes);
+    }
+
+    /*
      * This is invoked on startup via the ctor. It basically
      * writes a header with all bits set to zero.
     */
     private void writeCommitLogHeader() throws IOException
     {
         int cfSize = Table.TableMetadata.getColumnFamilyCount();
-        /* record the beginning of the commit header */
-        /* write the commit log header */
         clHeader_ = new CommitLogHeader(cfSize);
-        writeCommitLogHeader(clHeader_.toByteArray(), false);
+        writeCommitLogHeader(logWriter_, clHeader_.toByteArray());
     }
 
-    private void writeCommitLogHeader(byte[] bytes, boolean reset) throws IOException
+    /** writes header at the beginning of the file, then seeks back to current position */
+    private void seekAndWriteCommitLogHeader(byte[] bytes) throws IOException
     {
-        /* record the current position */
         long currentPos = logWriter_.getCurrentPosition();
         logWriter_.seek(0);
-        /* write the commit log header */
-        logWriter_.writeLong(bytes.length);
-        logWriter_.writeDirect(bytes);
-        if (reset)
-        {
-            /* seek back to the old position */
-            logWriter_.seek(currentPos);
-        }
+
+        writeCommitLogHeader(logWriter_, bytes);
+
+        logWriter_.seek(currentPos);
+    }
+
+    private static void writeCommitLogHeader(IFileWriter logWriter, byte[] bytes) throws IOException
+    {
+        logWriter.writeLong(bytes.length);
+        logWriter.writeDirect(bytes);
     }
 
     void recover(File[] clogs) throws IOException
@@ -254,7 +251,6 @@
 
         for (File file : clogs)
         {
-            // IFileReader reader = SequenceFile.bufferedReader(file.getAbsolutePath(), DatabaseDescriptor.getLogFileSizeThreshold());
             IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
             CommitLogHeader clHeader = readCommitLogHeader(reader);
             /* seek to the lowest position */
@@ -322,7 +318,7 @@
             if (!clHeader_.isDirty(id) || (clHeader_.isDirty(id) && clHeader_.getPosition(id) == 0))
             {
                 clHeader_.turnOn(id, logWriter_.getCurrentPosition());
-                writeCommitLogHeader(clHeader_.toByteArray(), true);
+                seekAndWriteCommitLogHeader(clHeader_.toByteArray());
             }
         }
     }
@@ -380,18 +376,17 @@
     {
         Table table = Table.open(tableName);
         int id = table.getColumnFamilyId(cf);
-        /* trying discarding old commit log files */
-        discard(cLogCtx, id);
+        discardCompletedSegments(cLogCtx, id);
     }
 
     /*
-     * Check if old commit logs can be deleted.
+     * Delete log segments whose contents have been turned into SSTables.
      *
      * param @ cLogCtx The commitLog context .
      * param @ id id of the columnFamily being flushed to disk.
      *
     */
-    private void discard(CommitLog.CommitLogContext cLogCtx, int id) throws IOException
+    private void discardCompletedSegments(CommitLog.CommitLogContext cLogCtx, int id) throws IOException
     {
         /* retrieve the commit log header associated with the file in the context */
         CommitLogHeader commitLogHeader = clHeaders_.get(cLogCtx.file);
@@ -482,7 +477,7 @@
             // whether it's safe to remove a given log segment by and-ing its dirty
             // with the current one.
             clHeader_.zeroPositions();
-            writeCommitLogHeader(clHeader_.toByteArray(), false);
+            writeCommitLogHeader(logWriter_, clHeader_.toByteArray());
         }
     }
 }
\ No newline at end of file

Added: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java?rev=789465&view=auto
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java (added)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java Mon Jun 29 22:07:49 2009
@@ -0,0 +1,39 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+
+public class RecoveryManager2Test extends CleanupHelper
+{
+    @Test
+    public void testWithFlush() throws IOException, ExecutionException, InterruptedException
+    {
+        Table table1 = Table.open("Table1");
+        Set<String> keys = new HashSet<String>();
+
+        for (int i = 0; i < 100; i++)
+        {
+            String key = "key" + i;
+            RowMutation rm = new RowMutation("Table1", key);
+            ColumnFamily cf = ColumnFamily.create("Table1", "Standard1");
+            cf.addColumn(new Column("col1", "val1".getBytes(), 1L));
+            rm.add(cf);
+            rm.apply();
+            keys.add(key);
+        }
+        table1.getColumnFamilyStore("Standard1").forceBlockingFlush();
+
+        table1.getColumnFamilyStore("Standard1").clearUnsafe();
+        RecoveryManager.doRecovery();
+
+        Set<String> foundKeys = new HashSet<String>(table1.getKeyRange(Arrays.asList("Standard1"), "", "", 1000));
+        assert keys.equals(foundKeys);
+    }
+}

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java?rev=789465&r1=789464&r2=789465&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java Mon Jun 29 22:07:49 2009
@@ -20,6 +20,9 @@
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.HashSet;
 
 import org.junit.Test;
 
@@ -36,7 +39,7 @@
     }
 
     @Test
-    public void testSomething() throws IOException, ExecutionException, InterruptedException
+    public void testOne() throws IOException, ExecutionException, InterruptedException
     {
         Table table1 = Table.open("Table1");
         Table table2 = Table.open("Table2");