You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/12/19 19:41:12 UTC

cassandra git commit: Move MeteredFlusher to its own thread

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 2acbab651 -> 5f54285e9


Move MeteredFlusher to its own thread

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-8485


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5f54285e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5f54285e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5f54285e

Branch: refs/heads/cassandra-2.0
Commit: 5f54285e9e39833b6bc01317fd74b8bd9b408842
Parents: 2acbab6
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Dec 19 21:40:25 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Dec 19 21:40:25 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/MeteredFlusher.java | 31 +++++++++++++++-----
 .../cassandra/service/CassandraDaemon.java      |  2 +-
 3 files changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f54285e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bd128f5..57ab5b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Move MeteredFlusher to its own thread (CASSANDRA-8485)
  * Fix non-distinct results in DISTNCT queries on static columns when
    paging is enabled (CASSANDRA-8087)
  * Move all hints related tasks to hints internal executor (CASSANDRA-8285)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f54285e/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index 4f06bc6..30dbb23 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -21,16 +21,33 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
 public class MeteredFlusher implements Runnable
 {
     private static final Logger logger = LoggerFactory.getLogger(MeteredFlusher.class);
 
+    public static final MeteredFlusher instance = new MeteredFlusher();
+
+    private final ScheduledExecutorService executor;
+
+    private MeteredFlusher()
+    {
+        executor = new DebuggableScheduledThreadPoolExecutor("MeteredFlusher");
+    }
+
+    public void start()
+    {
+        executor.scheduleWithFixedDelay(this, 1, 1, TimeUnit.SECONDS);
+    }
+
     public void run()
     {
         long allowedSize = calculateAllowedSize();
@@ -55,7 +72,7 @@ public class MeteredFlusher implements Runnable
             long size = cfs.getTotalMemtableLiveSize();
             if (allowedSize > flushingSize && size > (allowedSize - flushingSize) / maxInFlight)
             {
-                logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
+                logger.info("Flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
                 cfs.forceFlush();
             }
             else
@@ -66,7 +83,7 @@ public class MeteredFlusher implements Runnable
 
         if (liveSize + flushingSize <= allowedSize)
             return;
-        logger.info("estimated {} live and {} flushing bytes used by all memtables", liveSize, flushingSize);
+        logger.info("Estimated {} live and {} flushing bytes used by all memtables", liveSize, flushingSize);
 
         Collections.sort(affectedCFs, new Comparator<ColumnFamilyStore>()
         {
@@ -89,16 +106,16 @@ public class MeteredFlusher implements Runnable
             long size = cfs.getTotalMemtableLiveSize();
             if (size > 0)
             {
-                logger.info("flushing {} to free up {} bytes", cfs, size);
+                logger.info("Flushing {} to free up {} bytes", cfs, size);
                 liveSize -= size;
                 cfs.forceFlush();
             }
         }
 
-        logger.trace("memtable memory usage is {} bytes with {} live", liveSize + flushingSize, liveSize);
+        logger.trace("Memtable memory usage is {} bytes with {} live", liveSize + flushingSize, liveSize);
     }
 
-    private static List<ColumnFamilyStore> affectedColumnFamilies()
+    private List<ColumnFamilyStore> affectedColumnFamilies()
     {
         List<ColumnFamilyStore> affected = new ArrayList<>();
         // filter out column families that aren't affected by MeteredFlusher
@@ -108,7 +125,7 @@ public class MeteredFlusher implements Runnable
         return affected;
     }
 
-    private static long calculateAllowedSize()
+    private long calculateAllowedSize()
     {
         long allowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
         // deduct the combined memory limit of the tables unaffected by the metered flusher (we don't flush them, we
@@ -119,7 +136,7 @@ public class MeteredFlusher implements Runnable
         return allowed;
     }
 
-    private static long calculateFlushingSize()
+    private long calculateFlushingSize()
     {
         ColumnFamilyStore measuredCFS = Memtable.activelyMeasuring;
         long flushing = measuredCFS != null && measuredCFS.getCompactionStrategy().isAffectedByMeteredFlusher()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f54285e/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 89d2bb0..cad1658 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -328,7 +328,7 @@ public class CassandraDaemon
 
         // MeteredFlusher can block if flush queue fills up, so don't put on scheduledTasks
         // Start it before commit log, so memtables can flush during commit log replay
-        StorageService.optionalTasks.scheduleWithFixedDelay(new MeteredFlusher(), 1000, 1000, TimeUnit.MILLISECONDS);
+        MeteredFlusher.instance.start();
 
         // replay the log if necessary
         try