You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/12/09 06:02:09 UTC

svn commit: r888701 - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/db/

Author: jbellis
Date: Wed Dec  9 05:02:08 2009
New Revision: 888701

URL: http://svn.apache.org/viewvc?rev=888701&view=rev
Log:
respect memtable thresholds when replaying commit log
patch by jbellis; reviewed by Jun Rao for CASSANDRA-609

Modified:
    incubator/cassandra/trunk/CHANGES.txt
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=888701&r1=888700&r2=888701&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Wed Dec  9 05:02:08 2009
@@ -18,6 +18,7 @@
  * avoid GCing tombstones except on major compaction (CASSANDRA-604)
  * increase failure conviction threshold, resulting in less nodes
    incorrectly (and temporarily) marked as down (CASSANDRA-610)
+ * respect memtable thresholds during log replay (CASSANDRA-609)
 
 
 0.5.0 beta

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=888701&r1=888700&r2=888701&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Dec  9 05:02:08 2009
@@ -372,7 +372,7 @@
                              columnFamily_, SSTable.TEMPFILE_MARKER, fileIndexGenerator_.incrementAndGet());
     }
 
-    Future<?> switchMemtable(Memtable oldMemtable) throws IOException
+    Future<?> switchMemtable(Memtable oldMemtable, final boolean writeCommitLog) throws IOException
     {
         /**
          *  If we can get the writelock, that means no new updates can come in and 
@@ -382,7 +382,7 @@
         Table.flusherLock_.writeLock().lock();
         try
         {
-            final CommitLog.CommitLogContext ctx = CommitLog.open().getContext();
+            final CommitLog.CommitLogContext ctx = CommitLog.open().getContext(); // this is harmless if !writeCommitLog
 
             if (oldMemtable.isFrozen())
             {
@@ -401,7 +401,12 @@
                     try
                     {
                         condition.await();
-                        onMemtableFlush(ctx);
+                        if (writeCommitLog)
+                        {
+                            // only done when writing to the commit log: otherwise we are replaying the log, so marking
+                            // the log header with "you can discard anything written before the context" is not valid
+                            onMemtableFlush(ctx);
+                        }
                     }
                     catch (Exception e)
                     {
@@ -438,7 +443,7 @@
         if (memtable_.isClean())
             return null;
 
-        return switchMemtable(memtable_);
+        return switchMemtable(memtable_, true);
     }
 
     void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException
@@ -561,17 +566,6 @@
     }
 
     /*
-     * This version is used only on start up when we are recovering from logs.
-     * Hence no locking is required since we process logs on the main thread. In
-     * the future we may want to parellelize the log processing for a table by
-     * having a thread per log file present for recovery. Re-visit at that time.
-     */
-    void applyNow(String key, ColumnFamily columnFamily) throws IOException
-    {
-        getMemtableThreadSafe().put(key, columnFamily);
-    }
-
-    /*
      * This method is called when the Memtable is frozen and ready to be flushed
      * to disk. This method informs the CommitLog that a particular ColumnFamily
      * is being flushed to disk.
@@ -1005,6 +999,15 @@
         return memtables;
     }
 
+    /**
+     * submits flush sort on the flushSorter executor, which will in turn submit to flushWriter when sorted.
+     * TODO because our executors use CallerRunsPolicy, when flushSorter fills up, no writes will proceed
+     * because the next flush will start executing on the caller, mutation-stage thread that has the
+     * flush write lock held.  (writes acquire this as a read lock before proceeding.)
+     * This is good, because it backpressures flushes, but bad, because we can't write until that last
+     * flushing thread finishes sorting, which will almost always be longer than any of the flushSorter threads proper
+     * (since, by definition, it started last).
+     */
     Condition submitFlush(final IFlushable flushable)
     {
         logger_.info("Enqueuing flush of " + flushable);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=888701&r1=888700&r2=888701&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Wed Dec  9 05:02:08 2009
@@ -352,11 +352,11 @@
                         {
                             try
                             {
-                                table.applyNow(rm);
+                                rm.apply(false);
                             }
                             catch (IOException e)
                             {
-                                throw new RuntimeException(e);
+                                throw new IOError(e);
                             }
                         }
                     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=888701&r1=888700&r2=888701&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Dec  9 05:02:08 2009
@@ -199,8 +199,13 @@
      * to the table that is obtained by calling Table.open().
     */
     public void apply() throws IOException
-    {   
-        Table.open(table_).apply(this, this.getSerializedBuffer());
+    {
+        apply(true);
+    }
+
+    public void apply(boolean writeCommitLog) throws IOException
+    {
+        Table.open(table_).apply(this, this.getSerializedBuffer(), writeCommitLog);
     }
 
     /*

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=888701&r1=888700&r2=888701&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Dec  9 05:02:08 2009
@@ -428,14 +428,15 @@
      * Once this happens the data associated with the individual column families
      * is also written to the column family store's memtable.
     */
-    void apply(RowMutation mutation, DataOutputBuffer serializedMutation) throws IOException
+    void apply(RowMutation mutation, DataOutputBuffer serializedMutation, boolean writeCommitLog) throws IOException
     {
         HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new HashMap<ColumnFamilyStore, Memtable>(2);
 
         flusherLock_.readLock().lock();
         try
         {
-            CommitLog.open().add(mutation, serializedMutation);
+            if (writeCommitLog)
+                CommitLog.open().add(mutation, serializedMutation);
         
             for (ColumnFamily columnFamily : mutation.getColumnFamilies())
             {
@@ -452,17 +453,7 @@
 
         // usually mTF will be empty and this will be a no-op
         for (Map.Entry<ColumnFamilyStore, Memtable> entry : memtablesToFlush.entrySet())
-            entry.getKey().switchMemtable(entry.getValue());
-    }
-
-    void applyNow(RowMutation row) throws IOException
-    {
-        String key = row.key();
-        for (ColumnFamily columnFamily : row.getColumnFamilies())
-        {
-            ColumnFamilyStore cfStore = columnFamilyStores_.get(columnFamily.name());
-            cfStore.applyNow( key, columnFamily );
-        }
+            entry.getKey().switchMemtable(entry.getValue(), writeCommitLog);
     }
 
     public List<Future<?>> flush() throws IOException