You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/12 21:11:15 UTC

svn commit: r774022 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java Memtable.java MemtableManager.java Table.java

Author: jbellis
Date: Tue May 12 19:11:15 2009
New Revision: 774022

URL: http://svn.apache.org/viewvc?rev=774022&view=rev
Log:
Fix a race condition between replacing the active memtable and adding the old one to the set of historical
(pending-flush) memtables: switchMemtable() now registers the old memtable as pending flush before swapping in a new one.
patch by jbellis; reviewed by Eric Evans for CASSANDRA-161

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MemtableManager.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=774022&r1=774021&r2=774022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue May 12 19:11:15 2009
@@ -23,17 +23,11 @@
 import java.lang.management.ManagementFactory;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.StringTokenizer;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -55,7 +49,10 @@
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.cassandra.utils.TimedStatsDeque;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.commons.lang.StringUtils;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -63,10 +60,14 @@
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
+    private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
+
     private static int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
     private static final int BUFSIZE = 128 * 1024 * 1024;
     private static final int COMPACTION_MEMORY_THRESHOLD = 1 << 30;
-    private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
+
+    private static NonBlockingHashMap<String, Set<Memtable>> memtablesPendingFlush = new NonBlockingHashMap<String, Set<Memtable>>();
+    private static ExecutorService flusher_ = new DebuggableThreadPoolExecutor("MEMTABLE-FLUSHER-POOL");
 
     private final String table_;
     public final String columnFamily_;
@@ -93,10 +94,10 @@
     private TimedStatsDeque readStats_ = new TimedStatsDeque(60000);
     private TimedStatsDeque diskReadStats_ = new TimedStatsDeque(60000);
 
-    ColumnFamilyStore(String table, String columnFamily, boolean isSuper, int indexValue) throws IOException
+    ColumnFamilyStore(String table, String columnFamilyName, boolean isSuper, int indexValue) throws IOException
     {
         table_ = table;
-        columnFamily_ = columnFamily;
+        columnFamily_ = columnFamilyName;
         isSuper_ = isSuper;
         fileIndexGenerator_.set(indexValue);
         memtable_ = new AtomicReference<Memtable>(new Memtable(table_, columnFamily_));
@@ -430,6 +431,7 @@
     */
     void switchMemtable()
     {
+        getMemtablesPendingFlushNotNull(columnFamily_).add(memtable_.get()); // it's ok for the MT to briefly be both active and pendingFlush
         memtable_.set(new Memtable(table_, columnFamily_));
 
         if (memtableSwitchCount == Integer.MAX_VALUE)
@@ -462,7 +464,7 @@
         oldMemtable.forceflush();
         // block for flush to finish by adding a no-op action to the flush executorservice
         // and waiting for that to finish.  (this works since flush ES is single-threaded.)
-        Future f = MemtableManager.instance().flusher_.submit(new Runnable()
+        Future f = flusher_.submit(new Runnable()
         {
             public void run()
             {
@@ -532,7 +534,7 @@
         if (columnFamilies.size() == 0 || !filter.isDone())
         {
             /* Check if MemtableManager has any historical information */
-            MemtableManager.instance().getColumnFamily(key, columnFamily_, columnFamilyColumn, filter, columnFamilies);
+            getUnflushedColumnFamily(key, columnFamily_, columnFamilyColumn, filter, columnFamilies);
         }
         if (columnFamilies.size() == 0 || !filter.isDone())
         {
@@ -591,7 +593,6 @@
         }
     }
 
-
     private ColumnFamily fetchColumnFamily(String key, String cf, IFilter filter, String ssTableFile) throws IOException
     {
         SSTable ssTable = new SSTable(ssTableFile, StorageService.getPartitioner());
@@ -1443,6 +1444,64 @@
         return files.size();
     }
 
+    public static List<Memtable> getUnflushedMemtables(String cfName)
+    {
+        return new ArrayList<Memtable>(getMemtablesPendingFlushNotNull(cfName));
+    }
+
+    private static Set<Memtable> getMemtablesPendingFlushNotNull(String columnFamilyName)
+    {
+        Set<Memtable> memtables = memtablesPendingFlush.get(columnFamilyName);
+        if (memtables == null)
+        {
+            memtablesPendingFlush.putIfAbsent(columnFamilyName, new NonBlockingHashSet<Memtable>());
+            memtables = memtablesPendingFlush.get(columnFamilyName); // might not be the object we just put, if there was a race!
+        }
+        return memtables;
+    }
+
+    /*
+     * Retrieve column family from the list of Memtables that have been
+     * submitted for flush but have not yet been flushed.
+     * It also filters out unneccesary columns based on the passed in filter.
+    */
+    void getUnflushedColumnFamily(String key, String cfName, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
+    {
+        List<Memtable> memtables = getUnflushedMemtables(cfName);
+        Collections.sort(memtables);
+        int size = memtables.size();
+        for ( int i = size - 1; i >= 0; --i  )
+        {
+            ColumnFamily columnFamily = memtables.get(i).getLocalCopy(key, cf, filter);
+            if ( columnFamily != null )
+            {
+                columnFamilies.add(columnFamily);
+                if( filter.isDone())
+                    break;
+            }
+        }
+    }
+
+    /* Submit memtables to be flushed to disk */
+    public static void submitFlush(final Memtable memtable, final CommitLog.CommitLogContext cLogCtx)
+    {
+        flusher_.submit(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    memtable.flush(cLogCtx);
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                getMemtablesPendingFlushNotNull(memtable.getColumnFamily()).remove(memtable);
+            }
+        });
+    }
+
     public boolean isSuper()
     {
         return isSuper_;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=774022&r1=774021&r2=774022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue May 12 19:11:15 2009
@@ -393,7 +393,7 @@
             {
                 public void run()
                 {
-                    MemtableManager.instance().submit(cfName_, Memtable.this, cLogCtx);
+                    ColumnFamilyStore.submitFlush(Memtable.this, cLogCtx);
                 }
             };
             flushQueuer = new FutureTask(runnable, null);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=774022&r1=774021&r2=774022&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue May 12 19:11:15 2009
@@ -886,7 +886,7 @@
             // memtable keys: current and historical
             Iterator<Memtable> memtables = (Iterator<Memtable>) IteratorUtils.chainedIterator(
                     IteratorUtils.singletonIterator(cfs.getMemtable()),
-                    MemtableManager.instance().getUnflushedMemtables(cfName).iterator());
+                    ColumnFamilyStore.getUnflushedMemtables(cfName).iterator());
             while (memtables.hasNext())
             {
                 iterators.add(IteratorUtils.filteredIterator(memtables.next().sortedKeyIterator(), new Predicate()