You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/01/14 23:30:00 UTC

[2/3] git commit: move flushWriter into Memtable now that there's only one such service (used to have different executors for memtables/binarymemtables)

Move the flushWriter executor into Memtable now that there is only one such service (there used to be separate executors for memtables and binary memtables).


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc0f2472
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc0f2472
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc0f2472

Branch: refs/heads/trunk
Commit: fc0f2472d874db1cdbf8f49da61fa70b08da17f8
Parents: 1fac06a
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Jan 14 16:29:09 2013 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Jan 14 16:29:09 2013 -0600

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   22 +-----------
 src/java/org/apache/cassandra/db/Memtable.java     |   27 +++++++++++++-
 2 files changed, 26 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc0f2472/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index b01545a..b832b98 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -80,26 +80,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    /*
-     * switchMemtable puts Memtable.getSortedContents on the writer executor.  When the write is complete,
-     * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads.
-     *
-     * There are two other things that switchMemtable does.
-     * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
-     * and it's been added as an SSTableReader to ssTables_.  Second, it adds an entry to commitLogUpdater
-     * that waits for the flush to complete, then calls onMemtableFlush.  This allows multiple flushes
-     * to happen simultaneously on multicore systems, while still calling onMF in the correct order,
-     * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is
-     * called, all data up to the given context has been persisted to SSTables.
-     */
-    private static final ExecutorService flushWriter
-            = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
-                                               StageManager.KEEPALIVE,
-                                               TimeUnit.SECONDS,
-                                               new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()),
-                                               new NamedThreadFactory("FlushWriter"),
-                                               "internal");
-
     public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MemtablePostFlusher");
 
     static
@@ -658,7 +638,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 else
                 {
                     logger.info("Enqueuing flush of {}", memtable);
-                    memtable.flushAndSignal(latch, flushWriter, ctx);
+                    memtable.flushAndSignal(latch, ctx);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc0f2472/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 56e2bf5..990ad84 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -26,6 +26,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
@@ -51,6 +54,26 @@ public class Memtable
 {
     private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
 
+    /*
+     * switchMemtable puts Memtable.getSortedContents on the writer executor.  When the write is complete,
+     * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads.
+     *
+     * There are two other things that switchMemtable does.
+     * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
+     * and it's been added as an SSTableReader to ssTables_.  Second, it adds an entry to commitLogUpdater
+     * that waits for the flush to complete, then calls onMemtableFlush.  This allows multiple flushes
+     * to happen simultaneously on multicore systems, while still calling onMF in the correct order,
+     * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is
+     * called, all data up to the given context has been persisted to SSTables.
+     */
+    private static final ExecutorService flushWriter
+            = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+                                               StageManager.KEEPALIVE,
+                                               TimeUnit.SECONDS,
+                                               new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()),
+                                               new NamedThreadFactory("FlushWriter"),
+                                               "internal");
+
     // size in memory can never be less than serialized size
     private static final double MIN_SANE_LIVE_RATIO = 1.0;
     // max liveratio seen w/ 1-byte columns on a 64-bit jvm was 19. If it gets higher than 64 something is probably broken.
@@ -252,9 +275,9 @@ public class Memtable
         return builder.toString();
     }
 
-    public void flushAndSignal(final CountDownLatch latch, ExecutorService writer, final Future<ReplayPosition> context)
+    public void flushAndSignal(final CountDownLatch latch, final Future<ReplayPosition> context)
     {
-        writer.execute(new FlushRunnable(latch, context));
+        flushWriter.execute(new FlushRunnable(latch, context));
     }
 
     public String toString()