You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/28 23:04:41 UTC

svn commit: r779770 - in /incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra: db/ColumnFamilyStore.java db/CommitLog.java db/Memtable.java db/PeriodicFlushManager.java io/SSTable.java

Author: jbellis
Date: Thu May 28 21:04:40 2009
New Revision: 779770

URL: http://svn.apache.org/viewvc?rev=779770&view=rev
Log:
fix possible data loss during multiple restarts.  patch by jbellis; reviewed by Jun Rao for CASSANDRA-204

Modified:
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=779770&r1=779769&r2=779770&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu May 28 21:04:40 2009
@@ -188,7 +188,6 @@
         ssTables_.addAll(filenames);
         /* Load the index files and the Bloom Filters associated with them. */
         SSTable.onStart(filenames);
-        logger_.debug("Submitting a major compaction task ...");
         MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
         if (columnFamily_.equals(Table.hints_))
         {
@@ -417,13 +416,6 @@
         return table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + index;
     }
 
-
-    /*
-    * This version is used only on start up when we are recovering from logs.
-    * In the future we may want to parellelize the log processing for a table
-    * by having a thread per log file present for recovery. Re-visit at that
-    * time.
-    */
     void switchMemtable()
     {
         memtableLock_.writeLock().lock();
@@ -445,12 +437,6 @@
         memtableSwitchCount++;
     }
 
-    /*
-     * This version is used only on start up when we are recovering from logs.
-     * In the future we may want to parellelize the log processing for a table
-     * by having a thread per log file present for recovery. Re-visit at that
-     * time.
-     */
     void switchBinaryMemtable(String key, byte[] buffer) throws IOException
     {
         binaryMemtable_.set(new BinaryMemtable(table_, columnFamily_));
@@ -481,7 +467,6 @@
     void forceFlushBinary()
     {
         BinaryMemtableManager.instance().submit(getColumnFamilyName(), binaryMemtable_.get());
-        //binaryMemtable_.get().flush(true);
     }
 
     /**
@@ -728,7 +713,7 @@
      */
     void applyNow(String key, ColumnFamily columnFamily) throws IOException
     {
-        getMemtableThreadSafe().putOnRecovery(key, columnFamily);
+        getMemtableThreadSafe().put(key, columnFamily);
     }
 
     /*

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/CommitLog.java?rev=779770&r1=779769&r2=779770&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/CommitLog.java Thu May 28 21:04:40 2009
@@ -34,6 +34,8 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.lang.StringUtils;
+
 /*
  * Commit Log tracks every write operation into the system. The aim
  * of the commit log is to be able to successfully recover data that was
@@ -300,16 +302,12 @@
             file = clogs.get(i);
             readCommitLogHeader(file.getAbsolutePath(), header2);
             byte[] result = CommitLogHeader.and(header, header2);
-            if ( !CommitLogHeader.isZero(result) )
-            {
-                filesNeeded.push(file);
-            }
-            else
-            {
+            if (CommitLogHeader.isZero(result))
                 break;
-            }
+            filesNeeded.push(file);
         }
 
+        logger_.info("Replaying logs from " + StringUtils.join(filesNeeded, ", "));
         doRecovery(filesNeeded, header);
     }
 

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/Memtable.java?rev=779770&r1=779769&r2=779770&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/Memtable.java Thu May 28 21:04:40 2009
@@ -160,6 +160,8 @@
         }
     }
 
+    /** flush synchronously (in the current thread, not on the executor).
+     *  only the recover code should call this. */
     void flushOnRecovery() throws IOException {
         if (!isClean())
             flush(CommitLog.CommitLogContext.NULL);
@@ -201,16 +203,6 @@
         return builder.toString();
     }
 
-    /**
-     * This version is called on commit log recovery. The threshold
-     * is not respected and a forceFlush() needs to be invoked to flush
-     * the contents to disk.  Does not go through the executor.
-    */
-    void putOnRecovery(String key, ColumnFamily columnFamily)
-    {
-        resolve(key, columnFamily);
-    }
-
     ColumnFamily getLocalCopy(String key, String columnFamilyColumn, IFilter filter)
     {
     	String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/PeriodicFlushManager.java?rev=779770&r1=779769&r2=779770&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/PeriodicFlushManager.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/db/PeriodicFlushManager.java Thu May 28 21:04:40 2009
@@ -76,7 +76,6 @@
                 columnFamilyStore.forceFlush();
             }
         };
-        logger_.info("start periodic flush daemon every " + flushPeriodInMinutes + " minutes for " + columnFamilyStore.columnFamily_);
         flusher_.scheduleWithFixedDelay(runnable, flushPeriodInMinutes, flushPeriodInMinutes, TimeUnit.MINUTES);       
     }
 }

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java?rev=779770&r1=779769&r2=779770&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/io/SSTable.java Thu May 28 21:04:40 2009
@@ -403,6 +403,7 @@
 
     private void loadIndexFile() throws IOException
     {
+        logger_.debug("Loading indexes from " + dataFile_);
         IFileReader indexReader = null;
         /* Read all block indexes to maintain an index in memory */
         try
@@ -441,7 +442,6 @@
                     String blockIndexKey = bufIn.readUTF();
                     if (!blockIndexKey.equals(SSTable.blockIndexKey_))
                     {
-                        logger_.debug(" Done reading the block indexes, Index has been created");
                         break;
                     }
                     /* read the size of the block index */