You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/12/19 19:41:12 UTC
cassandra git commit: Move MeteredFlusher to its own thread
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 2acbab651 -> 5f54285e9
Move MeteredFlusher to its own thread
patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-8485
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5f54285e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5f54285e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5f54285e
Branch: refs/heads/cassandra-2.0
Commit: 5f54285e9e39833b6bc01317fd74b8bd9b408842
Parents: 2acbab6
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Dec 19 21:40:25 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Dec 19 21:40:25 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/MeteredFlusher.java | 31 +++++++++++++++-----
.../cassandra/service/CassandraDaemon.java | 2 +-
3 files changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f54285e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bd128f5..57ab5b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.12:
+ * Move MeteredFlusher to its own thread (CASSANDRA-8485)
* Fix non-distinct results in DISTNCT queries on static columns when
paging is enabled (CASSANDRA-8087)
* Move all hints related tasks to hints internal executor (CASSANDRA-8285)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f54285e/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index 4f06bc6..30dbb23 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -21,16 +21,33 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
public class MeteredFlusher implements Runnable
{
private static final Logger logger = LoggerFactory.getLogger(MeteredFlusher.class);
+ public static final MeteredFlusher instance = new MeteredFlusher();
+
+ private final ScheduledExecutorService executor;
+
+ private MeteredFlusher()
+ {
+ executor = new DebuggableScheduledThreadPoolExecutor("MeteredFlusher");
+ }
+
+ public void start()
+ {
+ executor.scheduleWithFixedDelay(this, 1, 1, TimeUnit.SECONDS);
+ }
+
public void run()
{
long allowedSize = calculateAllowedSize();
@@ -55,7 +72,7 @@ public class MeteredFlusher implements Runnable
long size = cfs.getTotalMemtableLiveSize();
if (allowedSize > flushingSize && size > (allowedSize - flushingSize) / maxInFlight)
{
- logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
+ logger.info("Flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
cfs.forceFlush();
}
else
@@ -66,7 +83,7 @@ public class MeteredFlusher implements Runnable
if (liveSize + flushingSize <= allowedSize)
return;
- logger.info("estimated {} live and {} flushing bytes used by all memtables", liveSize, flushingSize);
+ logger.info("Estimated {} live and {} flushing bytes used by all memtables", liveSize, flushingSize);
Collections.sort(affectedCFs, new Comparator<ColumnFamilyStore>()
{
@@ -89,16 +106,16 @@ public class MeteredFlusher implements Runnable
long size = cfs.getTotalMemtableLiveSize();
if (size > 0)
{
- logger.info("flushing {} to free up {} bytes", cfs, size);
+ logger.info("Flushing {} to free up {} bytes", cfs, size);
liveSize -= size;
cfs.forceFlush();
}
}
- logger.trace("memtable memory usage is {} bytes with {} live", liveSize + flushingSize, liveSize);
+ logger.trace("Memtable memory usage is {} bytes with {} live", liveSize + flushingSize, liveSize);
}
- private static List<ColumnFamilyStore> affectedColumnFamilies()
+ private List<ColumnFamilyStore> affectedColumnFamilies()
{
List<ColumnFamilyStore> affected = new ArrayList<>();
// filter out column families that aren't affected by MeteredFlusher
@@ -108,7 +125,7 @@ public class MeteredFlusher implements Runnable
return affected;
}
- private static long calculateAllowedSize()
+ private long calculateAllowedSize()
{
long allowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
// deduct the combined memory limit of the tables unaffected by the metered flusher (we don't flush them, we
@@ -119,7 +136,7 @@ public class MeteredFlusher implements Runnable
return allowed;
}
- private static long calculateFlushingSize()
+ private long calculateFlushingSize()
{
ColumnFamilyStore measuredCFS = Memtable.activelyMeasuring;
long flushing = measuredCFS != null && measuredCFS.getCompactionStrategy().isAffectedByMeteredFlusher()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5f54285e/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 89d2bb0..cad1658 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -328,7 +328,7 @@ public class CassandraDaemon
// MeteredFlusher can block if flush queue fills up, so don't put on scheduledTasks
// Start it before commit log, so memtables can flush during commit log replay
- StorageService.optionalTasks.scheduleWithFixedDelay(new MeteredFlusher(), 1000, 1000, TimeUnit.MILLISECONDS);
+ MeteredFlusher.instance.start();
// replay the log if necessary
try