You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/09/17 19:14:36 UTC

[1/6] git commit: Tuning knobs for dealing with large blobs and many CFs; patch by jbellis; reviewed by yukim and Jeremiah Jordan

Updated Branches:
  refs/heads/cassandra-1.2 0804b7610 -> dc95c8c0b
  refs/heads/cassandra-2.0 56b5c6f30 -> 83b0b2fb5
  refs/heads/trunk 62cedd3fe -> 889ca1f50


Tuning knobs for dealing with large blobs and many CFs
patch by jbellis; reviewed by yukim and Jeremiah Jordan


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc95c8c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc95c8c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc95c8c0

Branch: refs/heads/cassandra-1.2
Commit: dc95c8c0beb9309cfdfbabbd32ea1b74084daafd
Parents: 0804b76
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 12:06:18 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 12:07:54 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   6 +-
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   4 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 133 ++++++++++---------
 .../org/apache/cassandra/db/MeteredFlusher.java |  22 ++-
 .../PeriodicCommitLogExecutorService.java       |   2 +-
 .../apache/cassandra/utils/StatusLogger.java    |   9 +-
 9 files changed, 106 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ad32460..fb4f3f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.10
+ * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
  * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
  * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
  * Allow local batchlog writes for CL.ANY (CASSANDRA-5967)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index d52f2e8..27ac09b 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -206,9 +206,13 @@ saved_caches_directory: /var/lib/cassandra/saved_caches
 #
 # the other option is "periodic" where writes may be acked immediately
 # and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds.
+# milliseconds.  By default this allows 1024*(CPU cores) pending
+# entries on the commitlog queue.  If you are writing very large blobs,
+# you should reduce that; 16*cores works reasonably well for 1MB blobs.
+# It should be at least as large as the concurrent_writes setting.
 commitlog_sync: periodic
 commitlog_sync_period_in_ms: 10000
+# commitlog_periodic_queue_size:
 
 # The size of the individual commitlog file segments.  A commitlog
 # segment may be archived, deleted, or recycled once all the data

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 74b941d..a924a4c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -126,6 +126,7 @@ public class Config
     public Double commitlog_sync_batch_window_in_ms;
     public Integer commitlog_sync_period_in_ms;
     public int commitlog_segment_size_in_mb = 32;
+    public int commitlog_periodic_queue_size = 1024 * FBUtilities.getAvailableProcessors();
 
     public String endpoint_snitch;
     public Boolean dynamic_snitch = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1412888..8e3cbe2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1047,10 +1047,16 @@ public class DatabaseDescriptor
         return conf.commitlog_sync_batch_window_in_ms;
     }
 
-    public static int getCommitLogSyncPeriod() {
+    public static int getCommitLogSyncPeriod()
+    {
         return conf.commitlog_sync_period_in_ms;
     }
 
+    public static int getCommitLogPeriodicQueueSize()
+    {
+        return conf.commitlog_periodic_queue_size;
+    }
+
     public static Config.CommitLogSync getCommitLogSync()
     {
         return conf.commitlog_sync;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 745b5ba..c646461 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -104,7 +104,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public final Directories directories;
 
     /** ratio of in-memory memtable size, to serialized size */
-    volatile double liveRatio = 1.0;
+    volatile double liveRatio = 10.0; // reasonable default until we compute what it is based on actual data
     /** ops count last time we computed liveRatio */
     private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
 
@@ -1023,7 +1023,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return (int) metric.memtableSwitchCount.count();
     }
 
-    private Memtable getMemtableThreadSafe()
+    Memtable getMemtableThreadSafe()
     {
         return data.getMemtable();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index df06cfb..cbe20fe 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -29,7 +29,6 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
@@ -84,24 +83,15 @@ public class Memtable
     // outstanding/running meterings to a maximum of one per CFS using this set; the executor's queue is unbounded but
     // will implicitly be bounded by the number of CFS:s.
     private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>();
-    private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1,
-                                                                                          1,
+    private static final ExecutorService meterExecutor = new JMXEnabledThreadPoolExecutor(1,
                                                                                           Integer.MAX_VALUE,
                                                                                           TimeUnit.MILLISECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
-                                                                                          new NamedThreadFactory("MemoryMeter"))
-    {
-        @Override
-        protected void afterExecute(Runnable r, Throwable t)
-        {
-            super.afterExecute(r, t);
-            DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
-        }
-    };
-
+                                                                                          new NamedThreadFactory("MemoryMeter"),
+                                                                                          "internal");
     private final MemoryMeter meter;
 
-    volatile static Memtable activelyMeasuring;
+    volatile static ColumnFamilyStore activelyMeasuring;
 
     private final AtomicLong currentSize = new AtomicLong(0);
     private final AtomicLong currentOperations = new AtomicLong(0);
@@ -185,8 +175,9 @@ public class Memtable
         if (!MemoryMeter.isInitialized())
         {
             // hack for openjdk.  we log a warning about this in the startup script too.
-            logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of 10.0.  Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
-            cfs.liveRatio = 10.0;
+            logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of {}.  "
+                        + " Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; "
+                        + " upgrade to the Sun JRE instead", cfs.liveRatio);
             return;
         }
 
@@ -196,56 +187,7 @@ public class Memtable
             return;
         }
 
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    activelyMeasuring = Memtable.this;
-
-                    long start = System.currentTimeMillis();
-                    // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
-                    // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
-                    long deepSize = meter.measure(columnFamilies);
-                    int objects = 0;
-                    for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
-                    {
-                        deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
-                        objects += entry.getValue().getColumnCount();
-                    }
-                    double newRatio = (double) deepSize / currentSize.get();
-
-                    if (newRatio < MIN_SANE_LIVE_RATIO)
-                    {
-                        logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
-                        newRatio = MIN_SANE_LIVE_RATIO;
-                    }
-                    if (newRatio > MAX_SANE_LIVE_RATIO)
-                    {
-                        logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
-                        newRatio = MAX_SANE_LIVE_RATIO;
-                    }
-
-                    // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
-                    // death.  thus, higher estimates are believed immediately; lower ones are averaged w/ the old
-                    if (newRatio > cfs.liveRatio)
-                        cfs.liveRatio = newRatio;
-                    else
-                        cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
-
-                    logger.info("{} liveRatio is {} (just-counted was {}).  calculation took {}ms for {} columns",
-                                cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
-                    activelyMeasuring = null;
-                }
-                finally
-                {
-                    meteringInProgress.remove(cfs);
-                }
-            }
-        };
-
-        meterExecutor.submit(runnable);
+        meterExecutor.submit(new MeteringRunnable(cfs));
     }
 
     private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
@@ -520,4 +462,63 @@ public class Memtable
                                      sstableMetadataCollector);
         }
     }
+
+    private static class MeteringRunnable implements Runnable
+    {
+        // we might need to wait in the meter queue for a while.  measure whichever memtable is active at that point,
+        // rather than keeping the original memtable referenced (and thus un-freeable) until this runs.
+        private final ColumnFamilyStore cfs;
+
+        public MeteringRunnable(ColumnFamilyStore cfs)
+        {
+            this.cfs = cfs;
+        }
+
+        public void run()
+        {
+            try
+            {
+                activelyMeasuring = cfs;
+                Memtable memtable = cfs.getMemtableThreadSafe();
+
+                long start = System.currentTimeMillis();
+                // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+                // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
+                long deepSize = memtable.meter.measure(memtable.columnFamilies);
+                int objects = 0;
+                for (Map.Entry<RowPosition, ColumnFamily> entry : memtable.columnFamilies.entrySet())
+                {
+                    deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
+                    objects += entry.getValue().getColumnCount();
+                }
+                double newRatio = (double) deepSize / memtable.currentSize.get();
+
+                if (newRatio < MIN_SANE_LIVE_RATIO)
+                {
+                    logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
+                    newRatio = MIN_SANE_LIVE_RATIO;
+                }
+                if (newRatio > MAX_SANE_LIVE_RATIO)
+                {
+                    logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
+                    newRatio = MAX_SANE_LIVE_RATIO;
+                }
+
+                // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
+                // death.  thus, higher estimates are believed immediately; lower ones are averaged w/ the old
+                if (newRatio > cfs.liveRatio)
+                    cfs.liveRatio = newRatio;
+                else
+                    cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
+
+                logger.info("{} liveRatio is {} (just-counted was {}).  calculation took {}ms for {} columns",
+                            cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
+            }
+            finally
+            {
+                activelyMeasuring = null;
+                meteringInProgress.remove(cfs);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index 408727c..f16b8a0 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -35,16 +35,22 @@ public class MeteredFlusher implements Runnable
 
     public void run()
     {
+        long totalMemtableBytesAllowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
+
         // first, find how much memory non-active memtables are using
-        Memtable activelyMeasuring = Memtable.activelyMeasuring;
-        long flushingBytes = activelyMeasuring == null ? 0 : activelyMeasuring.getLiveSize();
+        long flushingBytes = Memtable.activelyMeasuring == null
+                           ? 0
+                           : Memtable.activelyMeasuring.getMemtableThreadSafe().getLiveSize();
         flushingBytes += countFlushingBytes();
+        if (flushingBytes > 0)
+            logger.debug("Currently flushing {} bytes of {} max", flushingBytes, totalMemtableBytesAllowed);
 
         // next, flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
         // of the total size allotted.  Then, flush other CFs in order of size if necessary.
         long liveBytes = 0;
         try
         {
+            long totalMemtableBytesUnused = totalMemtableBytesAllowed - flushingBytes;
             for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
             {
                 long size = cfs.getTotalMemtableLiveSize();
@@ -53,7 +59,7 @@ public class MeteredFlusher implements Runnable
                                                             + DatabaseDescriptor.getFlushWriters()
                                                             + DatabaseDescriptor.getFlushQueueSize())
                                                   / (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
-                if (size > (DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L - flushingBytes) / maxInFlight)
+                if (totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight)
                 {
                     logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
                     cfs.forceFlush();
@@ -64,10 +70,10 @@ public class MeteredFlusher implements Runnable
                 }
             }
 
-            if (flushingBytes + liveBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L)
+            if (flushingBytes + liveBytes <= totalMemtableBytesAllowed)
                 return;
 
-            logger.info("estimated {} bytes used by all memtables pre-flush", liveBytes);
+            logger.info("estimated {} live and {} flushing bytes used by all memtables", liveBytes, flushingBytes);
 
             // sort memtables by size
             List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>();
@@ -89,14 +95,16 @@ public class MeteredFlusher implements Runnable
             // flush largest first until we get below our threshold.
             // although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish
             // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
-            while (true)
+            while (!sorted.isEmpty())
             {
                 flushingBytes = countFlushingBytes();
-                if (liveBytes + flushingBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L || sorted.isEmpty())
+                if (liveBytes + flushingBytes <= totalMemtableBytesAllowed)
                     break;
 
                 ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1);
                 long size = cfs.getTotalMemtableLiveSize();
+                if (size == 0)
+                    break;
                 logger.info("flushing {} to free up {} bytes", cfs, size);
                 liveBytes -= size;
                 cfs.forceFlush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index 94f593e..48dbfac 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -33,7 +33,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
 
     public PeriodicCommitLogExecutorService(final CommitLog commitLog)
     {
-        queue = new LinkedBlockingQueue<Runnable>(1024 * FBUtilities.getAvailableProcessors());
+        queue = new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getCommitLogPeriodicQueueSize());
         Runnable runnable = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index cf5c5fa..15c4811 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.utils;
 
 import java.lang.management.ManagementFactory;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import javax.management.JMX;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -36,7 +38,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.CacheService;
@@ -74,9 +78,10 @@ public class StatusLogger
                                       threadPoolProxy.getTotalBlockedTasks()));
         }
         // one offs
-        CompactionManager cm = CompactionManager.instance;
         logger.info(String.format("%-25s%10s%10s",
-                                  "CompactionManager", cm.getActiveCompactions(), cm.getPendingTasks()));
+                                  "CompactionManager", CompactionManager.instance.getActiveCompactions(), CompactionManager.instance.getPendingTasks()));
+        logger.info(String.format("%-25s%10s%10s",
+                                  "Commitlog", "n/a", CommitLog.instance.getPendingTasks()));
         int pendingCommands = 0;
         for (int n : MessagingService.instance().getCommandPendingTasks().values())
         {


[6/6] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/889ca1f5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/889ca1f5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/889ca1f5

Branch: refs/heads/trunk
Commit: 889ca1f504113c90b0ffc6700a1e80635d6cb13e
Parents: 62cedd3 83b0b2f
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 12:14:18 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 12:14:18 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 conf/cassandra.yaml                             |   6 +-
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   4 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 133 ++++++++++---------
 .../org/apache/cassandra/db/MeteredFlusher.java |  22 ++-
 .../PeriodicCommitLogExecutorService.java       |   2 +-
 .../apache/cassandra/utils/StatusLogger.java    |   9 +-
 9 files changed, 107 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/889ca1f5/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/889ca1f5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/889ca1f5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/889ca1f5/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------


[5/6] git commit: merge from 1.2

Posted by jb...@apache.org.
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/83b0b2fb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/83b0b2fb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/83b0b2fb

Branch: refs/heads/cassandra-2.0
Commit: 83b0b2fb5b60e4104b2b691185263e067ada3563
Parents: 56b5c6f dc95c8c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 12:10:27 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 12:12:04 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 conf/cassandra.yaml                             |   6 +-
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   4 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 133 ++++++++++---------
 .../org/apache/cassandra/db/MeteredFlusher.java |  22 ++-
 .../PeriodicCommitLogExecutorService.java       |   2 +-
 .../apache/cassandra/utils/StatusLogger.java    |   9 +-
 9 files changed, 107 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f48fb5c,fb4f3f4..2192fe9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,6 +1,27 @@@
 -1.2.10
 +2.0.1
 + * Improve error message when yaml contains invalid properties (CASSANDRA-5958)
 + * Improve leveled compaction's ability to find non-overlapping L0 compactions
 +   to work on concurrently (CASSANDRA-5921)
 + * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
 + * Log Merkle tree stats (CASSANDRA-2698)
 + * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
 + * Improve offheap memcpy performance (CASSANDRA-5884)
 + * Use a range aware scanner for cleanup (CASSANDRA-2524)
 + * Cleanup doesn't need to inspect sstables that contain only local data 
 +   (CASSANDRA-5722)
 + * Add ability for CQL3 to list partition keys (CASSANDRA-4536)
 + * Improve native protocol serialization (CASSANDRA-5664)
 + * Upgrade Thrift to 0.9.1 (CASSANDRA-5923)
 + * Require superuser status for adding triggers (CASSANDRA-5963)
 + * Make standalone scrubber handle old and new style leveled manifest
 +   (CASSANDRA-6005)
 + * Fix paxos bugs (CASSANDRA-6012, 6013, 6023)
 + * Fix paged ranges with multiple replicas (CASSANDRA-6004)
 + * Fix potential AssertionError during tracing (CASSANDRA-6041)
 + * Fix NPE in sstablesplit (CASSANDRA-6027)
 +Merged from 1.2:
- 1.2.10
+  * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
+  * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
   * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
   * Allow local batchlog writes for CL.ANY (CASSANDRA-5967)
   * Optimize name query performance in wide rows (CASSANDRA-5966)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 2b3ca1e,cbe20fe..4cca602
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -448,4 -462,63 +390,63 @@@ public class Memtabl
                                       sstableMetadataCollector);
          }
      }
+ 
+     private static class MeteringRunnable implements Runnable
+     {
+         // we might need to wait in the meter queue for a while.  measure whichever memtable is active at that point,
+         // rather than keeping the original memtable referenced (and thus un-freeable) until this runs.
+         private final ColumnFamilyStore cfs;
+ 
+         public MeteringRunnable(ColumnFamilyStore cfs)
+         {
+             this.cfs = cfs;
+         }
+ 
+         public void run()
+         {
+             try
+             {
+                 activelyMeasuring = cfs;
+                 Memtable memtable = cfs.getMemtableThreadSafe();
+ 
 -                long start = System.currentTimeMillis();
++                long start = System.nanoTime();
+                 // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+                 // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
 -                long deepSize = memtable.meter.measure(memtable.columnFamilies);
++                long deepSize = memtable.meter.measure(memtable.rows);
+                 int objects = 0;
 -                for (Map.Entry<RowPosition, ColumnFamily> entry : memtable.columnFamilies.entrySet())
++                for (Map.Entry<RowPosition, AtomicSortedColumns> entry : memtable.rows.entrySet())
+                 {
+                     deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
+                     objects += entry.getValue().getColumnCount();
+                 }
+                 double newRatio = (double) deepSize / memtable.currentSize.get();
+ 
+                 if (newRatio < MIN_SANE_LIVE_RATIO)
+                 {
+                     logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
+                     newRatio = MIN_SANE_LIVE_RATIO;
+                 }
+                 if (newRatio > MAX_SANE_LIVE_RATIO)
+                 {
+                     logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
+                     newRatio = MAX_SANE_LIVE_RATIO;
+                 }
+ 
+                 // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
+                 // death.  thus, higher estimates are believed immediately; lower ones are averaged w/ the old
+                 if (newRatio > cfs.liveRatio)
+                     cfs.liveRatio = newRatio;
+                 else
+                     cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
+ 
+                 logger.info("{} liveRatio is {} (just-counted was {}).  calculation took {}ms for {} columns",
 -                            cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
++                            cfs, cfs.liveRatio, newRatio, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), objects);
+             }
+             finally
+             {
+                 activelyMeasuring = null;
+                 meteringInProgress.remove(cfs);
+             }
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------


[4/6] git commit: merge from 1.2

Posted by jb...@apache.org.
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/83b0b2fb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/83b0b2fb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/83b0b2fb

Branch: refs/heads/trunk
Commit: 83b0b2fb5b60e4104b2b691185263e067ada3563
Parents: 56b5c6f dc95c8c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 12:10:27 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 12:12:04 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 conf/cassandra.yaml                             |   6 +-
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   4 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 133 ++++++++++---------
 .../org/apache/cassandra/db/MeteredFlusher.java |  22 ++-
 .../PeriodicCommitLogExecutorService.java       |   2 +-
 .../apache/cassandra/utils/StatusLogger.java    |   9 +-
 9 files changed, 107 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f48fb5c,fb4f3f4..2192fe9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,6 +1,27 @@@
 -1.2.10
 +2.0.1
 + * Improve error message when yaml contains invalid properties (CASSANDRA-5958)
 + * Improve leveled compaction's ability to find non-overlapping L0 compactions
 +   to work on concurrently (CASSANDRA-5921)
 + * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
 + * Log Merkle tree stats (CASSANDRA-2698)
 + * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
 + * Improve offheap memcpy performance (CASSANDRA-5884)
 + * Use a range aware scanner for cleanup (CASSANDRA-2524)
 + * Cleanup doesn't need to inspect sstables that contain only local data 
 +   (CASSANDRA-5722)
 + * Add ability for CQL3 to list partition keys (CASSANDRA-4536)
 + * Improve native protocol serialization (CASSANDRA-5664)
 + * Upgrade Thrift to 0.9.1 (CASSANDRA-5923)
 + * Require superuser status for adding triggers (CASSANDRA-5963)
 + * Make standalone scrubber handle old and new style leveled manifest
 +   (CASSANDRA-6005)
 + * Fix paxos bugs (CASSANDRA-6012, 6013, 6023)
 + * Fix paged ranges with multiple replicas (CASSANDRA-6004)
 + * Fix potential AssertionError during tracing (CASSANDRA-6041)
 + * Fix NPE in sstablesplit (CASSANDRA-6027)
 +Merged from 1.2:
- 1.2.10
+  * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
+  * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
   * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
   * Allow local batchlog writes for CL.ANY (CASSANDRA-5967)
   * Optimize name query performance in wide rows (CASSANDRA-5966)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 2b3ca1e,cbe20fe..4cca602
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -448,4 -462,63 +390,63 @@@ public class Memtabl
                                       sstableMetadataCollector);
          }
      }
+ 
+     private static class MeteringRunnable implements Runnable
+     {
+         // we might need to wait in the meter queue for a while.  measure whichever memtable is active at that point,
+         // rather than keeping the original memtable referenced (and thus un-freeable) until this runs.
+         private final ColumnFamilyStore cfs;
+ 
+         public MeteringRunnable(ColumnFamilyStore cfs)
+         {
+             this.cfs = cfs;
+         }
+ 
+         public void run()
+         {
+             try
+             {
+                 activelyMeasuring = cfs;
+                 Memtable memtable = cfs.getMemtableThreadSafe();
+ 
 -                long start = System.currentTimeMillis();
++                long start = System.nanoTime();
+                 // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+                 // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
 -                long deepSize = memtable.meter.measure(memtable.columnFamilies);
++                long deepSize = memtable.meter.measure(memtable.rows);
+                 int objects = 0;
 -                for (Map.Entry<RowPosition, ColumnFamily> entry : memtable.columnFamilies.entrySet())
++                for (Map.Entry<RowPosition, AtomicSortedColumns> entry : memtable.rows.entrySet())
+                 {
+                     deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
+                     objects += entry.getValue().getColumnCount();
+                 }
+                 double newRatio = (double) deepSize / memtable.currentSize.get();
+ 
+                 if (newRatio < MIN_SANE_LIVE_RATIO)
+                 {
+                     logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
+                     newRatio = MIN_SANE_LIVE_RATIO;
+                 }
+                 if (newRatio > MAX_SANE_LIVE_RATIO)
+                 {
+                     logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
+                     newRatio = MAX_SANE_LIVE_RATIO;
+                 }
+ 
+                 // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
+                 // death.  thus, higher estimates are believed immediately; lower ones are averaged w/ the old
+                 if (newRatio > cfs.liveRatio)
+                     cfs.liveRatio = newRatio;
+                 else
+                     cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
+ 
+                 logger.info("{} liveRatio is {} (just-counted was {}).  calculation took {}ms for {} columns",
 -                            cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
++                            cfs, cfs.liveRatio, newRatio, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), objects);
+             }
+             finally
+             {
+                 activelyMeasuring = null;
+                 meteringInProgress.remove(cfs);
+             }
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------


[3/6] git commit: Tuning knobs for dealing with large blobs and many CFs patch by jbellis; reviewed by yukim and Jeremiah Jordan

Posted by jb...@apache.org.
Tuning knobs for dealing with large blobs and many CFs
patch by jbellis; reviewed by yukim and Jeremiah Jordan


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc95c8c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc95c8c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc95c8c0

Branch: refs/heads/trunk
Commit: dc95c8c0beb9309cfdfbabbd32ea1b74084daafd
Parents: 0804b76
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 12:06:18 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 12:07:54 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   6 +-
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   4 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 133 ++++++++++---------
 .../org/apache/cassandra/db/MeteredFlusher.java |  22 ++-
 .../PeriodicCommitLogExecutorService.java       |   2 +-
 .../apache/cassandra/utils/StatusLogger.java    |   9 +-
 9 files changed, 106 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ad32460..fb4f3f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.10
+ * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
  * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
  * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
  * Allow local batchlog writes for CL.ANY (CASSANDRA-5967)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index d52f2e8..27ac09b 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -206,9 +206,13 @@ saved_caches_directory: /var/lib/cassandra/saved_caches
 #
 # the other option is "periodic" where writes may be acked immediately
 # and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds.
+# milliseconds.  By default this allows 1024*(CPU cores) pending
+# entries on the commitlog queue.  If you are writing very large blobs,
+# you should reduce that; 16*cores works reasonably well for 1MB blobs.
+# It should be at least as large as the concurrent_writes setting.
 commitlog_sync: periodic
 commitlog_sync_period_in_ms: 10000
+# commitlog_periodic_queue_size:
 
 # The size of the individual commitlog file segments.  A commitlog
 # segment may be archived, deleted, or recycled once all the data

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 74b941d..a924a4c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -126,6 +126,7 @@ public class Config
     public Double commitlog_sync_batch_window_in_ms;
     public Integer commitlog_sync_period_in_ms;
     public int commitlog_segment_size_in_mb = 32;
+    public int commitlog_periodic_queue_size = 1024 * FBUtilities.getAvailableProcessors();
 
     public String endpoint_snitch;
     public Boolean dynamic_snitch = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1412888..8e3cbe2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1047,10 +1047,16 @@ public class DatabaseDescriptor
         return conf.commitlog_sync_batch_window_in_ms;
     }
 
-    public static int getCommitLogSyncPeriod() {
+    public static int getCommitLogSyncPeriod()
+    {
         return conf.commitlog_sync_period_in_ms;
     }
 
+    public static int getCommitLogPeriodicQueueSize()
+    {
+        return conf.commitlog_periodic_queue_size;
+    }
+
     public static Config.CommitLogSync getCommitLogSync()
     {
         return conf.commitlog_sync;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 745b5ba..c646461 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -104,7 +104,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public final Directories directories;
 
     /** ratio of in-memory memtable size, to serialized size */
-    volatile double liveRatio = 1.0;
+    volatile double liveRatio = 10.0; // reasonable default until we compute what it is based on actual data
     /** ops count last time we computed liveRatio */
     private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
 
@@ -1023,7 +1023,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return (int) metric.memtableSwitchCount.count();
     }
 
-    private Memtable getMemtableThreadSafe()
+    Memtable getMemtableThreadSafe()
     {
         return data.getMemtable();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index df06cfb..cbe20fe 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -29,7 +29,6 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
@@ -84,24 +83,15 @@ public class Memtable
     // outstanding/running meterings to a maximum of one per CFS using this set; the executor's queue is unbounded but
     // will implicitly be bounded by the number of CFS:s.
     private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>();
-    private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1,
-                                                                                          1,
+    private static final ExecutorService meterExecutor = new JMXEnabledThreadPoolExecutor(1,
                                                                                           Integer.MAX_VALUE,
                                                                                           TimeUnit.MILLISECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
-                                                                                          new NamedThreadFactory("MemoryMeter"))
-    {
-        @Override
-        protected void afterExecute(Runnable r, Throwable t)
-        {
-            super.afterExecute(r, t);
-            DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
-        }
-    };
-
+                                                                                          new NamedThreadFactory("MemoryMeter"),
+                                                                                          "internal");
     private final MemoryMeter meter;
 
-    volatile static Memtable activelyMeasuring;
+    volatile static ColumnFamilyStore activelyMeasuring;
 
     private final AtomicLong currentSize = new AtomicLong(0);
     private final AtomicLong currentOperations = new AtomicLong(0);
@@ -185,8 +175,9 @@ public class Memtable
         if (!MemoryMeter.isInitialized())
         {
             // hack for openjdk.  we log a warning about this in the startup script too.
-            logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of 10.0.  Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
-            cfs.liveRatio = 10.0;
+            logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of {}.  "
+                        + " Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; "
+                        + " upgrade to the Sun JRE instead", cfs.liveRatio);
             return;
         }
 
@@ -196,56 +187,7 @@ public class Memtable
             return;
         }
 
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    activelyMeasuring = Memtable.this;
-
-                    long start = System.currentTimeMillis();
-                    // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
-                    // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
-                    long deepSize = meter.measure(columnFamilies);
-                    int objects = 0;
-                    for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
-                    {
-                        deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
-                        objects += entry.getValue().getColumnCount();
-                    }
-                    double newRatio = (double) deepSize / currentSize.get();
-
-                    if (newRatio < MIN_SANE_LIVE_RATIO)
-                    {
-                        logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
-                        newRatio = MIN_SANE_LIVE_RATIO;
-                    }
-                    if (newRatio > MAX_SANE_LIVE_RATIO)
-                    {
-                        logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
-                        newRatio = MAX_SANE_LIVE_RATIO;
-                    }
-
-                    // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
-                    // death.  thus, higher estimates are believed immediately; lower ones are averaged w/ the old
-                    if (newRatio > cfs.liveRatio)
-                        cfs.liveRatio = newRatio;
-                    else
-                        cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
-
-                    logger.info("{} liveRatio is {} (just-counted was {}).  calculation took {}ms for {} columns",
-                                cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
-                    activelyMeasuring = null;
-                }
-                finally
-                {
-                    meteringInProgress.remove(cfs);
-                }
-            }
-        };
-
-        meterExecutor.submit(runnable);
+        meterExecutor.submit(new MeteringRunnable(cfs));
     }
 
     private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
@@ -520,4 +462,63 @@ public class Memtable
                                      sstableMetadataCollector);
         }
     }
+
+    private static class MeteringRunnable implements Runnable
+    {
+        // we might need to wait in the meter queue for a while.  measure whichever memtable is active at that point,
+        // rather than keeping the original memtable referenced (and thus un-freeable) until this runs.
+        private final ColumnFamilyStore cfs;
+
+        public MeteringRunnable(ColumnFamilyStore cfs)
+        {
+            this.cfs = cfs;
+        }
+
+        public void run()
+        {
+            try
+            {
+                activelyMeasuring = cfs;
+                Memtable memtable = cfs.getMemtableThreadSafe();
+
+                long start = System.currentTimeMillis();
+                // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+                // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
+                long deepSize = memtable.meter.measure(memtable.columnFamilies);
+                int objects = 0;
+                for (Map.Entry<RowPosition, ColumnFamily> entry : memtable.columnFamilies.entrySet())
+                {
+                    deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
+                    objects += entry.getValue().getColumnCount();
+                }
+                double newRatio = (double) deepSize / memtable.currentSize.get();
+
+                if (newRatio < MIN_SANE_LIVE_RATIO)
+                {
+                    logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
+                    newRatio = MIN_SANE_LIVE_RATIO;
+                }
+                if (newRatio > MAX_SANE_LIVE_RATIO)
+                {
+                    logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
+                    newRatio = MAX_SANE_LIVE_RATIO;
+                }
+
+                // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
+                // death.  thus, higher estimates are believed immediately; lower ones are averaged w/ the old
+                if (newRatio > cfs.liveRatio)
+                    cfs.liveRatio = newRatio;
+                else
+                    cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
+
+                logger.info("{} liveRatio is {} (just-counted was {}).  calculation took {}ms for {} columns",
+                            cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
+            }
+            finally
+            {
+                activelyMeasuring = null;
+                meteringInProgress.remove(cfs);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index 408727c..f16b8a0 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -35,16 +35,22 @@ public class MeteredFlusher implements Runnable
 
     public void run()
     {
+        long totalMemtableBytesAllowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
+
         // first, find how much memory non-active memtables are using
-        Memtable activelyMeasuring = Memtable.activelyMeasuring;
-        long flushingBytes = activelyMeasuring == null ? 0 : activelyMeasuring.getLiveSize();
+        long flushingBytes = Memtable.activelyMeasuring == null
+                           ? 0
+                           : Memtable.activelyMeasuring.getMemtableThreadSafe().getLiveSize();
         flushingBytes += countFlushingBytes();
+        if (flushingBytes > 0)
+            logger.debug("Currently flushing {} bytes of {} max", flushingBytes, totalMemtableBytesAllowed);
 
         // next, flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
         // of the total size allotted.  Then, flush other CFs in order of size if necessary.
         long liveBytes = 0;
         try
         {
+            long totalMemtableBytesUnused = totalMemtableBytesAllowed - flushingBytes;
             for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
             {
                 long size = cfs.getTotalMemtableLiveSize();
@@ -53,7 +59,7 @@ public class MeteredFlusher implements Runnable
                                                             + DatabaseDescriptor.getFlushWriters()
                                                             + DatabaseDescriptor.getFlushQueueSize())
                                                   / (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
-                if (size > (DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L - flushingBytes) / maxInFlight)
+                if (totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight)
                 {
                     logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
                     cfs.forceFlush();
@@ -64,10 +70,10 @@ public class MeteredFlusher implements Runnable
                 }
             }
 
-            if (flushingBytes + liveBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L)
+            if (flushingBytes + liveBytes <= totalMemtableBytesAllowed)
                 return;
 
-            logger.info("estimated {} bytes used by all memtables pre-flush", liveBytes);
+            logger.info("estimated {} live and {} flushing bytes used by all memtables", liveBytes, flushingBytes);
 
             // sort memtables by size
             List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>();
@@ -89,14 +95,16 @@ public class MeteredFlusher implements Runnable
             // flush largest first until we get below our threshold.
             // although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish
             // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
-            while (true)
+            while (!sorted.isEmpty())
             {
                 flushingBytes = countFlushingBytes();
-                if (liveBytes + flushingBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L || sorted.isEmpty())
+                if (liveBytes + flushingBytes <= totalMemtableBytesAllowed)
                     break;
 
                 ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1);
                 long size = cfs.getTotalMemtableLiveSize();
+                if (size == 0)
+                    break;
                 logger.info("flushing {} to free up {} bytes", cfs, size);
                 liveBytes -= size;
                 cfs.forceFlush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index 94f593e..48dbfac 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -33,7 +33,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
 
     public PeriodicCommitLogExecutorService(final CommitLog commitLog)
     {
-        queue = new LinkedBlockingQueue<Runnable>(1024 * FBUtilities.getAvailableProcessors());
+        queue = new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getCommitLogPeriodicQueueSize());
         Runnable runnable = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index cf5c5fa..15c4811 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.utils;
 
 import java.lang.management.ManagementFactory;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import javax.management.JMX;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -36,7 +38,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.CacheService;
@@ -74,9 +78,10 @@ public class StatusLogger
                                       threadPoolProxy.getTotalBlockedTasks()));
         }
         // one offs
-        CompactionManager cm = CompactionManager.instance;
         logger.info(String.format("%-25s%10s%10s",
-                                  "CompactionManager", cm.getActiveCompactions(), cm.getPendingTasks()));
+                                  "CompactionManager", CompactionManager.instance.getActiveCompactions(), CompactionManager.instance.getPendingTasks()));
+        logger.info(String.format("%-25s%10s%10s",
+                                  "Commitlog", "n/a", CommitLog.instance.getPendingTasks()));
         int pendingCommands = 0;
         for (int n : MessagingService.instance().getCommandPendingTasks().values())
         {


[2/6] git commit: Tuning knobs for dealing with large blobs and many CFs patch by jbellis; reviewed by yukim and Jeremiah Jordan

Posted by jb...@apache.org.
Tuning knobs for dealing with large blobs and many CFs
patch by jbellis; reviewed by yukim and Jeremiah Jordan


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc95c8c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc95c8c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc95c8c0

Branch: refs/heads/cassandra-2.0
Commit: dc95c8c0beb9309cfdfbabbd32ea1b74084daafd
Parents: 0804b76
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 12:06:18 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 12:07:54 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   6 +-
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   4 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 133 ++++++++++---------
 .../org/apache/cassandra/db/MeteredFlusher.java |  22 ++-
 .../PeriodicCommitLogExecutorService.java       |   2 +-
 .../apache/cassandra/utils/StatusLogger.java    |   9 +-
 9 files changed, 106 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ad32460..fb4f3f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.10
+ * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
  * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
  * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
  * Allow local batchlog writes for CL.ANY (CASSANDRA-5967)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index d52f2e8..27ac09b 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -206,9 +206,13 @@ saved_caches_directory: /var/lib/cassandra/saved_caches
 #
 # the other option is "periodic" where writes may be acked immediately
 # and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds.
+# milliseconds.  By default this allows 1024*(CPU cores) pending entries on
+# the commitlog queue; set commitlog_periodic_queue_size (below) to change it.
+# If you are writing very large blobs, reduce it: 16*cores works reasonably
+# well for 1MB blobs.  It should be at least as large as concurrent_writes.
 commitlog_sync: periodic
 commitlog_sync_period_in_ms: 10000
+# commitlog_periodic_queue_size:
 
 # The size of the individual commitlog file segments.  A commitlog
 # segment may be archived, deleted, or recycled once all the data

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 74b941d..a924a4c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -126,6 +126,7 @@ public class Config
     public Double commitlog_sync_batch_window_in_ms;
     public Integer commitlog_sync_period_in_ms;
     public int commitlog_segment_size_in_mb = 32;
+    public int commitlog_periodic_queue_size = 1024 * FBUtilities.getAvailableProcessors();
 
     public String endpoint_snitch;
     public Boolean dynamic_snitch = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1412888..8e3cbe2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1047,10 +1047,16 @@ public class DatabaseDescriptor
         return conf.commitlog_sync_batch_window_in_ms;
     }
 
-    public static int getCommitLogSyncPeriod() {
+    public static int getCommitLogSyncPeriod()
+    {
         return conf.commitlog_sync_period_in_ms;
     }
 
+    public static int getCommitLogPeriodicQueueSize()
+    {
+        return conf.commitlog_periodic_queue_size;
+    }
+
     public static Config.CommitLogSync getCommitLogSync()
     {
         return conf.commitlog_sync;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 745b5ba..c646461 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -104,7 +104,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public final Directories directories;
 
     /** ratio of in-memory memtable size, to serialized size */
-    volatile double liveRatio = 1.0;
+    volatile double liveRatio = 10.0; // reasonable default until we compute what it is based on actual data
     /** ops count last time we computed liveRatio */
     private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
 
@@ -1023,7 +1023,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return (int) metric.memtableSwitchCount.count();
     }
 
-    private Memtable getMemtableThreadSafe()
+    Memtable getMemtableThreadSafe()
     {
         return data.getMemtable();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index df06cfb..cbe20fe 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -29,7 +29,6 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
@@ -84,24 +83,15 @@ public class Memtable
     // outstanding/running meterings to a maximum of one per CFS using this set; the executor's queue is unbounded but
     // will implicitly be bounded by the number of CFS:s.
     private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>();
-    private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1,
-                                                                                          1,
+    private static final ExecutorService meterExecutor = new JMXEnabledThreadPoolExecutor(1,
                                                                                           Integer.MAX_VALUE,
                                                                                           TimeUnit.MILLISECONDS,
                                                                                           new LinkedBlockingQueue<Runnable>(),
-                                                                                          new NamedThreadFactory("MemoryMeter"))
-    {
-        @Override
-        protected void afterExecute(Runnable r, Throwable t)
-        {
-            super.afterExecute(r, t);
-            DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
-        }
-    };
-
+                                                                                          new NamedThreadFactory("MemoryMeter"),
+                                                                                          "internal");
     private final MemoryMeter meter;
 
-    volatile static Memtable activelyMeasuring;
+    volatile static ColumnFamilyStore activelyMeasuring;
 
     private final AtomicLong currentSize = new AtomicLong(0);
     private final AtomicLong currentOperations = new AtomicLong(0);
@@ -185,8 +175,9 @@ public class Memtable
         if (!MemoryMeter.isInitialized())
         {
             // hack for openjdk.  we log a warning about this in the startup script too.
-            logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of 10.0.  Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
-            cfs.liveRatio = 10.0;
+            logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of {}.  "
+                        + "Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; "
+                        + "upgrade to the Sun JRE instead", cfs.liveRatio);
             return;
         }
 
@@ -196,56 +187,7 @@ public class Memtable
             return;
         }
 
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    activelyMeasuring = Memtable.this;
-
-                    long start = System.currentTimeMillis();
-                    // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
-                    // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
-                    long deepSize = meter.measure(columnFamilies);
-                    int objects = 0;
-                    for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
-                    {
-                        deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
-                        objects += entry.getValue().getColumnCount();
-                    }
-                    double newRatio = (double) deepSize / currentSize.get();
-
-                    if (newRatio < MIN_SANE_LIVE_RATIO)
-                    {
-                        logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
-                        newRatio = MIN_SANE_LIVE_RATIO;
-                    }
-                    if (newRatio > MAX_SANE_LIVE_RATIO)
-                    {
-                        logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
-                        newRatio = MAX_SANE_LIVE_RATIO;
-                    }
-
-                    // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
-                    // death.  thus, higher estimates are believed immediately; lower ones are averaged w/ the old
-                    if (newRatio > cfs.liveRatio)
-                        cfs.liveRatio = newRatio;
-                    else
-                        cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
-
-                    logger.info("{} liveRatio is {} (just-counted was {}).  calculation took {}ms for {} columns",
-                                cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
-                    activelyMeasuring = null;
-                }
-                finally
-                {
-                    meteringInProgress.remove(cfs);
-                }
-            }
-        };
-
-        meterExecutor.submit(runnable);
+        meterExecutor.submit(new MeteringRunnable(cfs));
     }
 
     private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
@@ -520,4 +462,63 @@ public class Memtable
                                      sstableMetadataCollector);
         }
     }
+
+    private static class MeteringRunnable implements Runnable
+    {
+        // we might need to wait in the meter queue for a while.  measure whichever memtable is active at that point,
+        // rather than keeping the original memtable referenced (and thus un-freeable) until this runs.
+        private final ColumnFamilyStore cfs;
+
+        public MeteringRunnable(ColumnFamilyStore cfs)
+        {
+            this.cfs = cfs;
+        }
+
+        public void run()
+        {
+            try
+            {
+                activelyMeasuring = cfs;
+                Memtable memtable = cfs.getMemtableThreadSafe();
+                if (memtable.currentSize.get() == 0)
+                    return; // e.g. a freshly switched memtable; dividing by zero below would force liveRatio to the max (or NaN)
+                long start = System.currentTimeMillis();
+                // ConcurrentSkipListMap has cycles, so measureDeep tracks a reference to EACH object it visits; go row-at-a-time to bound that overhead.
+                long deepSize = memtable.meter.measure(memtable.columnFamilies);
+                int objects = 0;
+                for (Map.Entry<RowPosition, ColumnFamily> entry : memtable.columnFamilies.entrySet())
+                {
+                    deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
+                    objects += entry.getValue().getColumnCount();
+                }
+                double newRatio = (double) deepSize / memtable.currentSize.get();
+
+                if (newRatio < MIN_SANE_LIVE_RATIO)
+                {
+                    logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
+                    newRatio = MIN_SANE_LIVE_RATIO;
+                }
+                if (newRatio > MAX_SANE_LIVE_RATIO)
+                {
+                    logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
+                    newRatio = MAX_SANE_LIVE_RATIO;
+                }
+
+                // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
+                // death.  thus, higher estimates are believed immediately; lower ones are averaged w/ the old
+                if (newRatio > cfs.liveRatio)
+                    cfs.liveRatio = newRatio;
+                else
+                    cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
+
+                logger.info("{} liveRatio is {} (just-counted was {}).  calculation took {}ms for {} columns",
+                            cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
+            }
+            finally
+            {
+                activelyMeasuring = null;
+                meteringInProgress.remove(cfs);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index 408727c..f16b8a0 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -35,16 +35,22 @@ public class MeteredFlusher implements Runnable
 
     public void run()
     {
+        long totalMemtableBytesAllowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
+
         // first, find how much memory non-active memtables are using
-        Memtable activelyMeasuring = Memtable.activelyMeasuring;
-        long flushingBytes = activelyMeasuring == null ? 0 : activelyMeasuring.getLiveSize();
+        // read the volatile once: MeteringRunnable nulls it concurrently, so a second read could NPE
+        ColumnFamilyStore measuring = Memtable.activelyMeasuring;
+        long flushingBytes = measuring == null ? 0 : measuring.getMemtableThreadSafe().getLiveSize();
         flushingBytes += countFlushingBytes();
+        if (flushingBytes > 0)
+            logger.debug("Currently flushing {} bytes of {} max", flushingBytes, totalMemtableBytesAllowed);
 
         // next, flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
         // of the total size allotted.  Then, flush other CFs in order of size if necessary.
         long liveBytes = 0;
         try
         {
+            long totalMemtableBytesUnused = totalMemtableBytesAllowed - flushingBytes;
             for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
             {
                 long size = cfs.getTotalMemtableLiveSize();
@@ -53,7 +59,7 @@ public class MeteredFlusher implements Runnable
                                                             + DatabaseDescriptor.getFlushWriters()
                                                             + DatabaseDescriptor.getFlushQueueSize())
                                                   / (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
-                if (size > (DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L - flushingBytes) / maxInFlight)
+                if (totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight)
                 {
                     logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
                     cfs.forceFlush();
@@ -64,10 +70,10 @@ public class MeteredFlusher implements Runnable
                 }
             }
 
-            if (flushingBytes + liveBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L)
+            if (flushingBytes + liveBytes <= totalMemtableBytesAllowed)
                 return;
 
-            logger.info("estimated {} bytes used by all memtables pre-flush", liveBytes);
+            logger.info("estimated {} live and {} flushing bytes used by all memtables", liveBytes, flushingBytes);
 
             // sort memtables by size
             List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>();
@@ -89,14 +95,16 @@ public class MeteredFlusher implements Runnable
             // flush largest first until we get below our threshold.
             // although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish
             // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
-            while (true)
+            while (!sorted.isEmpty())
             {
                 flushingBytes = countFlushingBytes();
-                if (liveBytes + flushingBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L || sorted.isEmpty())
+                if (liveBytes + flushingBytes <= totalMemtableBytesAllowed)
                     break;
 
                 ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1);
                 long size = cfs.getTotalMemtableLiveSize();
+                if (size == 0)
+                    break;
                 logger.info("flushing {} to free up {} bytes", cfs, size);
                 liveBytes -= size;
                 cfs.forceFlush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index 94f593e..48dbfac 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -33,7 +33,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
 
     public PeriodicCommitLogExecutorService(final CommitLog commitLog)
     {
-        queue = new LinkedBlockingQueue<Runnable>(1024 * FBUtilities.getAvailableProcessors());
+        queue = new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getCommitLogPeriodicQueueSize());
         Runnable runnable = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index cf5c5fa..15c4811 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.utils;
 
 import java.lang.management.ManagementFactory;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import javax.management.JMX;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -36,7 +38,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.CacheService;
@@ -74,9 +78,10 @@ public class StatusLogger
                                       threadPoolProxy.getTotalBlockedTasks()));
         }
         // one offs
-        CompactionManager cm = CompactionManager.instance;
         logger.info(String.format("%-25s%10s%10s",
-                                  "CompactionManager", cm.getActiveCompactions(), cm.getPendingTasks()));
+                                  "CompactionManager", CompactionManager.instance.getActiveCompactions(), CompactionManager.instance.getPendingTasks()));
+        logger.info(String.format("%-25s%10s%10s",
+                                  "Commitlog", "n/a", CommitLog.instance.getPendingTasks()));
         int pendingCommands = 0;
         for (int n : MessagingService.instance().getCommandPendingTasks().values())
         {