You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/08 01:49:06 UTC
svn commit: r1090064 - in /cassandra/trunk: ./ conf/ lib/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/streaming/ test/conf/
test/long/org/apache/cassandra/...
Author: jbellis
Date: Thu Apr 7 23:49:06 2011
New Revision: 1090064
URL: http://svn.apache.org/viewvc?rev=1090064&view=rev
Log:
add a server-wide cap on memtable memory usage
patch by jbellis; reviewed by Stu Hood for CASSANDRA-2006
Added:
cassandra/trunk/lib/jamm-0.2.jar
cassandra/trunk/src/java/org/apache/cassandra/db/MeteredFlusher.java
cassandra/trunk/test/long/org/apache/cassandra/db/MeteredFlusherTest.java
Modified:
cassandra/trunk/build.xml
cassandra/trunk/conf/cassandra-env.sh
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
cassandra/trunk/test/conf/cassandra.yaml
cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Thu Apr 7 23:49:06 2011
@@ -615,6 +615,7 @@
<jvmarg value="-Dstorage-config=${test.conf}"/>
<jvmarg value="-Daccess.properties=${test.conf}/access.properties"/>
<jvmarg value="-Dlog4j.configuration=log4j-junit.properties" />
+ <jvmarg value="-javaagent:${basedir}/lib/jamm-0.2.jar" />
<jvmarg value="-ea"/>
<optjvmargs/>
<classpath>
Modified: cassandra/trunk/conf/cassandra-env.sh
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-env.sh?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-env.sh (original)
+++ cassandra/trunk/conf/cassandra-env.sh Thu Apr 7 23:49:06 2011
@@ -91,6 +91,9 @@ JMX_PORT="7199"
# performance benefit (around 5%).
JVM_OPTS="$JVM_OPTS -ea"
+# add the jamm javaagent
+JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.jar"
+
# enable thread priorities, primarily so we can give periodic tasks
# a lower priority to avoid interfering with client workload
JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities"
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Thu Apr 7 23:49:06 2011
@@ -141,6 +141,13 @@ reduce_cache_capacity_to: 0.6
concurrent_reads: 32
concurrent_writes: 32
+# Total memory to use for memtables. Cassandra will flush the largest
+# memtable when this much memory is used. Prefer using this to
+# the older, per-ColumnFamily memtable flush thresholds.
+# If omitted, Cassandra will set it to 1/3 of the heap.
+# If set to 0, only the old flush thresholds are used.
+# memtable_total_space_in_mb: 2048
+
# This sets the amount of memtable flush writer threads. These will
# be blocked by disk io, and each one will hold a memtable in memory
# while blocked. If you have a large heap and many data directories,
Added: cassandra/trunk/lib/jamm-0.2.jar
URL: http://svn.apache.org/viewvc/cassandra/trunk/lib/jamm-0.2.jar?rev=1090064&view=auto
==============================================================================
Files cassandra/trunk/lib/jamm-0.2.jar (added) and cassandra/trunk/lib/jamm-0.2.jar Thu Apr 7 23:49:06 2011 differ
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Thu Apr 7 23:49:06 2011
@@ -57,7 +57,8 @@ public class Config
public Integer concurrent_replicates = 32;
public Integer memtable_flush_writers = null; // will get set to the length of data dirs in DatabaseDescriptor
-
+ public Integer memtable_total_space_in_mb;
+
public Integer sliced_buffer_size_in_kb = 64;
public Integer storage_port = 7000;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Apr 7 23:49:06 2011
@@ -231,6 +231,9 @@ public class DatabaseDescriptor
throw new ConfigurationException("conf.concurrent_replicates must be at least 2");
}
+ if (conf.memtable_total_space_in_mb == null)
+ conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576));
+
/* Memtable flush writer threads */
if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1)
{
@@ -797,6 +800,8 @@ public class DatabaseDescriptor
maxDiskIndex = i;
}
}
+ logger.debug("expected data files size is {}; largest free partition has {} bytes free",
+ expectedCompactedFileSize, maxFreeDisk);
// Load factor of 0.9 we do not want to use the entire disk that is too risky.
maxFreeDisk = (long)(0.9 * maxFreeDisk);
if( expectedCompactedFileSize < maxFreeDisk )
@@ -1057,4 +1062,16 @@ public class DatabaseDescriptor
{
return conf.memtable_flush_queue_size;
}
+
+    /**
+     * Returns the configured server-wide memtable cap, in megabytes.
+     * Should only be called if estimatesRealMemtableSize() is true; the assertion checks that
+     * the configured value is positive.
+     */
+    public static int getTotalMemtableSpaceInMB()
+    {
+        final Integer capInMB = conf.memtable_total_space_in_mb;
+        assert capInMB > 0;
+        return capInMB;
+    }
+
+    /**
+     * @return true when memtable_total_space_in_mb is configured to a positive value,
+     *         false when it is zero or negative
+     */
+    public static boolean estimatesRealMemtableSize()
+    {
+        final int capInMB = conf.memtable_total_space_in_mb;
+        return 0 < capInMB;
+    }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Thu Apr 7 23:49:06 2011
@@ -125,7 +125,7 @@ public class BinaryMemtable implements I
private SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
{
logger.info("Writing " + this);
- SSTableWriter writer = cfs.createFlushWriter(sortedKeys.size());
+ SSTableWriter writer = cfs.createFlushWriter(sortedKeys.size(), DatabaseDescriptor.getBMTThreshold());
for (DecoratedKey key : sortedKeys)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Apr 7 23:49:06 2011
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -102,6 +103,20 @@ public class ColumnFamilyStore implement
public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MemtablePostFlusher");
+ static
+ {
+ if (DatabaseDescriptor.estimatesRealMemtableSize())
+ {
+ logger.info("Global memtable threshold is enabled at {}MB", DatabaseDescriptor.getTotalMemtableSpaceInMB());
+ // (can block if flush queue fills up, so don't put on scheduledTasks)
+ StorageService.tasks.scheduleWithFixedDelay(new MeteredFlusher(), 1000, 1000, TimeUnit.MILLISECONDS);
+ }
+ else
+ {
+ logger.info("Global memtable threshold is disabled");
+ }
+ }
+
public final Table table;
public final String columnFamily;
public final CFMetaData metadata;
@@ -143,7 +158,7 @@ public class ColumnFamilyStore implement
/** Lock to allow migrations to block all flushing, so we can be sure not to write orphaned data files */
public final Lock flushLock = new ReentrantLock();
-
+
public static enum CacheType
{
KEY_CACHE_TYPE("KeyCache"),
@@ -166,6 +181,12 @@ public class ColumnFamilyStore implement
public final AutoSavingCache<Pair<Descriptor,DecoratedKey>, Long> keyCache;
public final AutoSavingCache<DecoratedKey, ColumnFamily> rowCache;
+
+ /** ratio of in-memory memtable size, to serialized size */
+ volatile double liveRatio = 1.0;
+ /** ops count last time we computed liveRatio */
+ private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
+
public void reload()
{
// metadata object has been mutated directly. make all the members jibe with new settings.
@@ -569,12 +590,11 @@ public class ColumnFamilyStore implement
* When the sstable object is closed, it will be renamed to a non-temporary
* format, so incomplete sstables can be recognized and removed on startup.
*/
- public String getFlushPath()
+ public String getFlushPath(long estimatedSize)
{
- long guessedSize = 2L * memsize.value() * 1024*1024; // 2* adds room for keys, column indexes
- String location = DatabaseDescriptor.getDataFileLocationForTable(table.name, guessedSize);
+ String location = DatabaseDescriptor.getDataFileLocationForTable(table.name, estimatedSize);
if (location == null)
- throw new RuntimeException("Insufficient disk space to flush");
+ throw new RuntimeException("Insufficient disk space to flush " + estimatedSize + " bytes");
return getTempSSTablePath(location);
}
@@ -741,6 +761,22 @@ public class ColumnFamilyStore implement
writeStats.addNano(System.nanoTime() - start);
}
+ if (DatabaseDescriptor.estimatesRealMemtableSize())
+ {
+ while (true)
+ {
+ long last = liveRatioComputedAt.get();
+ long operations = writeStats.getOpCount();
+ if (operations < 2 * last)
+ break;
+ if (liveRatioComputedAt.compareAndSet(last, operations))
+ {
+ logger.debug("computing liveRatio of {} at {} ops", this, operations);
+ mt.updateLiveRatio();
+ }
+ }
+ }
+
return flushRequested ? mt : null;
}
@@ -966,12 +1002,20 @@ public class ColumnFamilyStore implement
public long getMemtableColumnsCount()
{
- return getMemtableThreadSafe().getCurrentOperations();
+ return getMemtableThreadSafe().getOperations();
}
public long getMemtableDataSize()
{
- return getMemtableThreadSafe().getCurrentThroughput();
+ return getMemtableThreadSafe().getLiveSize();
+ }
+
+    /**
+     * Sums the live size of the current memtable of every store returned by concatWithIndexes().
+     *
+     * @return the combined live size, in bytes
+     */
+    public long getTotalMemtableLiveSize()
+    {
+        long liveBytes = 0;
+        Iterator<ColumnFamilyStore> stores = concatWithIndexes().iterator();
+        while (stores.hasNext())
+        {
+            liveBytes += stores.next().getMemtableThreadSafe().getLiveSize();
+        }
+        return liveBytes;
+    }
public int getMemtableSwitchCount()
@@ -2032,9 +2076,9 @@ public class ColumnFamilyStore implement
return intern(name);
}
- public SSTableWriter createFlushWriter(long estimatedRows) throws IOException
+ public SSTableWriter createFlushWriter(long estimatedRows, long estimatedSize) throws IOException
{
- return new SSTableWriter(getFlushPath(), estimatedRows, metadata, partitioner);
+ return new SSTableWriter(getFlushPath(estimatedSize), estimatedRows, metadata, partitioner);
}
public SSTableWriter createCompactionWriter(long estimatedRows, String location) throws IOException
@@ -2047,4 +2091,8 @@ public class ColumnFamilyStore implement
return Iterables.concat(indexedColumns.values(), Collections.singleton(this));
}
+ /**
+ * Returns the memtables that the backing data tracker reports as pending flush.
+ * Simply delegates to data.getMemtablesPendingFlush().
+ */
+ public Set<Memtable> getMemtablesPendingFlush()
+ {
+ return data.getMemtablesPendingFlush();
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Apr 7 23:49:06 2011
@@ -25,10 +25,7 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -38,6 +35,7 @@ import com.google.common.collect.Peeking
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
import org.apache.cassandra.db.filter.AbstractColumnIterator;
@@ -47,13 +45,32 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.WrappedRunnable;
+import org.github.jamm.MemoryMeter;
public class Memtable implements Comparable<Memtable>, IFlushable
{
private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
- private volatile boolean isFrozen;
+ // size in memory can never be less than serialized size
+ private static final double MIN_SANE_LIVE_RATIO = 1.0;
+ // max liveratio seen w/ 1-byte columns on a 64-bit jvm was 19. If it gets higher than 64 something is probably broken.
+ private static final double MAX_SANE_LIVE_RATIO = 64.0;
+ private static final MemoryMeter meter = new MemoryMeter();
+
+ // we're careful to only allow one count to run at a time because counting is slow
+ // (can be minutes, for a large memtable and a busy server), so we could keep memtables
+ // alive after they're flushed and would otherwise be GC'd.
+ private static final ExecutorService meterExecutor = new ThreadPoolExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>())
+ {
+ @Override
+ protected void afterExecute(Runnable r, Throwable t)
+ {
+ super.afterExecute(r, t);
+ DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
+ }
+ };
+ private volatile boolean isFrozen;
private final AtomicLong currentThroughput = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
@@ -63,10 +80,10 @@ public class Memtable implements Compara
private final long THRESHOLD;
private final long THRESHOLD_COUNT;
+ volatile static Memtable activelyMeasuring;
public Memtable(ColumnFamilyStore cfs)
{
-
this.cfs = cfs;
creationTime = System.currentTimeMillis();
THRESHOLD = cfs.getMemtableThroughputInMB() * 1024L * 1024L;
@@ -90,12 +107,18 @@ public class Memtable implements Compara
return 0;
}
- public long getCurrentThroughput()
+    /**
+     * Estimates the in-memory size of this memtable: the serialized byte count scaled by the
+     * owning column family's liveRatio, padded by a further 25%.
+     */
+    public long getLiveSize()
+    {
+        final long serializedBytes = currentThroughput.get();
+        final double estimatedBytes = serializedBytes * cfs.liveRatio;
+        // 25% fudge factor
+        return (long) (estimatedBytes * 1.25);
+    }
+
+ public long getSerializedSize()
{
return currentThroughput.get();
}
-
- public long getCurrentOperations()
+
+ public long getOperations()
{
return currentOperations.get();
}
@@ -126,6 +149,54 @@ public class Memtable implements Compara
resolve(key, columnFamily);
}
+    /**
+     * Asynchronously re-measures this memtable's in-memory footprint and raises cfs.liveRatio
+     * (ratio of in-memory size to serialized size) if the new measurement is higher.
+     *
+     * The measurement is submitted to meterExecutor. If the executor rejects the task because the
+     * meter thread is busy, the update is skipped and only a debug message is logged.
+     */
+    public void updateLiveRatio()
+    {
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                activelyMeasuring = Memtable.this;
+                try
+                {
+                    long start = System.currentTimeMillis();
+                    // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+                    // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
+                    long deepSize = meter.measure(columnFamilies);
+                    int objects = 0;
+                    for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
+                    {
+                        deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
+                        objects += entry.getValue().getColumnCount();
+                    }
+                    double newRatio = (double) deepSize / currentThroughput.get();
+
+                    // clamp implausible measurements into [MIN_SANE_LIVE_RATIO, MAX_SANE_LIVE_RATIO]
+                    if (newRatio < MIN_SANE_LIVE_RATIO)
+                    {
+                        logger.warn("setting live ratio to minimum of 1.0 instead of {}", newRatio);
+                        newRatio = MIN_SANE_LIVE_RATIO;
+                    }
+                    if (newRatio > MAX_SANE_LIVE_RATIO)
+                    {
+                        // newRatio must be passed as an argument; inside the literal the {} is never filled in
+                        logger.warn("setting live ratio to maximum of 64 instead of {}", newRatio);
+                        newRatio = MAX_SANE_LIVE_RATIO;
+                    }
+                    // the stored ratio only ever grows: keep the largest value seen so far
+                    cfs.liveRatio = Math.max(cfs.liveRatio, newRatio);
+
+                    logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns",
+                                new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects });
+                }
+                finally
+                {
+                    // always clear the marker, even when measuring throws, so a failed measurement
+                    // does not leave a stale memtable referenced from this static field
+                    activelyMeasuring = null;
+                }
+            }
+        };
+
+        try
+        {
+            meterExecutor.submit(runnable);
+        }
+        catch (RejectedExecutionException e)
+        {
+            logger.debug("Meter thread is busy; skipping liveRatio update for {}", cfs);
+        }
+    }
+
private void resolve(DecoratedKey key, ColumnFamily cf)
{
currentThroughput.addAndGet(cf.size());
@@ -155,8 +226,10 @@ public class Memtable implements Compara
private SSTableReader writeSortedContents() throws IOException
{
logger.info("Writing " + this);
- SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size());
+ SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), 2 * getSerializedSize()); // 2* for keys
+ // (we can't clear out the map as-we-go to free up memory,
+ // since the memtable is being used for queries in the "pending flush" category)
for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
writer.append(entry.getKey(), entry.getValue());
@@ -192,8 +265,8 @@ public class Memtable implements Compara
public String toString()
{
- return String.format("Memtable-%s@%s(%s bytes, %s operations)",
- cfs.getColumnFamilyName(), hashCode(), currentThroughput, currentOperations);
+ return String.format("Memtable-%s@%s(%s/%s serialized/live bytes, %s ops)",
+ cfs.getColumnFamilyName(), hashCode(), currentThroughput, getLiveSize(), currentOperations);
}
/**
Added: cassandra/trunk/src/java/org/apache/cassandra/db/MeteredFlusher.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/MeteredFlusher.java?rev=1090064&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/MeteredFlusher.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/MeteredFlusher.java Thu Apr 7 23:49:06 2011
@@ -0,0 +1,104 @@
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+/**
+ * Periodic task that enforces the server-wide cap on memtable memory
+ * (DatabaseDescriptor.getTotalMemtableSpaceInMB()) by forcing flushes.
+ *
+ * Each run first flushes any column family whose live memtables exceed its share of the
+ * remaining space, then, if the estimate is still over the cap, flushes the remaining
+ * column families largest-first until the estimate drops below the cap.
+ */
+class MeteredFlusher implements Runnable
+{
+    private static final Logger logger = LoggerFactory.getLogger(MeteredFlusher.class);
+
+    public void run()
+    {
+        final long totalBytes = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
+
+        // first, find how much memory non-active memtables are using
+        Memtable activelyMeasuring = Memtable.activelyMeasuring;
+        long flushingBytes = activelyMeasuring == null ? 0 : activelyMeasuring.getLiveSize();
+        flushingBytes += countFlushingBytes();
+
+        // next, flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
+        // of the total size allotted. Then, flush other CFs in order of size if necessary.
+        long liveBytes = 0;
+        try
+        {
+            // sizes observed in this pass, for the CFs that were NOT flushed as high-traffic.
+            // Keeping a snapshot means (a) the sort below compares stable values even though the
+            // live sizes keep changing under concurrent writes, and (b) what we later subtract from
+            // liveBytes is exactly what we added to it.
+            final Map<ColumnFamilyStore, Long> liveSizes = new IdentityHashMap<ColumnFamilyStore, Long>();
+            for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+            {
+                long size = cfs.getTotalMemtableLiveSize();
+                int maxInFlight = (int) Math.ceil((double) (1 // live memtable
+                                                            + 1 // potentially a flushed memtable being counted by jamm
+                                                            + DatabaseDescriptor.getFlushWriters()
+                                                            + DatabaseDescriptor.getFlushQueueSize())
+                                                  / (1 + cfs.getIndexedColumns().size()));
+                if (size > (totalBytes - flushingBytes) / maxInFlight)
+                {
+                    logger.info("flushing high-traffic column family {}", cfs);
+                    cfs.forceFlush();
+                }
+                else
+                {
+                    liveBytes += size;
+                    liveSizes.put(cfs, size);
+                }
+            }
+
+            if (flushingBytes + liveBytes <= totalBytes)
+                return;
+
+            logger.info("estimated {} bytes used by all memtables pre-flush", liveBytes);
+
+            // sort the not-yet-flushed column families by their snapshotted size
+            List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>(liveSizes.keySet());
+            Collections.sort(sorted, new Comparator<ColumnFamilyStore>()
+            {
+                public int compare(ColumnFamilyStore o1, ColumnFamilyStore o2)
+                {
+                    return liveSizes.get(o1).compareTo(liveSizes.get(o2));
+                }
+            });
+
+            // flush largest first until we get below our threshold.
+            // although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish
+            // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
+            while (true)
+            {
+                flushingBytes = countFlushingBytes();
+                if (liveBytes + flushingBytes <= totalBytes || sorted.isEmpty())
+                    break;
+
+                ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1);
+                long size = liveSizes.get(cfs);
+                logger.info("flushing {} to free up {} bytes", cfs, size);
+                liveBytes -= size;
+                cfs.forceFlush();
+            }
+        }
+        finally
+        {
+            logger.debug("memtable memory usage is {} bytes with {} live", liveBytes + flushingBytes, liveBytes);
+        }
+    }
+
+    /**
+     * @return the summed live size, in bytes, of every memtable that any column family
+     *         currently reports as pending flush
+     */
+    private long countFlushingBytes()
+    {
+        long flushingBytes = 0;
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+        {
+            for (Memtable memtable : cfs.getMemtablesPendingFlush())
+                flushingBytes += memtable.getLiveSize();
+        }
+        return flushingBytes;
+    }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Apr 7 23:49:06 2011
@@ -2196,7 +2196,7 @@ public class StorageService implements I
for (ColumnFamilyStore subordinate : cfs.concatWithIndexes())
{
ops += subordinate.getMemtableColumnsCount();
- throughput = subordinate.getMemtableThroughputInMB();
+ throughput += subordinate.getMemtableDataSize();
}
if (ops > 0 && (largestByOps == null || ops > largestByOps.getMemtableColumnsCount()))
Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Thu Apr 7 23:49:06 2011
@@ -80,7 +80,7 @@ public class StreamIn
// new local sstable
Table table = Table.open(remotedesc.ksname);
ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
- Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath());
+ Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.size));
return new PendingFile(localdesc, remote);
}
Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Thu Apr 7 23:49:06 2011
@@ -33,3 +33,4 @@ encryption_options:
truststore: conf/.truststore
truststore_password: cassandra
incremental_backups: true
+flush_largest_memtables_at: 1.0
Added: cassandra/trunk/test/long/org/apache/cassandra/db/MeteredFlusherTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/MeteredFlusherTest.java?rev=1090064&view=auto
==============================================================================
--- cassandra/trunk/test/long/org/apache/cassandra/db/MeteredFlusherTest.java (added)
+++ cassandra/trunk/test/long/org/apache/cassandra/db/MeteredFlusherTest.java Thu Apr 7 23:49:06 2011
@@ -0,0 +1,51 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.migration.AddColumnFamily;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class MeteredFlusherTest extends CleanupHelper
+{
+ @Test
+ public void testManyMemtables() throws IOException, ConfigurationException
+ {
+ Table table = Table.open("Keyspace1");
+ for (int i = 0; i < 100; i++)
+ {
+ CFMetaData metadata = new CFMetaData(table.name, "_CF" + i, ColumnFamilyType.Standard, UTF8Type.instance, null);
+ new AddColumnFamily(metadata).apply();
+ }
+
+ ByteBuffer name = ByteBufferUtil.bytes("c");
+ for (int j = 0; j < 200; j++)
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key" + j));
+ ColumnFamily cf = ColumnFamily.create("Keyspace1", "_CF" + i);
+ // don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory
+ ByteBuffer value = ByteBuffer.allocate(100000);
+ cf.addColumn(new Column(name, value));
+ rm.add(cf);
+ rm.applyUnsafe();
+ }
+ }
+
+ int flushes = 0;
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+ {
+ if (cfs.getColumnFamilyName().startsWith("_CF"))
+ flushes += cfs.getMemtableSwitchCount();
+ }
+ assert flushes > 0;
+ }
+}
+
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Thu Apr 7 23:49:06 2011
@@ -319,7 +319,7 @@ public class DefsTest extends CleanupHel
ColumnFamilyStore store = Table.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
assert store != null;
store.forceBlockingFlush();
- store.getFlushPath();
+ store.getFlushPath(1024);
assert DefsTable.getFiles(cfm.ksName, cfm.cfName).size() > 0;
new DropColumnFamily(ks.name, cfm.cfName).apply();