You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/04/11 20:26:43 UTC
[18/21] git commit: make memory metering use an unbounded queue to
avoid blocking the write path patch by pschuller and jbellis;
reviewed by slebresne for CASSANDRA-4032
make memory metering use an unbounded queue to avoid blocking the write path
patch by pschuller and jbellis; reviewed by slebresne for CASSANDRA-4032
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fd3bfac6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fd3bfac6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fd3bfac6
Branch: refs/heads/cassandra-1.1
Commit: fd3bfac6cbc487e36ac1c39740c5897e350d0d16
Parents: d49113f
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 10 10:59:35 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Apr 11 13:24:09 2012 -0500
----------------------------------------------------------------------
src/java/org/apache/cassandra/db/Memtable.java | 86 ++++++++++--------
1 files changed, 48 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd3bfac6/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 81dac7c..d9e9570 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SlabAllocator;
import org.apache.cassandra.utils.WrappedRunnable;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.github.jamm.MemoryMeter;
public class Memtable
@@ -53,14 +54,17 @@ public class Memtable
// max liveratio seen w/ 1-byte columns on a 64-bit jvm was 19. If it gets higher than 64 something is probably broken.
private static final double MAX_SANE_LIVE_RATIO = 64.0;
- // we're careful to only allow one count to run at a time because counting is slow
- // (can be minutes, for a large memtable and a busy server), so we could keep memtables
- // alive after they're flushed and would otherwise be GC'd.
+ // we want to limit the amount of concurrently running and/or queued meterings, because counting is slow (can be
+ // minutes, for a large memtable and a busy server). so we could keep memtables
+ // alive after they're flushed and would otherwise be GC'd. the approach we take is to bound the number of
+ // outstanding/running meterings to a maximum of one per CFS using this set; the executor's queue is unbounded but
+ // will implicitly be bounded by the number of CFS:s.
+ private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>();
private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1,
1,
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
- new SynchronousQueue<Runnable>(),
+ new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("MemoryMeter"))
{
@Override
@@ -152,7 +156,7 @@ public class Memtable
resolve(key, columnFamily);
}
- public void updateLiveRatio()
+ public void updateLiveRatio() throws RuntimeException
{
if (!MemoryMeter.isInitialized())
{
@@ -162,50 +166,56 @@ public class Memtable
return;
}
+ if (!meteringInProgress.add(cfs))
+ {
+ logger.debug("Metering already pending or active for {}; skipping liveRatio update", cfs);
+ return;
+ }
+
Runnable runnable = new Runnable()
{
public void run()
{
- activelyMeasuring = Memtable.this;
-
- long start = System.currentTimeMillis();
- // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
- // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
- long deepSize = meter.measure(columnFamilies);
- int objects = 0;
- for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
- {
- deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
- objects += entry.getValue().getColumnCount();
- }
- double newRatio = (double) deepSize / currentThroughput.get();
-
- if (newRatio < MIN_SANE_LIVE_RATIO)
+ try
{
- logger.warn("setting live ratio to minimum of 1.0 instead of {}", newRatio);
- newRatio = MIN_SANE_LIVE_RATIO;
+ activelyMeasuring = Memtable.this;
+
+ long start = System.currentTimeMillis();
+ // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+ // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
+ long deepSize = meter.measure(columnFamilies);
+ int objects = 0;
+ for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
+ {
+ deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
+ objects += entry.getValue().getColumnCount();
+ }
+ double newRatio = (double) deepSize / currentThroughput.get();
+
+ if (newRatio < MIN_SANE_LIVE_RATIO)
+ {
+ logger.warn("setting live ratio to minimum of 1.0 instead of {}", newRatio);
+ newRatio = MIN_SANE_LIVE_RATIO;
+ }
+ if (newRatio > MAX_SANE_LIVE_RATIO)
+ {
+ logger.warn("setting live ratio to maximum of 64 instead of {}", newRatio);
+ newRatio = MAX_SANE_LIVE_RATIO;
+ }
+ cfs.liveRatio = Math.max(cfs.liveRatio, newRatio);
+
+ logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
+ new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects });
+ activelyMeasuring = null;
}
- if (newRatio > MAX_SANE_LIVE_RATIO)
+ finally
{
- logger.warn("setting live ratio to maximum of 64 instead of {}", newRatio);
- newRatio = MAX_SANE_LIVE_RATIO;
+ meteringInProgress.remove(cfs);
}
- cfs.liveRatio = Math.max(cfs.liveRatio, newRatio);
-
- logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
- new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects });
- activelyMeasuring = null;
}
};
- try
- {
- meterExecutor.submit(runnable);
- }
- catch (RejectedExecutionException e)
- {
- logger.debug("Meter thread is busy; skipping liveRatio update for {}", cfs);
- }
+ meterExecutor.submit(runnable);
}
private void resolve(DecoratedKey key, ColumnFamily cf)