You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/23 03:57:46 UTC

svn commit: r817929 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/BinaryMemtable.java db/ColumnFamilyStore.java db/ColumnFamilyStoreMBean.java db/IFlushable.java db/Memtable.java db/Table.java utils/SimpleCondition.java

Author: jbellis
Date: Wed Sep 23 01:57:45 2009
New Revision: 817929

URL: http://svn.apache.org/viewvc?rev=817929&view=rev
Log:
split flusher executor into flushSorter and flushWriter.  This is because sorting is CPU-bound, and writing is disk-bound; we want to be able to do both at once.
patch by jbellis; reviewed by goffinet for CASSANDRA-401

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=817929&r1=817928&r2=817929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Wed Sep 23 01:57:45 2009
@@ -29,6 +29,7 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
@@ -36,7 +37,7 @@
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.apache.cassandra.dht.IPartitioner;
 
-public class BinaryMemtable
+public class BinaryMemtable implements IFlushable
 {
     private static Logger logger_ = Logger.getLogger( Memtable.class );
     private int threshold_ = DatabaseDescriptor.getMemtableSize()*1024*1024;
@@ -128,40 +129,41 @@
         currentSize_.addAndGet(buffer.length + key.length());
     }
 
-
-    /*
-     * 
-    */
-    void flush() throws IOException
+    /**
+     * Sorts this memtable's keys with the partitioner's DecoratedKey comparator.
+     * This is the CPU-bound half of a flush; writeSortedContents does the disk-bound half.
+     */
+    public ColumnFamilyStore.SortedFlushable getSortedContents()
     {
-        if (columnFamilies_.size() == 0)
-            return;
-
-        /*
-         * Use the SSTable to write the contents of the TreeMap
-         * to disk.
-        */
+        // NOTE(review): the old flush() silently returned when empty; callers must now never flush an empty BinaryMemtable
+        assert !columnFamilies_.isEmpty();
+        logger_.info("Sorting " + this);
+        List<DecoratedKey> keys = new ArrayList<DecoratedKey>(columnFamilies_.keySet());
+        Collections.sort(keys, partitioner_.getDecoratedKeyObjComparator());
+        return new ColumnFamilyStore.SortedFlushable(keys, this);
+    }
 
-        String path;
-        SSTableWriter writer;
+    /**
+     * Writes the buffered values to a new SSTable in the order of sortedFlushable.keys and returns
+     * a reader for it.  It does not add the SSTable to the ColumnFamilyStore; the caller does that.
+     */
+    public SSTableReader writeSortedContents(ColumnFamilyStore.SortedFlushable sortedFlushable) throws IOException
+    {
+        logger_.info("Writing " + this);
         ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
-        List<DecoratedKey> keys = new ArrayList<DecoratedKey>( columnFamilies_.keySet() );
-        path = cfStore.getTempSSTablePath();
-        writer = new SSTableWriter(path, keys.size(), StorageService.getPartitioner());
-
-        Collections.sort(keys, partitioner_.getDecoratedKeyObjComparator());
+        String path = cfStore.getTempSSTablePath();
+        SSTableWriter writer = new SSTableWriter(path, sortedFlushable.keys.size(), StorageService.getPartitioner());
 
-        /* Use this BloomFilter to decide if a key exists in a SSTable */
-        for (DecoratedKey key : keys)
+        for (DecoratedKey key : (List<DecoratedKey>) sortedFlushable.keys) // unchecked: getSortedContents builds a List<DecoratedKey>
         {
             byte[] bytes = columnFamilies_.get(key);
-            if (bytes.length > 0)
-            {
-                /* Now write the key and value to disk */
-                writer.append(key.toString(), bytes);
-            }
+            // NOTE(review): the old flush() skipped empty values instead of asserting; confirm they can no longer occur
+            assert bytes.length > 0;
+            writer.append(key.toString(), bytes);
         }
-        cfStore.addSSTable(writer.closeAndOpenReader());
-        columnFamilies_.clear();       
+        // NOTE(review): the old flush() also cleared columnFamilies_; the data now stays reachable as long as this memtable is
+        SSTableReader sstable = writer.closeAndOpenReader();
+        logger_.info("Completed flushing " + writer.getFilename());
+        return sstable;
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=817929&r1=817928&r2=817929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 23 01:57:45 2009
@@ -26,6 +26,7 @@
 import javax.management.ObjectName;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -53,13 +54,36 @@
 {
     private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
 
+    /*
+     * submitFlush first puts [Binary]Memtable.getSortedContents on the flushSorter executor,
+     * which then puts the sorted results on the writer executor.  This is because sorting is CPU-bound,
+     * and writing is disk-bound; we want to be able to do both at once.  When the write is complete,
+     * we turn the writer into an SSTableReader and add it to ssTables_ where it is available for reads.
+     *
+     * For BinaryMemtable that's about all that happens.  For live Memtables two other things happen
+     * (switchMemtable should be the only caller of submitFlush in this case).
+     * First, submitFlush puts the Memtable into memtablesPendingFlush, where it stays until the flush is
+     * complete and it's been added as an SSTableReader to ssTables_.  Second, switchMemtable adds an entry
+     * to commitLogUpdater that waits for the flush to complete, then calls onMemtableFlush.  This allows
+     * multiple flushes to happen simultaneously on multicore systems, while still calling onMemtableFlush
+     * in the correct order, which is necessary for replay in case of a restart since CommitLog assumes that
+     * when onMemtableFlush is called, all data up to the given context has been persisted to SSTables.
+     */
     private static NonBlockingHashMap<String, Set<Memtable>> memtablesPendingFlush = new NonBlockingHashMap<String, Set<Memtable>>();
-    private static DebuggableThreadPoolExecutor flusher_ = new DebuggableThreadPoolExecutor(1,
-                                                                                            Runtime.getRuntime().availableProcessors(),
-                                                                                            Integer.MAX_VALUE,
-                                                                                            TimeUnit.SECONDS,
-                                                                                            new LinkedBlockingQueue<Runnable>(2 * Runtime.getRuntime().availableProcessors()),
-                                                                                            new NamedThreadFactory("MEMTABLE-FLUSHER-POOL"));
+    private static ExecutorService flushSorter_
+            = new DebuggableThreadPoolExecutor(1,
+                                               Runtime.getRuntime().availableProcessors(),
+                                               Integer.MAX_VALUE,
+                                               TimeUnit.SECONDS,
+                                               new LinkedBlockingQueue<Runnable>(2 * Runtime.getRuntime().availableProcessors()),
+                                               new NamedThreadFactory("FLUSH-SORTER-POOL"));
+    private static ExecutorService flushWriter_
+            = new DebuggableThreadPoolExecutor(DatabaseDescriptor.getAllDataFileLocations().length,
+                                               DatabaseDescriptor.getAllDataFileLocations().length,
+                                               Integer.MAX_VALUE,
+                                               TimeUnit.SECONDS,
+                                               new LinkedBlockingQueue<Runnable>(),
+                                               new NamedThreadFactory("FLUSH-WRITER-POOL"));
     private static ExecutorService commitLogUpdater_ = new DebuggableThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
 
     private final String table_;
@@ -328,8 +352,6 @@
          *  If we can get the writelock, that means no new updates can come in and 
          *  all ongoing updates to memtables have completed. We can get the tail
          *  of the log and use it as the starting position for log replay on recovery.
-         *  
-         *  By holding the flusherLock_, we don't need the memetableLock any more.
          */
         Table.flusherLock_.writeLock().lock();
         try
@@ -342,8 +364,7 @@
             }
             logger_.info(columnFamily_ + " has reached its threshold; switching in a fresh Memtable");
             oldMemtable.freeze();
-            getMemtablesPendingFlushNotNull(columnFamily_).add(oldMemtable); // it's ok for the MT to briefly be both active and pendingFlush
-            final Future<?> future = submitFlush(oldMemtable);
+            final Condition condition = submitFlush(oldMemtable);
             memtable_ = new Memtable(table_, columnFamily_);
             // a second executor that makes sure the onMemtableFlushes get called in the right order,
-            // while keeping the wait-for-flush (future.get) out of anything latency-sensitive.
+            // while keeping the wait-for-flush (condition.await) out of anything latency-sensitive.
@@ -353,7 +374,7 @@
                 {
                     try
                     {
-                        future.get();
+                        condition.await();
                         onMemtableFlush(ctx);
                     }
                     catch (Exception e)
@@ -876,7 +897,7 @@
         return new ArrayList<Memtable>(getMemtablesPendingFlushNotNull(cfName));
     }
 
-    private static Set<Memtable> getMemtablesPendingFlushNotNull(String columnFamilyName)
+    static Set<Memtable> getMemtablesPendingFlushNotNull(String columnFamilyName)
     {
         Set<Memtable> memtables = memtablesPendingFlush.get(columnFamilyName);
         if (memtables == null)
@@ -887,44 +908,49 @@
         return memtables;
     }
 
-    /* Submit memtables to be flushed to disk */
-    private static Future<?> submitFlush(final Memtable memtable)
+    Condition submitFlush(final IFlushable flushable)
     {
-        logger_.info("Enqueuing flush of " + memtable);
-        return flusher_.submit(new Runnable()
+        logger_.info("Enqueuing flush of " + flushable);
+        if (flushable instanceof Memtable)
         {
-            public void run()
-            {
-                try
-                {
-                    memtable.flush();
-                }
-                catch (IOException e)
-                {
-                    throw new RuntimeException(e);
-                }
-                getMemtablesPendingFlushNotNull(memtable.getColumnFamily()).remove(memtable);
-            }
-        });
-    }
-
-    static void submitFlush(final BinaryMemtable binaryMemtable)
-    {
-        logger_.info("Enqueuing flush of " + binaryMemtable);
-        flusher_.submit(new Runnable()
+            // special-casing Memtable here is a bit messy, but it's best to keep the flush-related happenings in one place
+            // since they're a little complicated.  (We don't want to move the remove back to switchMemtable, which is
+            // the other sane option, since that could mean keeping a flushed memtable in the Historical set unnecessarily
+            // while earlier flushes finish.)
+            getMemtablesPendingFlushNotNull(columnFamily_).add((Memtable) flushable); // it's ok for the MT to briefly be both active and pendingFlush
+        }
+        final Condition condition = new SimpleCondition();
+        flushSorter_.submit(new Runnable()
         {
             public void run()
             {
-                try
+                final SortedFlushable sortedFlushable = flushable.getSortedContents();
+                flushWriter_.submit(new Runnable()
                 {
-                    binaryMemtable.flush();
-                }
-                catch (IOException e)
-                {
-                    throw new RuntimeException(e);
-                }
+                    public void run()
+                    {
+                        try
+                        {
+                            addSSTable(flushable.writeSortedContents(sortedFlushable));
+                        }
+                        catch (IOException e)
+                        {
+                            // NOTE(review): on any failure (here, or in getSortedContents on the sorter) the
+                            // condition is never signalled, so the commitLogUpdater task blocked in
+                            // condition.await() never runs onMemtableFlush and never finishes, and a Memtable
+                            // stays in memtablesPendingFlush.  The old future.get() surfaced the failure instead.
+                            throw new RuntimeException(e);
+                        }
+                        if (flushable instanceof Memtable)
+                        {
+                            getMemtablesPendingFlushNotNull(columnFamily_).remove(flushable);
+                        }
+                        condition.signalAll();
+                    }
+                });
             }
         });
+        return condition;
     }
 
     public boolean isSuper()
@@ -1288,4 +1314,21 @@
     {
         memtable_.clearUnsafe();
     }
+
+    /**
+     * The keys of an IFlushable in the order they are to be written: DecoratedKeys for BinaryMemtable,
+     * Strings for Memtable.  In submitFlush it is produced by getSortedContents on the flushSorter
+     * executor and consumed by writeSortedContents on the flushWriter executor.
+     */
+    public static class SortedFlushable
+    {
+        public final List<?> keys;
+        public final IFlushable flushable;
+
+        public SortedFlushable(List<?> keys, IFlushable flushable)
+        {
+            this.keys = keys;
+            this.flushable = flushable;
+        }
+    }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=817929&r1=817928&r2=817929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Wed Sep 23 01:57:45 2009
@@ -57,7 +57,8 @@
     /**
      * Triggers an immediate memtable flush.
+     * NOTE(review): the return type was widened from Future<?> to Object; document what callers get back.
      */
-    public Future<?> forceFlush() throws IOException;
+    public Object forceFlush() throws IOException;
 
     /**
      * @return the number of read operations on this column family in the last minute

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java?rev=817929&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java Wed Sep 23 01:57:45 2009
@@ -0,0 +1,18 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+
+import org.apache.cassandra.io.SSTableReader;
+
+/**
+ * Something whose contents can be flushed to an SSTable in two phases: a CPU-bound sort,
+ * followed by a disk-bound write of the sorted contents.
+ */
+public interface IFlushable
+{
+    /** Sorts the keys to be flushed and returns them wrapped in a SortedFlushable. */
+    ColumnFamilyStore.SortedFlushable getSortedContents();
+
+    /** Writes the contents in the order given by sortedFlushable and returns a reader for the new SSTable. */
+    SSTableReader writeSortedContents(ColumnFamilyStore.SortedFlushable sortedFlushable) throws IOException;
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=817929&r1=817928&r2=817929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Sep 23 01:57:45 2009
@@ -20,8 +20,6 @@

 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.commons.lang.ArrayUtils;
@@ -39,7 +37,7 @@

 import org.apache.log4j.Logger;

-public class Memtable implements Comparable<Memtable>
+public class Memtable implements Comparable<Memtable>, IFlushable
 {
 	private static Logger logger_ = Logger.getLogger( Memtable.class );

@@ -167,11 +165,16 @@
         }
     }

-    /** flush synchronously (in the current thread, not on the executor).
+    /** flush synchronously (in the current thread, not on the executors).
      *  only the recover code should call this. */
     void flushOnRecovery() throws IOException {
         if (!isClean())
-            flush();
+        {
+            // the old flush() added the new sstable to the ColumnFamilyStore itself; now only submitFlush
+            // does that, so this synchronous path must register it explicitly to keep it available for reads
+            ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+            cfStore.addSSTable(writeSortedContents(getSortedContents()));
+        }
     }

     // for debugging
@@ -187,15 +190,12 @@
         return builder.toString();
     }

-    void flush() throws IOException
+    /** Sorts this memtable's keys in the order they would be in when decorated (the CPU-bound half of a flush). */
+    public ColumnFamilyStore.SortedFlushable getSortedContents()
     {
-        logger_.info("Flushing " + this);
-        ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
-
-        SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), columnFamilies_.size(), StorageService.getPartitioner());
-
+        logger_.info("Sorting " + this);
         // sort keys in the order they would be in when decorated
-        final IPartitioner partitioner = StorageService.getPartitioner();
+        final IPartitioner<?> partitioner = StorageService.getPartitioner();
         final Comparator<String> dc = partitioner.getDecoratedKeyComparator();
         ArrayList<String> orderedKeys = new ArrayList<String>(columnFamilies_.keySet());
         Collections.sort(orderedKeys, new Comparator<String>()
@@ -205,8 +205,19 @@
                 return dc.compare(partitioner.decorateKey(o1), partitioner.decorateKey(o2));
             }
         });
+        return new ColumnFamilyStore.SortedFlushable(orderedKeys, this);
+    }
+
+    /** Writes this memtable's contents, in the order of sortedFlushable.keys, to a new SSTable and returns a reader for it. */
+    public SSTableReader writeSortedContents(ColumnFamilyStore.SortedFlushable sortedFlushable) throws IOException
+    {
+        logger_.info("Writing " + this);
+        IPartitioner<?> partitioner = StorageService.getPartitioner();
+        ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
+        SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), columnFamilies_.size(), StorageService.getPartitioner());
+
         DataOutputBuffer buffer = new DataOutputBuffer();
-        for (String key : orderedKeys)
+        for (String key : (List<String>) sortedFlushable.keys) // unchecked: getSortedContents stores a List<String>
         {
             buffer.reset();
             ColumnFamily columnFamily = columnFamilies_.get(key);
@@ -218,11 +229,11 @@
                 writer.append(partitioner.decorateKey(key), buffer);
             }
         }
-        SSTableReader ssTable = writer.closeAndOpenReader();
-        cfStore.addSSTable(ssTable);
         buffer.close();
+        SSTableReader ssTable = writer.closeAndOpenReader();
         isFlushed_ = true;
-        logger_.info("Flushed " + ssTable.getFilename());
+        logger_.info("Completed flushing " + ssTable.getFilename());
+        return ssTable;
     }

     public String toString()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=817929&r1=817928&r2=817929&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 23 01:57:45 2009
@@ -163,6 +163,8 @@
     /**
      * This is the callback handler that is invoked when we have
      * completely been bootstrapped for a single file by a remote host.
+     *
+     * TODO: if this handler moves into ColumnFamilyStore, addSSTable could become private, improving encapsulation.
     */
     public static class BootstrapCompletionHandler implements IStreamComplete
     {                

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java?rev=817929&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/SimpleCondition.java Wed Sep 23 01:57:45 2009
@@ -0,0 +1,85 @@
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
+import java.util.Date;
+
+/**
+ * A one-shot, latching Condition.  Once signal() or signalAll() has been called the condition
+ * stays set forever, so there are no spurious-wakeup problems and no lost-notify problems either:
+ * calling await() _after_ signal() returns immediately.  There is no way to reset it.
+ */
+public class SimpleCondition implements Condition
+{
+    // only written while holding this object's monitor; volatile so unsynchronized reads see it too
+    volatile boolean set;
+
+    public synchronized void await() throws InterruptedException
+    {
+        while (!set)
+            wait();
+    }
+
+    /**
+     * Same as signalAll().  Because the condition latches, every thread already waiting must be
+     * released; notify() would wake only one and leave the rest blocked even though set is true.
+     */
+    public void signal()
+    {
+        signalAll();
+    }
+
+    public synchronized void signalAll()
+    {
+        set = true;
+        notifyAll();
+    }
+
+    /** Waits until signalled, ignoring interrupts; a pending interrupt is re-asserted before returning. */
+    public synchronized void awaitUninterruptibly()
+    {
+        boolean interrupted = false;
+        while (!set)
+        {
+            try
+            {
+                wait();
+            }
+            catch (InterruptedException e)
+            {
+                interrupted = true;
+            }
+        }
+        if (interrupted)
+            Thread.currentThread().interrupt();
+    }
+
+    /**
+     * Waits until signalled or until nanosTimeout has elapsed.
+     * @return nanosTimeout minus the (estimated) time spent waiting; a value <= 0 means no time remains
+     */
+    public synchronized long awaitNanos(long nanosTimeout) throws InterruptedException
+    {
+        long deadline = System.nanoTime() + nanosTimeout;
+        long remaining = nanosTimeout;
+        while (!set && remaining > 0)
+        {
+            TimeUnit.NANOSECONDS.timedWait(this, remaining);
+            remaining = deadline - System.nanoTime();
+        }
+        return remaining;
+    }
+
+    /** @return true if the condition was signalled before the timeout elapsed (or was already set) */
+    public synchronized boolean await(long time, TimeUnit unit) throws InterruptedException
+    {
+        awaitNanos(unit.toNanos(time));
+        return set;
+    }
+
+    /** @return true if the condition was signalled before the deadline passed (or was already set) */
+    public synchronized boolean awaitUntil(Date deadline) throws InterruptedException
+    {
+        return await(deadline.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    }
+}