You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/24 22:55:32 UTC

svn commit: r758044 - /incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java

Author: alakshman
Date: Tue Mar 24 21:55:31 2009
New Revision: 758044

URL: http://svn.apache.org/viewvc?rev=758044&view=rev
Log:
Fix for CASSANDRA-9 JIRA. 

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=758044&r1=758043&r2=758044&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Tue Mar 24 21:55:31 2009
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.Callable;
@@ -53,6 +54,7 @@
 	private static Logger logger_ = Logger.getLogger( Memtable.class );
     private static Map<String, ExecutorService> apartments_ = new HashMap<String, ExecutorService>();
     public static final String flushKey_ = "FlushKey";
+    
     public static void shutdown()
     {
     	Set<String> names = apartments_.keySet();
@@ -156,6 +158,28 @@
             columnFamilies_.put(key_, columnFamily_);
         }
     }
+    
+    /**
+     * Runnable that submits the enclosing Memtable, together with the commit
+     * log context given at construction, to MemtableManager under cfName_.
+     * The flush itself is not performed in this class.
+     * @author alakshman
+     */
+    class Flusher implements Runnable
+    {
+        private CommitLog.CommitLogContext cLogCtx_;
+        
+        Flusher(CommitLog.CommitLogContext cLogCtx)
+        {
+            cLogCtx_ = cLogCtx;
+        }
+        
+        public void run()
+        {
+            ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_); // NOTE(review): cfStore is never read below - confirm whether this lookup is still needed
+            MemtableManager.instance().submit(cfName_, Memtable.this, cLogCtx_);
+        }
+    }
 
     /**
      * Compares two Memtable based on creation time. 
@@ -235,7 +259,10 @@
                 if (!isFrozen_)
                 {
                     isFrozen_ = true;
-                    MemtableManager.instance().submit(cfStore.getColumnFamilyName(), this, cLogCtx);
+                    /* Submit this Memtable to be flushed. */
+                    Runnable flusher = new Flusher(cLogCtx);
+                    apartments_.get(cfName_).submit(flusher);
+                    // MemtableManager.instance().submit(cfStore.getColumnFamilyName(), this, cLogCtx);
                     cfStore.switchMemtable(key, columnFamily, cLogCtx);
                 }
                 else
@@ -280,8 +307,6 @@
         }
     }
 
-
-
     private void resolve(String key, ColumnFamily columnFamily)
     {
     	ColumnFamily oldCf = columnFamilies_.get(key);
@@ -314,7 +339,6 @@
         	resolve(key, columnFamily);
     }
 
-    
     ColumnFamily getLocalCopy(String key, String cfName, IFilter filter)
     {
     	String[] values = RowMutation.getColumnAndColumnFamily(cfName);
@@ -458,7 +482,7 @@
             if ( columnFamily != null )
             {
                 /* serialize the cf with column indexes */
-                ColumnFamily.serializer2().serialize( columnFamily, buffer );
+                ColumnFamily.serializer2().serialize( columnFamily, buffer );                                
                 /* Now write the key and value to disk */
                 ssTable.append(pKey.key(), pKey.hash(), buffer);
                 bf.fill(pKey.key());
@@ -485,7 +509,7 @@
             if ( columnFamily != null )
             {
                 /* serialize the cf with column indexes */
-                ColumnFamily.serializer2().serialize( columnFamily, buffer );
+                ColumnFamily.serializer2().serialize( columnFamily, buffer );                                              
                 /* Now write the key and value to disk */
                 ssTable.append(key, buffer);
                 bf.fill(key);