You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/11/11 19:10:56 UTC

svn commit: r834995 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ColumnFamilyStore.java db/CommitLog.java db/Memtable.java db/Table.java io/Streaming.java

Author: jbellis
Date: Wed Nov 11 18:10:55 2009
New Revision: 834995

URL: http://svn.apache.org/viewvc?rev=834995&view=rev
Log:
parallelize post-recovery flushes.
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-539

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=834995&r1=834994&r2=834995&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Nov 11 18:10:55 2009
@@ -1004,11 +1004,6 @@
         return isSuper_;
     }
 
-    public void flushMemtableOnRecovery() throws IOException
-    {
-        getMemtableThreadSafe().flushOnRecovery();
-    }
-
     public int getMemtableColumnsCount()
     {
         return getMemtableThreadSafe().getCurrentObjectCount();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=834995&r1=834994&r2=834995&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Wed Nov 11 18:10:55 2009
@@ -32,6 +32,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -283,8 +284,9 @@
 
     void recover(File[] clogs) throws IOException
     {
-        DataInputBuffer bufIn = new DataInputBuffer();
+        Set<Table> tablesRecovered = new HashSet<Table>();
 
+        DataInputBuffer bufIn = new DataInputBuffer();
         for (File file : clogs)
         {
             int bufferSize = (int)Math.min(file.length(), 32 * 1024 * 1024);
@@ -299,8 +301,6 @@
             if (logger_.isDebugEnabled())
                 logger_.debug("Replaying " + file + " starting at " + lowPos);
 
-            Set<Table> tablesRecovered = new HashSet<Table>();
-
             /* read the logs populate RowMutation and apply */
             while (!reader.isEOF())
             {
@@ -348,10 +348,24 @@
                 }
             }
             reader.close();
-            /* apply the rows read -- success will result in the CL file being discarded */
-            for (Table table : tablesRecovered)
+        }
+
+        // flush replayed tables, allowing commitlog segments to be removed
+        List<Future<?>> futures = new ArrayList<Future<?>>();
+        for (Table table : tablesRecovered)
+        {
+            futures.addAll(table.flush());
+        }
+        // wait for flushes to finish before continuing with startup
+        for (Future<?> future : futures)
+        {
+            try
+            {
+                future.get();
+            }
+            catch (Exception e)
             {
-                table.flush(true);
+                throw new RuntimeException(e);
             }
         }
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=834995&r1=834994&r2=834995&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Nov 11 18:10:55 2009
@@ -166,15 +166,6 @@
         }
     }
 
-    /** flush synchronously (in the current thread, not on the executors).
-     *  only the recover code should call this. */
-    void flushOnRecovery() throws IOException {
-        if (!isClean())
-        {
-            writeSortedContents(getSortedKeys());
-        }
-    }
-
     // for debugging
     public String contents()
     {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=834995&r1=834994&r2=834995&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Nov 11 18:10:55 2009
@@ -24,6 +24,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.Future;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
@@ -487,19 +488,16 @@
         }
     }
 
-    public void flush(boolean fRecovery) throws IOException
+    public List<Future<?>> flush() throws IOException
     {
+        List<Future<?>> futures = new ArrayList<Future<?>>();
         for (String cfName : columnFamilyStores_.keySet())
         {
-            if (fRecovery)
-            {
-                columnFamilyStores_.get(cfName).flushMemtableOnRecovery();
-            }
-            else
-            {
-                columnFamilyStores_.get(cfName).forceFlush();
-            }
+            Future<?> future = columnFamilyStores_.get(cfName).forceFlush();
+            if (future != null)
+                futures.add(future);
         }
+        return futures;
     }
 
     // for binary load path.  skips commitlog.

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java?rev=834995&r1=834994&r2=834995&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Streaming.java Wed Nov 11 18:10:55 2009
@@ -52,7 +52,7 @@
                 Table table = Table.open(tName);
                 if (logger.isDebugEnabled())
                   logger.debug("Flushing memtables ...");
-                table.flush(false);
+                table.flush();
                 if (logger.isDebugEnabled())
                   logger.debug("Performing anticompaction ...");
                 /* Get the list of files that need to be streamed */