You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/04/11 20:26:43 UTC

[18/21] git commit: make memory metering use an unbounded queue to avoid blocking the write path patch by pschuller and jbellis; reviewed by slebresne for CASSANDRA-4032

make memory metering use an unbounded queue to avoid blocking the write path
patch by pschuller and jbellis; reviewed by slebresne for CASSANDRA-4032


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fd3bfac6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fd3bfac6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fd3bfac6

Branch: refs/heads/cassandra-1.1
Commit: fd3bfac6cbc487e36ac1c39740c5897e350d0d16
Parents: d49113f
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Apr 10 10:59:35 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Wed Apr 11 13:24:09 2012 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/Memtable.java |   86 ++++++++++--------
 1 files changed, 48 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd3bfac6/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 81dac7c..d9e9570 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SlabAllocator;
 import org.apache.cassandra.utils.WrappedRunnable;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.github.jamm.MemoryMeter;
 
 public class Memtable
@@ -53,14 +54,17 @@ public class Memtable
     // max liveratio seen w/ 1-byte columns on a 64-bit jvm was 19. If it gets higher than 64 something is probably broken.
     private static final double MAX_SANE_LIVE_RATIO = 64.0;
 
-    // we're careful to only allow one count to run at a time because counting is slow
-    // (can be minutes, for a large memtable and a busy server), so we could keep memtables
-    // alive after they're flushed and would otherwise be GC'd.
+    // We want to limit the number of concurrently running and/or queued meterings, because counting is slow (it can
+    // take minutes for a large memtable on a busy server), so a pending metering could keep a memtable alive after
+    // it has been flushed, when it would otherwise be GC'd. The approach we take is to bound the number of
+    // outstanding/running meterings to a maximum of one per CFS using this set; the executor's queue is unbounded but
+    // is implicitly bounded by the number of CFSs.
+    private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>();
     private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1,
                                                                                           1,
                                                                                           Integer.MAX_VALUE,
                                                                                           TimeUnit.MILLISECONDS,
-                                                                                          new SynchronousQueue<Runnable>(),
+                                                                                          new LinkedBlockingQueue<Runnable>(),
                                                                                           new NamedThreadFactory("MemoryMeter"))
     {
         @Override
@@ -152,7 +156,7 @@ public class Memtable
         resolve(key, columnFamily);
     }
 
-    public void updateLiveRatio()
+    public void updateLiveRatio() throws RuntimeException
     {
         if (!MemoryMeter.isInitialized())
         {
@@ -162,50 +166,56 @@ public class Memtable
             return;
         }
 
+        if (!meteringInProgress.add(cfs))
+        {
+            logger.debug("Metering already pending or active for {}; skipping liveRatio update", cfs);
+            return;
+        }
+
         Runnable runnable = new Runnable()
         {
             public void run()
             {
-                activelyMeasuring = Memtable.this;
-
-                long start = System.currentTimeMillis();
-                // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
-                // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
-                long deepSize = meter.measure(columnFamilies);
-                int objects = 0;
-                for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
-                {
-                    deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
-                    objects += entry.getValue().getColumnCount();
-                }
-                double newRatio = (double) deepSize / currentThroughput.get();
-
-                if (newRatio < MIN_SANE_LIVE_RATIO)
+                try
                 {
-                    logger.warn("setting live ratio to minimum of 1.0 instead of {}", newRatio);
-                    newRatio = MIN_SANE_LIVE_RATIO;
+                    activelyMeasuring = Memtable.this;
+
+                    long start = System.currentTimeMillis();
+                    // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+                    // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
+                    long deepSize = meter.measure(columnFamilies);
+                    int objects = 0;
+                    for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
+                    {
+                        deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
+                        objects += entry.getValue().getColumnCount();
+                    }
+                    double newRatio = (double) deepSize / currentThroughput.get();
+
+                    if (newRatio < MIN_SANE_LIVE_RATIO)
+                    {
+                        logger.warn("setting live ratio to minimum of 1.0 instead of {}", newRatio);
+                        newRatio = MIN_SANE_LIVE_RATIO;
+                    }
+                    if (newRatio > MAX_SANE_LIVE_RATIO)
+                    {
+                        logger.warn("setting live ratio to maximum of 64 instead of {}", newRatio);
+                        newRatio = MAX_SANE_LIVE_RATIO;
+                    }
+                    cfs.liveRatio = Math.max(cfs.liveRatio, newRatio);
+
+                    logger.info("{} liveRatio is {} (just-counted was {}).  calculation took {}ms for {} columns",
+                                new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects });
+                    activelyMeasuring = null;
                 }
-                if (newRatio > MAX_SANE_LIVE_RATIO)
+                finally
                 {
-                    logger.warn("setting live ratio to maximum of 64 instead of {}", newRatio);
-                    newRatio = MAX_SANE_LIVE_RATIO;
+                    meteringInProgress.remove(cfs);
                 }
-                cfs.liveRatio = Math.max(cfs.liveRatio, newRatio);
-
-                logger.info("{} liveRatio is {} (just-counted was {}).  calculation took {}ms for {} columns",
-                            new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects });
-                activelyMeasuring = null;
             }
         };
 
-        try
-        {
-            meterExecutor.submit(runnable);
-        }
-        catch (RejectedExecutionException e)
-        {
-            logger.debug("Meter thread is busy; skipping liveRatio update for {}", cfs);
-        }
+        meterExecutor.submit(runnable);
     }
 
     private void resolve(DecoratedKey key, ColumnFamily cf)