You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/29 17:05:07 UTC

svn commit: r798935 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java Memtable.java

Author: jbellis
Date: Wed Jul 29 15:05:07 2009
New Revision: 798935

URL: http://svn.apache.org/viewvc?rev=798935&view=rev
Log:
combine switchMemtable and enqueueFlush so that they always operate on the same object.  move forceFlush guts from MT to CFS since it encapsulates better there
patch by jbellis; reviewed by Brandon Williams for CASSANDRA-320

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=798935&r1=798934&r2=798935&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Jul 29 15:05:07 2009
@@ -394,13 +394,19 @@
                              columnFamily_, SSTable.TEMPFILE_MARKER, index);
     }
 
-    void switchMemtable()
+    void switchMemtable(Memtable oldMemtable, CommitLog.CommitLogContext ctx)
     {
         memtableLock_.writeLock().lock();
         try
         {
+            if (oldMemtable.isFrozen())
+            {
+                return;
+            }
             logger_.info(columnFamily_ + " has reached its threshold; switching in a fresh Memtable");
-            getMemtablesPendingFlushNotNull(columnFamily_).add(memtable_); // it's ok for the MT to briefly be both active and pendingFlush
+            oldMemtable.freeze();
+            getMemtablesPendingFlushNotNull(columnFamily_).add(oldMemtable); // it's ok for the MT to briefly be both active and pendingFlush
+            submitFlush(oldMemtable, ctx);
             memtable_ = new Memtable(table_, columnFamily_);
         }
         finally
@@ -423,13 +429,25 @@
 
     public void forceFlush()
     {
-        memtable_.forceflush();
+        if (memtable_.isClean())
+            return;
+
+        CommitLog.CommitLogContext ctx = null;
+        try
+        {
+            ctx = CommitLog.open().getContext();
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        switchMemtable(memtable_, ctx);
     }
 
     void forceBlockingFlush() throws IOException, ExecutionException, InterruptedException
     {
         Memtable oldMemtable = getMemtableThreadSafe();
-        oldMemtable.forceflush();
+        forceFlush();
         // block for flush to finish by adding a no-op action to the flush executorservice
         // and waiting for that to finish.  (this works since flush ES is single-threaded.)
         Future f = flusher_.submit(new Runnable()
@@ -463,8 +481,7 @@
         Memtable initialMemtable = getMemtableThreadSafe();
         if (initialMemtable.isThresholdViolated())
         {
-            switchMemtable();
-            initialMemtable.enqueueFlush(cLogCtx);
+            switchMemtable(initialMemtable, cLogCtx);
         }
         memtableLock_.writeLock().lock();
         try
@@ -1284,7 +1301,7 @@
     }
 
     /* Submit memtables to be flushed to disk */
-    public static void submitFlush(final Memtable memtable, final CommitLog.CommitLogContext cLogCtx)
+    private static void submitFlush(final Memtable memtable, final CommitLog.CommitLogContext cLogCtx)
     {
         logger_.info("Enqueuing flush of " + memtable);
         flusher_.submit(new Runnable()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=798935&r1=798934&r2=798935&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Jul 29 15:05:07 2009
@@ -121,13 +121,14 @@
     	return cfName_;
     }
 
-    synchronized void enqueueFlush(CommitLog.CommitLogContext cLogCtx)
+    boolean isFrozen()
     {
-        if (!isFrozen_)
-        {
-            isFrozen_ = true;
-            ColumnFamilyStore.submitFlush(this, cLogCtx);
-        }
+        return isFrozen_;
+    }
+
+    void freeze()
+    {
+        isFrozen_ = true;
     }
 
     /**
@@ -142,27 +143,6 @@
         resolve(key, columnFamily);
     }
 
-    /*
-     * This version is used to switch memtable and force flush.
-     * Flushing is still done in a separate executor -- forceFlush only blocks
-     * until the flush runnable is queued.
-    */
-    public void forceflush()
-    {
-        if (isClean())
-            return;
-
-        try
-        {
-            Table.open(table_).getColumnFamilyStore(cfName_).switchMemtable();
-            enqueueFlush(CommitLog.open().getContext());
-        }
-        catch (IOException ex)
-        {
-            throw new RuntimeException(ex);
-        }
-    }
-
     /** flush synchronously (in the current thread, not on the executor).
      *  only the recover code should call this. */
     void flushOnRecovery() throws IOException {