You are viewing a plain-text version of this content. The canonical link to it (a "here" hyperlink in the HTML archive) is not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/09/17 19:14:36 UTC
[1/6] git commit: Tuning knobs for dealing with large blobs and many CFs (patch by jbellis; reviewed by yukim and Jeremiah Jordan)
Updated Branches:
refs/heads/cassandra-1.2 0804b7610 -> dc95c8c0b
refs/heads/cassandra-2.0 56b5c6f30 -> 83b0b2fb5
refs/heads/trunk 62cedd3fe -> 889ca1f50
Tuning knobs for dealing with large blobs and many CFs
patch by jbellis; reviewed by yukim and Jeremiah Jordan
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc95c8c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc95c8c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc95c8c0
Branch: refs/heads/cassandra-1.2
Commit: dc95c8c0beb9309cfdfbabbd32ea1b74084daafd
Parents: 0804b76
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 12:06:18 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 12:07:54 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 6 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 8 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 4 +-
src/java/org/apache/cassandra/db/Memtable.java | 133 ++++++++++---------
.../org/apache/cassandra/db/MeteredFlusher.java | 22 ++-
.../PeriodicCommitLogExecutorService.java | 2 +-
.../apache/cassandra/utils/StatusLogger.java | 9 +-
9 files changed, 106 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ad32460..fb4f3f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.10
+ * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
* (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
* Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
* Allow local batchlog writes for CL.ANY (CASSANDRA-5967)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index d52f2e8..27ac09b 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -206,9 +206,13 @@ saved_caches_directory: /var/lib/cassandra/saved_caches
#
# the other option is "periodic" where writes may be acked immediately
# and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds.
+# milliseconds. commitlog_periodic_queue_size caps the number of pending
+# entries on the commitlog queue; the default is 1024*(CPU cores). If you
+# write very large blobs, reduce it (16*cores works well for 1MB blobs),
+# but keep it at least as large as the concurrent_writes setting.
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
+# commitlog_periodic_queue_size:
# The size of the individual commitlog file segments. A commitlog
# segment may be archived, deleted, or recycled once all the data
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 74b941d..a924a4c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -126,6 +126,7 @@ public class Config
public Double commitlog_sync_batch_window_in_ms;
public Integer commitlog_sync_period_in_ms;
public int commitlog_segment_size_in_mb = 32;
+ public int commitlog_periodic_queue_size = 1024 * FBUtilities.getAvailableProcessors();
public String endpoint_snitch;
public Boolean dynamic_snitch = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1412888..8e3cbe2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1047,10 +1047,16 @@ public class DatabaseDescriptor
return conf.commitlog_sync_batch_window_in_ms;
}
- public static int getCommitLogSyncPeriod() {
+ public static int getCommitLogSyncPeriod()
+ {
return conf.commitlog_sync_period_in_ms;
}
+ public static int getCommitLogPeriodicQueueSize()
+ {
+ return conf.commitlog_periodic_queue_size;
+ }
+
public static Config.CommitLogSync getCommitLogSync()
{
return conf.commitlog_sync;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 745b5ba..c646461 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -104,7 +104,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public final Directories directories;
/** ratio of in-memory memtable size, to serialized size */
- volatile double liveRatio = 1.0;
+ volatile double liveRatio = 10.0; // reasonable default until we compute what it is based on actual data
/** ops count last time we computed liveRatio */
private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
@@ -1023,7 +1023,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return (int) metric.memtableSwitchCount.count();
}
- private Memtable getMemtableThreadSafe()
+ Memtable getMemtableThreadSafe()
{
return data.getMemtable();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index df06cfb..cbe20fe 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -29,7 +29,6 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StageManager;
@@ -84,24 +83,15 @@ public class Memtable
// outstanding/running meterings to a maximum of one per CFS using this set; the executor's queue is unbounded but
// will implicitly be bounded by the number of CFS:s.
private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>();
- private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1,
- 1,
+ private static final ExecutorService meterExecutor = new JMXEnabledThreadPoolExecutor(1,
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MemoryMeter"))
- {
- @Override
- protected void afterExecute(Runnable r, Throwable t)
- {
- super.afterExecute(r, t);
- DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
- }
- };
-
+ new NamedThreadFactory("MemoryMeter"),
+ "internal");
private final MemoryMeter meter;
- volatile static Memtable activelyMeasuring;
+ volatile static ColumnFamilyStore activelyMeasuring;
private final AtomicLong currentSize = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
@@ -185,8 +175,9 @@ public class Memtable
if (!MemoryMeter.isInitialized())
{
// hack for openjdk. we log a warning about this in the startup script too.
- logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of 10.0. Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
- cfs.liveRatio = 10.0;
+ logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of {}. "
+ + " Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; "
+ + " upgrade to the Sun JRE instead", cfs.liveRatio);
return;
}
@@ -196,56 +187,7 @@ public class Memtable
return;
}
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- try
- {
- activelyMeasuring = Memtable.this;
-
- long start = System.currentTimeMillis();
- // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
- // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
- long deepSize = meter.measure(columnFamilies);
- int objects = 0;
- for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
- {
- deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
- objects += entry.getValue().getColumnCount();
- }
- double newRatio = (double) deepSize / currentSize.get();
-
- if (newRatio < MIN_SANE_LIVE_RATIO)
- {
- logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
- newRatio = MIN_SANE_LIVE_RATIO;
- }
- if (newRatio > MAX_SANE_LIVE_RATIO)
- {
- logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
- newRatio = MAX_SANE_LIVE_RATIO;
- }
-
- // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
- // death. thus, higher estimates are believed immediately; lower ones are averaged w/ the old
- if (newRatio > cfs.liveRatio)
- cfs.liveRatio = newRatio;
- else
- cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
-
- logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
- cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
- activelyMeasuring = null;
- }
- finally
- {
- meteringInProgress.remove(cfs);
- }
- }
- };
-
- meterExecutor.submit(runnable);
+ meterExecutor.submit(new MeteringRunnable(cfs));
}
private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
@@ -520,4 +462,63 @@ public class Memtable
sstableMetadataCollector);
}
}
+
+ private static class MeteringRunnable implements Runnable
+ {
+ // we might need to wait in the meter queue for a while. measure whichever memtable is active at that point,
+ // rather than keeping the original memtable referenced (and thus un-freeable) until this runs.
+ private final ColumnFamilyStore cfs;
+
+ public MeteringRunnable(ColumnFamilyStore cfs)
+ {
+ this.cfs = cfs;
+ }
+
+ public void run()
+ {
+ try
+ {
+ activelyMeasuring = cfs;
+ Memtable memtable = cfs.getMemtableThreadSafe();
+
+ long start = System.currentTimeMillis();
+ // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+ // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
+ long deepSize = memtable.meter.measure(memtable.columnFamilies);
+ int objects = 0;
+ for (Map.Entry<RowPosition, ColumnFamily> entry : memtable.columnFamilies.entrySet())
+ {
+ deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
+ objects += entry.getValue().getColumnCount();
+ }
+ double newRatio = (double) deepSize / memtable.currentSize.get();
+
+ if (newRatio < MIN_SANE_LIVE_RATIO)
+ {
+ logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
+ newRatio = MIN_SANE_LIVE_RATIO;
+ }
+ if (newRatio > MAX_SANE_LIVE_RATIO)
+ {
+ logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
+ newRatio = MAX_SANE_LIVE_RATIO;
+ }
+
+ // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
+ // death. thus, higher estimates are believed immediately; lower ones are averaged w/ the old
+ if (newRatio > cfs.liveRatio)
+ cfs.liveRatio = newRatio;
+ else
+ cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
+
+ logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
+ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
+ }
+ finally
+ {
+ activelyMeasuring = null;
+ meteringInProgress.remove(cfs);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index 408727c..f16b8a0 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -35,16 +35,22 @@ public class MeteredFlusher implements Runnable
public void run()
{
+ long totalMemtableBytesAllowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
+
// first, find how much memory non-active memtables are using
- Memtable activelyMeasuring = Memtable.activelyMeasuring;
- long flushingBytes = activelyMeasuring == null ? 0 : activelyMeasuring.getLiveSize();
+ long flushingBytes = Memtable.activelyMeasuring == null
+ ? 0
+ : Memtable.activelyMeasuring.getMemtableThreadSafe().getLiveSize();
flushingBytes += countFlushingBytes();
+ if (flushingBytes > 0)
+ logger.debug("Currently flushing {} bytes of {} max", flushingBytes, totalMemtableBytesAllowed);
// next, flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
// of the total size allotted. Then, flush other CFs in order of size if necessary.
long liveBytes = 0;
try
{
+ long totalMemtableBytesUnused = totalMemtableBytesAllowed - flushingBytes;
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
long size = cfs.getTotalMemtableLiveSize();
@@ -53,7 +59,7 @@ public class MeteredFlusher implements Runnable
+ DatabaseDescriptor.getFlushWriters()
+ DatabaseDescriptor.getFlushQueueSize())
/ (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
- if (size > (DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L - flushingBytes) / maxInFlight)
+ if (totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight)
{
logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
cfs.forceFlush();
@@ -64,10 +70,10 @@ public class MeteredFlusher implements Runnable
}
}
- if (flushingBytes + liveBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L)
+ if (flushingBytes + liveBytes <= totalMemtableBytesAllowed)
return;
- logger.info("estimated {} bytes used by all memtables pre-flush", liveBytes);
+ logger.info("estimated {} live and {} flushing bytes used by all memtables", liveBytes, flushingBytes);
// sort memtables by size
List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>();
@@ -89,14 +95,16 @@ public class MeteredFlusher implements Runnable
// flush largest first until we get below our threshold.
// although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish
// while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
- while (true)
+ while (!sorted.isEmpty())
{
flushingBytes = countFlushingBytes();
- if (liveBytes + flushingBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L || sorted.isEmpty())
+ if (liveBytes + flushingBytes <= totalMemtableBytesAllowed)
break;
ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1);
long size = cfs.getTotalMemtableLiveSize();
+ if (size == 0)
+ break;
logger.info("flushing {} to free up {} bytes", cfs, size);
liveBytes -= size;
cfs.forceFlush();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index 94f593e..48dbfac 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -33,7 +33,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
public PeriodicCommitLogExecutorService(final CommitLog commitLog)
{
- queue = new LinkedBlockingQueue<Runnable>(1024 * FBUtilities.getAvailableProcessors());
+ queue = new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getCommitLogPeriodicQueueSize());
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws Exception
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index cf5c5fa..15c4811 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.utils;
import java.lang.management.ManagementFactory;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
@@ -36,7 +38,9 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.CacheService;
@@ -74,9 +78,10 @@ public class StatusLogger
threadPoolProxy.getTotalBlockedTasks()));
}
// one offs
- CompactionManager cm = CompactionManager.instance;
logger.info(String.format("%-25s%10s%10s",
- "CompactionManager", cm.getActiveCompactions(), cm.getPendingTasks()));
+ "CompactionManager", CompactionManager.instance.getActiveCompactions(), CompactionManager.instance.getPendingTasks()));
+ logger.info(String.format("%-25s%10s%10s",
+ "Commitlog", "n/a", CommitLog.instance.getPendingTasks()));
int pendingCommands = 0;
for (int n : MessagingService.instance().getCommandPendingTasks().values())
{
[6/6] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by jb...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/889ca1f5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/889ca1f5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/889ca1f5
Branch: refs/heads/trunk
Commit: 889ca1f504113c90b0ffc6700a1e80635d6cb13e
Parents: 62cedd3 83b0b2f
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 12:14:18 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 12:14:18 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 +-
conf/cassandra.yaml | 6 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 8 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 4 +-
src/java/org/apache/cassandra/db/Memtable.java | 133 ++++++++++---------
.../org/apache/cassandra/db/MeteredFlusher.java | 22 ++-
.../PeriodicCommitLogExecutorService.java | 2 +-
.../apache/cassandra/utils/StatusLogger.java | 9 +-
9 files changed, 107 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/889ca1f5/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/889ca1f5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/889ca1f5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/889ca1f5/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
[5/6] git commit: merge from 1.2
Posted by jb...@apache.org.
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/83b0b2fb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/83b0b2fb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/83b0b2fb
Branch: refs/heads/cassandra-2.0
Commit: 83b0b2fb5b60e4104b2b691185263e067ada3563
Parents: 56b5c6f dc95c8c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 12:10:27 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 12:12:04 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 +-
conf/cassandra.yaml | 6 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 8 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 4 +-
src/java/org/apache/cassandra/db/Memtable.java | 133 ++++++++++---------
.../org/apache/cassandra/db/MeteredFlusher.java | 22 ++-
.../PeriodicCommitLogExecutorService.java | 2 +-
.../apache/cassandra/utils/StatusLogger.java | 9 +-
9 files changed, 107 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f48fb5c,fb4f3f4..2192fe9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,6 +1,27 @@@
-1.2.10
+2.0.1
+ * Improve error message when yaml contains invalid properties (CASSANDRA-5958)
+ * Improve leveled compaction's ability to find non-overlapping L0 compactions
+ to work on concurrently (CASSANDRA-5921)
+ * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
+ * Log Merkle tree stats (CASSANDRA-2698)
+ * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
+ * Improve offheap memcpy performance (CASSANDRA-5884)
+ * Use a range aware scanner for cleanup (CASSANDRA-2524)
+ * Cleanup doesn't need to inspect sstables that contain only local data
+ (CASSANDRA-5722)
+ * Add ability for CQL3 to list partition keys (CASSANDRA-4536)
+ * Improve native protocol serialization (CASSANDRA-5664)
+ * Upgrade Thrift to 0.9.1 (CASSANDRA-5923)
+ * Require superuser status for adding triggers (CASSANDRA-5963)
+ * Make standalone scrubber handle old and new style leveled manifest
+ (CASSANDRA-6005)
+ * Fix paxos bugs (CASSANDRA-6012, 6013, 6023)
+ * Fix paged ranges with multiple replicas (CASSANDRA-6004)
+ * Fix potential AssertionError during tracing (CASSANDRA-6041)
+ * Fix NPE in sstablesplit (CASSANDRA-6027)
+Merged from 1.2:
- 1.2.10
+ * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
+ * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
* Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
* Allow local batchlog writes for CL.ANY (CASSANDRA-5967)
* Optimize name query performance in wide rows (CASSANDRA-5966)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/conf/cassandra.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 2b3ca1e,cbe20fe..4cca602
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -448,4 -462,63 +390,63 @@@ public class Memtabl
sstableMetadataCollector);
}
}
+
+ private static class MeteringRunnable implements Runnable
+ {
+ // we might need to wait in the meter queue for a while. measure whichever memtable is active at that point,
+ // rather than keeping the original memtable referenced (and thus un-freeable) until this runs.
+ private final ColumnFamilyStore cfs;
+
+ public MeteringRunnable(ColumnFamilyStore cfs)
+ {
+ this.cfs = cfs;
+ }
+
+ public void run()
+ {
+ try
+ {
+ activelyMeasuring = cfs;
+ Memtable memtable = cfs.getMemtableThreadSafe();
+
- long start = System.currentTimeMillis();
++ long start = System.nanoTime();
+ // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+ // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
- long deepSize = memtable.meter.measure(memtable.columnFamilies);
++ long deepSize = memtable.meter.measure(memtable.rows);
+ int objects = 0;
- for (Map.Entry<RowPosition, ColumnFamily> entry : memtable.columnFamilies.entrySet())
++ for (Map.Entry<RowPosition, AtomicSortedColumns> entry : memtable.rows.entrySet())
+ {
+ deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
+ objects += entry.getValue().getColumnCount();
+ }
+ double newRatio = (double) deepSize / memtable.currentSize.get();
+
+ if (newRatio < MIN_SANE_LIVE_RATIO)
+ {
+ logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
+ newRatio = MIN_SANE_LIVE_RATIO;
+ }
+ if (newRatio > MAX_SANE_LIVE_RATIO)
+ {
+ logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
+ newRatio = MAX_SANE_LIVE_RATIO;
+ }
+
+ // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
+ // death. thus, higher estimates are believed immediately; lower ones are averaged w/ the old
+ if (newRatio > cfs.liveRatio)
+ cfs.liveRatio = newRatio;
+ else
+ cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
+
+ logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
- cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
++ cfs, cfs.liveRatio, newRatio, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), objects);
+ }
+ finally
+ {
+ activelyMeasuring = null;
+ meteringInProgress.remove(cfs);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
[4/6] git commit: merge from 1.2
Posted by jb...@apache.org.
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/83b0b2fb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/83b0b2fb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/83b0b2fb
Branch: refs/heads/trunk
Commit: 83b0b2fb5b60e4104b2b691185263e067ada3563
Parents: 56b5c6f dc95c8c
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 12:10:27 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 12:12:04 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 3 +-
conf/cassandra.yaml | 6 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 8 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 4 +-
src/java/org/apache/cassandra/db/Memtable.java | 133 ++++++++++---------
.../org/apache/cassandra/db/MeteredFlusher.java | 22 ++-
.../PeriodicCommitLogExecutorService.java | 2 +-
.../apache/cassandra/utils/StatusLogger.java | 9 +-
9 files changed, 107 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f48fb5c,fb4f3f4..2192fe9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,6 +1,27 @@@
-1.2.10
+2.0.1
+ * Improve error message when yaml contains invalid properties (CASSANDRA-5958)
+ * Improve leveled compaction's ability to find non-overlapping L0 compactions
+ to work on concurrently (CASSANDRA-5921)
+ * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
+ * Log Merkle tree stats (CASSANDRA-2698)
+ * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862)
+ * Improve offheap memcpy performance (CASSANDRA-5884)
+ * Use a range aware scanner for cleanup (CASSANDRA-2524)
+ * Cleanup doesn't need to inspect sstables that contain only local data
+ (CASSANDRA-5722)
+ * Add ability for CQL3 to list partition keys (CASSANDRA-4536)
+ * Improve native protocol serialization (CASSANDRA-5664)
+ * Upgrade Thrift to 0.9.1 (CASSANDRA-5923)
+ * Require superuser status for adding triggers (CASSANDRA-5963)
+ * Make standalone scrubber handle old and new style leveled manifest
+ (CASSANDRA-6005)
+ * Fix paxos bugs (CASSANDRA-6012, 6013, 6023)
+ * Fix paged ranges with multiple replicas (CASSANDRA-6004)
+ * Fix potential AssertionError during tracing (CASSANDRA-6041)
+ * Fix NPE in sstablesplit (CASSANDRA-6027)
+Merged from 1.2:
- 1.2.10
+ * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
+ * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
* Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
* Allow local batchlog writes for CL.ANY (CASSANDRA-5967)
* Optimize name query performance in wide rows (CASSANDRA-5966)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/conf/cassandra.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 2b3ca1e,cbe20fe..4cca602
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -448,4 -462,63 +390,63 @@@ public class Memtabl
sstableMetadataCollector);
}
}
+
+ private static class MeteringRunnable implements Runnable
+ {
+ // we might need to wait in the meter queue for a while. measure whichever memtable is active at that point,
+ // rather than keeping the original memtable referenced (and thus un-freeable) until this runs.
+ private final ColumnFamilyStore cfs;
+
+ public MeteringRunnable(ColumnFamilyStore cfs)
+ {
+ this.cfs = cfs;
+ }
+
+ public void run()
+ {
+ try
+ {
+ activelyMeasuring = cfs;
+ Memtable memtable = cfs.getMemtableThreadSafe();
+
- long start = System.currentTimeMillis();
++ long start = System.nanoTime();
+ // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+ // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
- long deepSize = memtable.meter.measure(memtable.columnFamilies);
++ long deepSize = memtable.meter.measure(memtable.rows);
+ int objects = 0;
- for (Map.Entry<RowPosition, ColumnFamily> entry : memtable.columnFamilies.entrySet())
++ for (Map.Entry<RowPosition, AtomicSortedColumns> entry : memtable.rows.entrySet())
+ {
+ deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
+ objects += entry.getValue().getColumnCount();
+ }
+ double newRatio = (double) deepSize / memtable.currentSize.get();
+
+ if (newRatio < MIN_SANE_LIVE_RATIO)
+ {
+ logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
+ newRatio = MIN_SANE_LIVE_RATIO;
+ }
+ if (newRatio > MAX_SANE_LIVE_RATIO)
+ {
+ logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
+ newRatio = MAX_SANE_LIVE_RATIO;
+ }
+
+ // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
+ // death. thus, higher estimates are believed immediately; lower ones are averaged w/ the old
+ if (newRatio > cfs.liveRatio)
+ cfs.liveRatio = newRatio;
+ else
+ cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
+
+ logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
- cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
++ cfs, cfs.liveRatio, newRatio, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), objects);
+ }
+ finally
+ {
+ activelyMeasuring = null;
+ meteringInProgress.remove(cfs);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/83b0b2fb/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
[3/6] git commit: Tuning knobs for dealing with large blobs and many
CFs (patch by jbellis; reviewed by yukim and Jeremiah Jordan)
Posted by jb...@apache.org.
Tuning knobs for dealing with large blobs and many CFs
patch by jbellis; reviewed by yukim and Jeremiah Jordan
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc95c8c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc95c8c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc95c8c0
Branch: refs/heads/trunk
Commit: dc95c8c0beb9309cfdfbabbd32ea1b74084daafd
Parents: 0804b76
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 12:06:18 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 12:07:54 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 6 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 8 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 4 +-
src/java/org/apache/cassandra/db/Memtable.java | 133 ++++++++++---------
.../org/apache/cassandra/db/MeteredFlusher.java | 22 ++-
.../PeriodicCommitLogExecutorService.java | 2 +-
.../apache/cassandra/utils/StatusLogger.java | 9 +-
9 files changed, 106 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ad32460..fb4f3f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.10
+ * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
* (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
* Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
* Allow local batchlog writes for CL.ANY (CASSANDRA-5967)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index d52f2e8..27ac09b 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -206,9 +206,13 @@ saved_caches_directory: /var/lib/cassandra/saved_caches
#
# the other option is "periodic" where writes may be acked immediately
# and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds.
+# milliseconds. By default this allows 1024*(CPU cores) pending
+# entries on the commitlog queue. If you are writing very large blobs,
+# you should reduce that; 16*cores works reasonably well for 1MB blobs.
+# It should be at least as large as the concurrent_writes setting.
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
+# commitlog_periodic_queue_size:
# The size of the individual commitlog file segments. A commitlog
# segment may be archived, deleted, or recycled once all the data
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 74b941d..a924a4c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -126,6 +126,7 @@ public class Config
public Double commitlog_sync_batch_window_in_ms;
public Integer commitlog_sync_period_in_ms;
public int commitlog_segment_size_in_mb = 32;
+ public int commitlog_periodic_queue_size = 1024 * FBUtilities.getAvailableProcessors();
public String endpoint_snitch;
public Boolean dynamic_snitch = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1412888..8e3cbe2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1047,10 +1047,16 @@ public class DatabaseDescriptor
return conf.commitlog_sync_batch_window_in_ms;
}
- public static int getCommitLogSyncPeriod() {
+ public static int getCommitLogSyncPeriod()
+ {
return conf.commitlog_sync_period_in_ms;
}
+ public static int getCommitLogPeriodicQueueSize()
+ {
+ return conf.commitlog_periodic_queue_size;
+ }
+
public static Config.CommitLogSync getCommitLogSync()
{
return conf.commitlog_sync;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 745b5ba..c646461 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -104,7 +104,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public final Directories directories;
/** ratio of in-memory memtable size, to serialized size */
- volatile double liveRatio = 1.0;
+ volatile double liveRatio = 10.0; // reasonable default until we compute what it is based on actual data
/** ops count last time we computed liveRatio */
private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
@@ -1023,7 +1023,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return (int) metric.memtableSwitchCount.count();
}
- private Memtable getMemtableThreadSafe()
+ Memtable getMemtableThreadSafe()
{
return data.getMemtable();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index df06cfb..cbe20fe 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -29,7 +29,6 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StageManager;
@@ -84,24 +83,15 @@ public class Memtable
// outstanding/running meterings to a maximum of one per CFS using this set; the executor's queue is unbounded but
// will implicitly be bounded by the number of CFS:s.
private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>();
- private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1,
- 1,
+ private static final ExecutorService meterExecutor = new JMXEnabledThreadPoolExecutor(1,
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MemoryMeter"))
- {
- @Override
- protected void afterExecute(Runnable r, Throwable t)
- {
- super.afterExecute(r, t);
- DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
- }
- };
-
+ new NamedThreadFactory("MemoryMeter"),
+ "internal");
private final MemoryMeter meter;
- volatile static Memtable activelyMeasuring;
+ volatile static ColumnFamilyStore activelyMeasuring;
private final AtomicLong currentSize = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
@@ -185,8 +175,9 @@ public class Memtable
if (!MemoryMeter.isInitialized())
{
// hack for openjdk. we log a warning about this in the startup script too.
- logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of 10.0. Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
- cfs.liveRatio = 10.0;
+ logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of {}. "
+ + " Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; "
+ + " upgrade to the Sun JRE instead", cfs.liveRatio);
return;
}
@@ -196,56 +187,7 @@ public class Memtable
return;
}
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- try
- {
- activelyMeasuring = Memtable.this;
-
- long start = System.currentTimeMillis();
- // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
- // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
- long deepSize = meter.measure(columnFamilies);
- int objects = 0;
- for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
- {
- deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
- objects += entry.getValue().getColumnCount();
- }
- double newRatio = (double) deepSize / currentSize.get();
-
- if (newRatio < MIN_SANE_LIVE_RATIO)
- {
- logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
- newRatio = MIN_SANE_LIVE_RATIO;
- }
- if (newRatio > MAX_SANE_LIVE_RATIO)
- {
- logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
- newRatio = MAX_SANE_LIVE_RATIO;
- }
-
- // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
- // death. thus, higher estimates are believed immediately; lower ones are averaged w/ the old
- if (newRatio > cfs.liveRatio)
- cfs.liveRatio = newRatio;
- else
- cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
-
- logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
- cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
- activelyMeasuring = null;
- }
- finally
- {
- meteringInProgress.remove(cfs);
- }
- }
- };
-
- meterExecutor.submit(runnable);
+ meterExecutor.submit(new MeteringRunnable(cfs));
}
private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
@@ -520,4 +462,63 @@ public class Memtable
sstableMetadataCollector);
}
}
+
+ private static class MeteringRunnable implements Runnable
+ {
+ // we might need to wait in the meter queue for a while. measure whichever memtable is active at that point,
+ // rather than keeping the original memtable referenced (and thus un-freeable) until this runs.
+ private final ColumnFamilyStore cfs;
+
+ public MeteringRunnable(ColumnFamilyStore cfs)
+ {
+ this.cfs = cfs;
+ }
+
+ public void run()
+ {
+ try
+ {
+ activelyMeasuring = cfs;
+ Memtable memtable = cfs.getMemtableThreadSafe();
+
+ long start = System.currentTimeMillis();
+ // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+ // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
+ long deepSize = memtable.meter.measure(memtable.columnFamilies);
+ int objects = 0;
+ for (Map.Entry<RowPosition, ColumnFamily> entry : memtable.columnFamilies.entrySet())
+ {
+ deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
+ objects += entry.getValue().getColumnCount();
+ }
+ double newRatio = (double) deepSize / memtable.currentSize.get();
+
+ if (newRatio < MIN_SANE_LIVE_RATIO)
+ {
+ logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
+ newRatio = MIN_SANE_LIVE_RATIO;
+ }
+ if (newRatio > MAX_SANE_LIVE_RATIO)
+ {
+ logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
+ newRatio = MAX_SANE_LIVE_RATIO;
+ }
+
+ // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
+ // death. thus, higher estimates are believed immediately; lower ones are averaged w/ the old
+ if (newRatio > cfs.liveRatio)
+ cfs.liveRatio = newRatio;
+ else
+ cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
+
+ logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
+ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
+ }
+ finally
+ {
+ activelyMeasuring = null;
+ meteringInProgress.remove(cfs);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index 408727c..f16b8a0 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -35,16 +35,22 @@ public class MeteredFlusher implements Runnable
public void run()
{
+ long totalMemtableBytesAllowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
+
// first, find how much memory non-active memtables are using
- Memtable activelyMeasuring = Memtable.activelyMeasuring;
- long flushingBytes = activelyMeasuring == null ? 0 : activelyMeasuring.getLiveSize();
+ long flushingBytes = Memtable.activelyMeasuring == null
+ ? 0
+ : Memtable.activelyMeasuring.getMemtableThreadSafe().getLiveSize();
flushingBytes += countFlushingBytes();
+ if (flushingBytes > 0)
+ logger.debug("Currently flushing {} bytes of {} max", flushingBytes, totalMemtableBytesAllowed);
// next, flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
// of the total size allotted. Then, flush other CFs in order of size if necessary.
long liveBytes = 0;
try
{
+ long totalMemtableBytesUnused = totalMemtableBytesAllowed - flushingBytes;
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
long size = cfs.getTotalMemtableLiveSize();
@@ -53,7 +59,7 @@ public class MeteredFlusher implements Runnable
+ DatabaseDescriptor.getFlushWriters()
+ DatabaseDescriptor.getFlushQueueSize())
/ (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
- if (size > (DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L - flushingBytes) / maxInFlight)
+ if (totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight)
{
logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
cfs.forceFlush();
@@ -64,10 +70,10 @@ public class MeteredFlusher implements Runnable
}
}
- if (flushingBytes + liveBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L)
+ if (flushingBytes + liveBytes <= totalMemtableBytesAllowed)
return;
- logger.info("estimated {} bytes used by all memtables pre-flush", liveBytes);
+ logger.info("estimated {} live and {} flushing bytes used by all memtables", liveBytes, flushingBytes);
// sort memtables by size
List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>();
@@ -89,14 +95,16 @@ public class MeteredFlusher implements Runnable
// flush largest first until we get below our threshold.
// although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish
// while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
- while (true)
+ while (!sorted.isEmpty())
{
flushingBytes = countFlushingBytes();
- if (liveBytes + flushingBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L || sorted.isEmpty())
+ if (liveBytes + flushingBytes <= totalMemtableBytesAllowed)
break;
ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1);
long size = cfs.getTotalMemtableLiveSize();
+ if (size == 0)
+ break;
logger.info("flushing {} to free up {} bytes", cfs, size);
liveBytes -= size;
cfs.forceFlush();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index 94f593e..48dbfac 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -33,7 +33,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
public PeriodicCommitLogExecutorService(final CommitLog commitLog)
{
- queue = new LinkedBlockingQueue<Runnable>(1024 * FBUtilities.getAvailableProcessors());
+ queue = new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getCommitLogPeriodicQueueSize());
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws Exception
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index cf5c5fa..15c4811 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.utils;
import java.lang.management.ManagementFactory;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
@@ -36,7 +38,9 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.CacheService;
@@ -74,9 +78,10 @@ public class StatusLogger
threadPoolProxy.getTotalBlockedTasks()));
}
// one offs
- CompactionManager cm = CompactionManager.instance;
logger.info(String.format("%-25s%10s%10s",
- "CompactionManager", cm.getActiveCompactions(), cm.getPendingTasks()));
+ "CompactionManager", CompactionManager.instance.getActiveCompactions(), CompactionManager.instance.getPendingTasks()));
+ logger.info(String.format("%-25s%10s%10s",
+ "Commitlog", "n/a", CommitLog.instance.getPendingTasks()));
int pendingCommands = 0;
for (int n : MessagingService.instance().getCommandPendingTasks().values())
{
[2/6] git commit: Tuning knobs for dealing with large blobs and many
CFs (patch by jbellis; reviewed by yukim and Jeremiah Jordan)
Posted by jb...@apache.org.
Tuning knobs for dealing with large blobs and many CFs
patch by jbellis; reviewed by yukim and Jeremiah Jordan
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc95c8c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc95c8c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc95c8c0
Branch: refs/heads/cassandra-2.0
Commit: dc95c8c0beb9309cfdfbabbd32ea1b74084daafd
Parents: 0804b76
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Sep 17 12:06:18 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Sep 17 12:07:54 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 6 +-
.../org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 8 +-
.../apache/cassandra/db/ColumnFamilyStore.java | 4 +-
src/java/org/apache/cassandra/db/Memtable.java | 133 ++++++++++---------
.../org/apache/cassandra/db/MeteredFlusher.java | 22 ++-
.../PeriodicCommitLogExecutorService.java | 2 +-
.../apache/cassandra/utils/StatusLogger.java | 9 +-
9 files changed, 106 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ad32460..fb4f3f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.10
+ * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
* (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
* Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
* Allow local batchlog writes for CL.ANY (CASSANDRA-5967)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index d52f2e8..27ac09b 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -206,9 +206,13 @@ saved_caches_directory: /var/lib/cassandra/saved_caches
#
# the other option is "periodic" where writes may be acked immediately
# and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds.
+# milliseconds. By default this allows 1024*(CPU cores) pending
+# entries on the commitlog queue. If you are writing very large blobs,
+# you should reduce that; 16*cores works reasonably well for 1MB blobs.
+# It should be at least as large as the concurrent_writes setting.
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
+# commitlog_periodic_queue_size:
# The size of the individual commitlog file segments. A commitlog
# segment may be archived, deleted, or recycled once all the data
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 74b941d..a924a4c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -126,6 +126,7 @@ public class Config
public Double commitlog_sync_batch_window_in_ms;
public Integer commitlog_sync_period_in_ms;
public int commitlog_segment_size_in_mb = 32;
+ public int commitlog_periodic_queue_size = 1024 * FBUtilities.getAvailableProcessors();
public String endpoint_snitch;
public Boolean dynamic_snitch = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1412888..8e3cbe2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1047,10 +1047,16 @@ public class DatabaseDescriptor
return conf.commitlog_sync_batch_window_in_ms;
}
- public static int getCommitLogSyncPeriod() {
+ public static int getCommitLogSyncPeriod()
+ {
return conf.commitlog_sync_period_in_ms;
}
+ public static int getCommitLogPeriodicQueueSize()
+ {
+ return conf.commitlog_periodic_queue_size;
+ }
+
public static Config.CommitLogSync getCommitLogSync()
{
return conf.commitlog_sync;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 745b5ba..c646461 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -104,7 +104,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public final Directories directories;
/** ratio of in-memory memtable size, to serialized size */
- volatile double liveRatio = 1.0;
+ volatile double liveRatio = 10.0; // reasonable default until we compute what it is based on actual data
/** ops count last time we computed liveRatio */
private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
@@ -1023,7 +1023,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
return (int) metric.memtableSwitchCount.count();
}
- private Memtable getMemtableThreadSafe()
+ Memtable getMemtableThreadSafe()
{
return data.getMemtable();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index df06cfb..cbe20fe 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -29,7 +29,6 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StageManager;
@@ -84,24 +83,15 @@ public class Memtable
// outstanding/running meterings to a maximum of one per CFS using this set; the executor's queue is unbounded but
// will implicitly be bounded by the number of CFS:s.
private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>();
- private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1,
- 1,
+ private static final ExecutorService meterExecutor = new JMXEnabledThreadPoolExecutor(1,
Integer.MAX_VALUE,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MemoryMeter"))
- {
- @Override
- protected void afterExecute(Runnable r, Throwable t)
- {
- super.afterExecute(r, t);
- DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
- }
- };
-
+ new NamedThreadFactory("MemoryMeter"),
+ "internal");
private final MemoryMeter meter;
- volatile static Memtable activelyMeasuring;
+ volatile static ColumnFamilyStore activelyMeasuring;
private final AtomicLong currentSize = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
@@ -185,8 +175,9 @@ public class Memtable
if (!MemoryMeter.isInitialized())
{
// hack for openjdk. we log a warning about this in the startup script too.
- logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of 10.0. Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead");
- cfs.liveRatio = 10.0;
+ logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of {}. "
+ + " Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; "
+ + " upgrade to the Sun JRE instead", cfs.liveRatio);
return;
}
@@ -196,56 +187,7 @@ public class Memtable
return;
}
- Runnable runnable = new Runnable()
- {
- public void run()
- {
- try
- {
- activelyMeasuring = Memtable.this;
-
- long start = System.currentTimeMillis();
- // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
- // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
- long deepSize = meter.measure(columnFamilies);
- int objects = 0;
- for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
- {
- deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
- objects += entry.getValue().getColumnCount();
- }
- double newRatio = (double) deepSize / currentSize.get();
-
- if (newRatio < MIN_SANE_LIVE_RATIO)
- {
- logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
- newRatio = MIN_SANE_LIVE_RATIO;
- }
- if (newRatio > MAX_SANE_LIVE_RATIO)
- {
- logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
- newRatio = MAX_SANE_LIVE_RATIO;
- }
-
- // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
- // death. thus, higher estimates are believed immediately; lower ones are averaged w/ the old
- if (newRatio > cfs.liveRatio)
- cfs.liveRatio = newRatio;
- else
- cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
-
- logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
- cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects);
- activelyMeasuring = null;
- }
- finally
- {
- meteringInProgress.remove(cfs);
- }
- }
- };
-
- meterExecutor.submit(runnable);
+ meterExecutor.submit(new MeteringRunnable(cfs));
}
private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
@@ -520,4 +462,82 @@ public class Memtable
sstableMetadataCollector);
}
}
+
+    /**
+     * Background task (submitted to meterExecutor) that measures the deep size of a column family's
+     * current memtable and folds the ratio of that deep size to memtable.currentSize into
+     * {@code cfs.liveRatio}. However run() exits, it resets {@code activelyMeasuring} and removes
+     * {@code cfs} from {@code meteringInProgress}.
+     */
+    private static class MeteringRunnable implements Runnable
+    {
+        // we might need to wait in the meter queue for a while. measure whichever memtable is active at that point,
+        // rather than keeping the original memtable referenced (and thus un-freeable) until this runs.
+        private final ColumnFamilyStore cfs;
+
+        public MeteringRunnable(ColumnFamilyStore cfs)
+        {
+            this.cfs = cfs;
+        }
+
+        public void run()
+        {
+            try
+            {
+                activelyMeasuring = cfs;
+                Memtable memtable = cfs.getMemtableThreadSafe();
+
+                // nanoTime rather than currentTimeMillis: we only want elapsed time, immune to wall-clock adjustments.
+                long start = System.nanoTime();
+                // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+                // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
+                long deepSize = memtable.meter.measure(memtable.columnFamilies);
+                int objects = 0;
+                for (Map.Entry<RowPosition, ColumnFamily> entry : memtable.columnFamilies.entrySet())
+                {
+                    deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
+                    objects += entry.getValue().getColumnCount();
+                }
+
+                // If the memtable that triggered this task was switched out while we sat in the queue, the one
+                // measured here may still be empty. Dividing by its zero size gives +Infinity, which the clamp
+                // below turns into MAX_SANE_LIVE_RATIO and which is then adopted immediately (higher estimates
+                // are always believed), inflating cfs.liveRatio to the maximum. Skip the update instead.
+                long currentSize = memtable.currentSize.get();
+                if (currentSize <= 0)
+                {
+                    logger.debug("{} memtable is empty; skipping liveRatio calculation", cfs);
+                    return;
+                }
+                double newRatio = (double) deepSize / currentSize;
+
+                if (newRatio < MIN_SANE_LIVE_RATIO)
+                {
+                    logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
+                    newRatio = MIN_SANE_LIVE_RATIO;
+                }
+                if (newRatio > MAX_SANE_LIVE_RATIO)
+                {
+                    logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
+                    newRatio = MAX_SANE_LIVE_RATIO;
+                }
+
+                // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
+                // death. thus, higher estimates are believed immediately; lower ones are averaged w/ the old
+                if (newRatio > cfs.liveRatio)
+                    cfs.liveRatio = newRatio;
+                else
+                    cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
+
+                logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
+                            cfs, cfs.liveRatio, newRatio, (System.nanoTime() - start) / 1000000, objects);
+            }
+            finally
+            {
+                // every exit path (normal, empty-memtable return, exception) must release both markers
+                activelyMeasuring = null;
+                meteringInProgress.remove(cfs);
+            }
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index 408727c..f16b8a0 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -35,16 +35,22 @@ public class MeteredFlusher implements Runnable
public void run()
{
+ long totalMemtableBytesAllowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
+
// first, find how much memory non-active memtables are using
- Memtable activelyMeasuring = Memtable.activelyMeasuring;
- long flushingBytes = activelyMeasuring == null ? 0 : activelyMeasuring.getLiveSize();
+ long flushingBytes = Memtable.activelyMeasuring == null
+ ? 0
+ : Memtable.activelyMeasuring.getMemtableThreadSafe().getLiveSize();
flushingBytes += countFlushingBytes();
+ if (flushingBytes > 0)
+ logger.debug("Currently flushing {} bytes of {} max", flushingBytes, totalMemtableBytesAllowed);
// next, flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
// of the total size allotted. Then, flush other CFs in order of size if necessary.
long liveBytes = 0;
try
{
+ long totalMemtableBytesUnused = totalMemtableBytesAllowed - flushingBytes;
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
long size = cfs.getTotalMemtableLiveSize();
@@ -53,7 +59,7 @@ public class MeteredFlusher implements Runnable
+ DatabaseDescriptor.getFlushWriters()
+ DatabaseDescriptor.getFlushQueueSize())
/ (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
- if (size > (DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L - flushingBytes) / maxInFlight)
+ if (totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight)
{
logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
cfs.forceFlush();
@@ -64,10 +70,10 @@ public class MeteredFlusher implements Runnable
}
}
- if (flushingBytes + liveBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L)
+ if (flushingBytes + liveBytes <= totalMemtableBytesAllowed)
return;
- logger.info("estimated {} bytes used by all memtables pre-flush", liveBytes);
+ logger.info("estimated {} live and {} flushing bytes used by all memtables", liveBytes, flushingBytes);
// sort memtables by size
List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>();
@@ -89,14 +95,16 @@ public class MeteredFlusher implements Runnable
// flush largest first until we get below our threshold.
// although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish
// while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
- while (true)
+ while (!sorted.isEmpty())
{
flushingBytes = countFlushingBytes();
- if (liveBytes + flushingBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L || sorted.isEmpty())
+ if (liveBytes + flushingBytes <= totalMemtableBytesAllowed)
break;
ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1);
long size = cfs.getTotalMemtableLiveSize();
+ if (size == 0)
+ break;
logger.info("flushing {} to free up {} bytes", cfs, size);
liveBytes -= size;
cfs.forceFlush();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index 94f593e..48dbfac 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -33,7 +33,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
public PeriodicCommitLogExecutorService(final CommitLog commitLog)
{
- queue = new LinkedBlockingQueue<Runnable>(1024 * FBUtilities.getAvailableProcessors());
+ queue = new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getCommitLogPeriodicQueueSize());
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws Exception
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index cf5c5fa..15c4811 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.utils;
import java.lang.management.ManagementFactory;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
@@ -36,7 +38,9 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.CacheService;
@@ -74,9 +78,10 @@ public class StatusLogger
threadPoolProxy.getTotalBlockedTasks()));
}
// one offs
- CompactionManager cm = CompactionManager.instance;
logger.info(String.format("%-25s%10s%10s",
- "CompactionManager", cm.getActiveCompactions(), cm.getPendingTasks()));
+ "CompactionManager", CompactionManager.instance.getActiveCompactions(), CompactionManager.instance.getPendingTasks()));
+ logger.info(String.format("%-25s%10s%10s",
+ "Commitlog", "n/a", CommitLog.instance.getPendingTasks()));
int pendingCommands = 0;
for (int n : MessagingService.instance().getCommandPendingTasks().values())
{