Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/04/08 01:49:06 UTC

svn commit: r1090064 - in /cassandra/trunk: ./ conf/ lib/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/streaming/ test/conf/ test/long/org/apache/cassandra/...

Author: jbellis
Date: Thu Apr  7 23:49:06 2011
New Revision: 1090064

URL: http://svn.apache.org/viewvc?rev=1090064&view=rev
Log:
add a server-wide cap on memtable memory usage
patch by jbellis; reviewed by Stu Hood for CASSANDRA-2006

Added:
    cassandra/trunk/lib/jamm-0.2.jar
    cassandra/trunk/src/java/org/apache/cassandra/db/MeteredFlusher.java
    cassandra/trunk/test/long/org/apache/cassandra/db/MeteredFlusherTest.java
Modified:
    cassandra/trunk/build.xml
    cassandra/trunk/conf/cassandra-env.sh
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
    cassandra/trunk/test/conf/cassandra.yaml
    cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java

Modified: cassandra/trunk/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/build.xml?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/build.xml (original)
+++ cassandra/trunk/build.xml Thu Apr  7 23:49:06 2011
@@ -615,6 +615,7 @@
         <jvmarg value="-Dstorage-config=${test.conf}"/>
         <jvmarg value="-Daccess.properties=${test.conf}/access.properties"/>
         <jvmarg value="-Dlog4j.configuration=log4j-junit.properties" />
+        <jvmarg value="-javaagent:${basedir}/lib/jamm-0.2.jar" />
         <jvmarg value="-ea"/>
         <optjvmargs/>
         <classpath>

Modified: cassandra/trunk/conf/cassandra-env.sh
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-env.sh?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-env.sh (original)
+++ cassandra/trunk/conf/cassandra-env.sh Thu Apr  7 23:49:06 2011
@@ -91,6 +91,9 @@ JMX_PORT="7199"
 # performance benefit (around 5%).
 JVM_OPTS="$JVM_OPTS -ea"
 
+# add the jamm javaagent
+JVM_OPTS="$JVM_OPTS -javaagent:$CASSANDRA_HOME/lib/jamm-0.2.jar"
+
 # enable thread priorities, primarily so we can give periodic tasks
 # a lower priority to avoid interfering with client workload
 JVM_OPTS="$JVM_OPTS -XX:+UseThreadPriorities"
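
The -javaagent flag is what makes the new size accounting possible: jamm's MemoryMeter relies on the java.lang.instrument machinery, so the agent must be on the command line of any JVM that calls it, which is also why build.xml above adds the same flag for the test JVM. A minimal, hypothetical standalone usage (MeterExample is an illustrative name, not part of the patch; it assumes jamm-0.2.jar is passed as the agent, which also puts it on the system classpath):

    import org.github.jamm.MemoryMeter;

    public class MeterExample
    {
        public static void main(String[] args)
        {
            // compile against jamm-0.2.jar, then run with:
            //   java -javaagent:jamm-0.2.jar -cp . MeterExample
            MemoryMeter meter = new MemoryMeter();
            long[] payload = new long[1024];
            // shallow size of the array object itself
            System.out.println("measure:     " + meter.measure(payload));
            // size of everything reachable from it (the same here, since a
            // primitive array holds no references)
            System.out.println("measureDeep: " + meter.measureDeep(payload));
        }
    }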

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Thu Apr  7 23:49:06 2011
@@ -141,6 +141,13 @@ reduce_cache_capacity_to: 0.6
 concurrent_reads: 32
 concurrent_writes: 32
 
+# Total memory to use for memtables.  Cassandra will flush the largest
+# memtable when this much memory is used.  Prefer using this to
+# the older, per-ColumnFamily memtable flush thresholds.
+# If omitted, Cassandra will set it to 1/3 of the heap.
+# If set to 0, only the old flush thresholds are used.
+# memtable_total_space_in_mb: 2048
+
 # This sets the number of memtable flush writer threads.  These will
 # be blocked by disk io, and each one will hold a memtable in memory
 # while blocked. If you have a large heap and many data directories,
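
When the line above stays commented out, the default is derived from the heap at startup (see the DatabaseDescriptor change below): maxMemory / (3 * 1048576), i.e. roughly one third of -Xmx expressed in MB, while a value of 0 falls back to the per-ColumnFamily thresholds only. As a rough worked example, an 8 GB heap yields a default of about 2730 MB. A minimal sketch of the same arithmetic (MemtableCapDefault is an illustrative name; the hard-coded heap size stands in for Runtime.getRuntime().maxMemory()):

    public class MemtableCapDefault
    {
        public static void main(String[] args)
        {
            long maxMemoryBytes = 8L * 1024 * 1024 * 1024;           // pretend -Xmx8G
            int memtableTotalSpaceInMB = (int) (maxMemoryBytes / (3 * 1048576));
            System.out.println(memtableTotalSpaceInMB + " MB");      // prints "2730 MB"
        }
    }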

Added: cassandra/trunk/lib/jamm-0.2.jar
URL: http://svn.apache.org/viewvc/cassandra/trunk/lib/jamm-0.2.jar?rev=1090064&view=auto
==============================================================================
Files cassandra/trunk/lib/jamm-0.2.jar (added) and cassandra/trunk/lib/jamm-0.2.jar Thu Apr  7 23:49:06 2011 differ

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Thu Apr  7 23:49:06 2011
@@ -57,7 +57,8 @@ public class Config
     public Integer concurrent_replicates = 32;
     
     public Integer memtable_flush_writers = null; // will get set to the length of data dirs in DatabaseDescriptor
-    
+    public Integer memtable_total_space_in_mb;
+
     public Integer sliced_buffer_size_in_kb = 64;
     
     public Integer storage_port = 7000;

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Thu Apr  7 23:49:06 2011
@@ -231,6 +231,9 @@ public class DatabaseDescriptor
                 throw new ConfigurationException("conf.concurrent_replicates must be at least 2");
             }
 
+            if (conf.memtable_total_space_in_mb == null)
+                conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576));
+
             /* Memtable flush writer threads */
             if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1)
             {
@@ -797,6 +800,8 @@ public class DatabaseDescriptor
           maxDiskIndex = i;
         }
       }
+        logger.debug("expected data files size is {}; largest free partition has {} bytes free",
+                     expectedCompactedFileSize, maxFreeDisk);
      // Load factor of 0.9: we do not want to use the entire disk; that is too risky.
       maxFreeDisk = (long)(0.9 * maxFreeDisk);
       if( expectedCompactedFileSize < maxFreeDisk )
@@ -1057,4 +1062,16 @@ public class DatabaseDescriptor
     {
         return conf.memtable_flush_queue_size;
     }
+
+    public static int getTotalMemtableSpaceInMB()
+    {
+        // should only be called if estimatesRealMemtableSize() is true
+        assert conf.memtable_total_space_in_mb > 0;
+        return conf.memtable_total_space_in_mb;
+    }
+
+    public static boolean estimatesRealMemtableSize()
+    {
+        return conf.memtable_total_space_in_mb > 0;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java Thu Apr  7 23:49:06 2011
@@ -125,7 +125,7 @@ public class BinaryMemtable implements I
     private SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
     {
         logger.info("Writing " + this);
-        SSTableWriter writer = cfs.createFlushWriter(sortedKeys.size());
+        SSTableWriter writer = cfs.createFlushWriter(sortedKeys.size(), DatabaseDescriptor.getBMTThreshold());
 
         for (DecoratedKey key : sortedKeys)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Apr  7 23:49:06 2011
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -102,6 +103,20 @@ public class ColumnFamilyStore implement
 
     public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MemtablePostFlusher");
 
+    static
+    {
+        if (DatabaseDescriptor.estimatesRealMemtableSize())
+        {
+            logger.info("Global memtable threshold is enabled at {}MB", DatabaseDescriptor.getTotalMemtableSpaceInMB());
+            // (can block if flush queue fills up, so don't put on scheduledTasks)
+            StorageService.tasks.scheduleWithFixedDelay(new MeteredFlusher(), 1000, 1000, TimeUnit.MILLISECONDS);
+        }
+        else
+        {
+            logger.info("Global memtable threshold is disabled");
+        }
+    }
+
     public final Table table;
     public final String columnFamily;
     public final CFMetaData metadata;
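
The static block above runs MeteredFlusher once a second via scheduleWithFixedDelay, and the comment explains why it is kept off the shared scheduled-tasks pool: a run can block when the flush queue fills up, and with fixed-delay scheduling the next run is only queued after the previous one completes, so a blocked run simply pushes later runs back instead of piling them up. A minimal, self-contained sketch of that scheduling behaviour (FixedDelayExample is an illustrative name, not part of the patch):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class FixedDelayExample
    {
        public static void main(String[] args) throws InterruptedException
        {
            ScheduledExecutorService tasks = Executors.newScheduledThreadPool(1);
            Runnable tick = new Runnable()
            {
                public void run()
                {
                    System.out.println("tick");
                }
            };
            // fixed delay: each run is scheduled 1s after the previous run *completes*,
            // so a slow or blocked run never causes overlapping executions
            tasks.scheduleWithFixedDelay(tick, 1000, 1000, TimeUnit.MILLISECONDS);
            Thread.sleep(5000);
            tasks.shutdown();
        }
    }
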
@@ -143,7 +158,7 @@ public class ColumnFamilyStore implement
 
     /** Lock to allow migrations to block all flushing, so we can be sure not to write orphaned data files */
     public final Lock flushLock = new ReentrantLock();
-    
+
     public static enum CacheType
     {
         KEY_CACHE_TYPE("KeyCache"),
@@ -166,6 +181,12 @@ public class ColumnFamilyStore implement
     public final AutoSavingCache<Pair<Descriptor,DecoratedKey>, Long> keyCache;
     public final AutoSavingCache<DecoratedKey, ColumnFamily> rowCache;
 
+
+    /** ratio of in-memory memtable size to serialized size */
+    volatile double liveRatio = 1.0;
+    /** ops count last time we computed liveRatio */
+    private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
+
     public void reload()
     {
         // metadata object has been mutated directly. make all the members jibe with new settings.
@@ -569,12 +590,11 @@ public class ColumnFamilyStore implement
      * When the sstable object is closed, it will be renamed to a non-temporary
      * format, so incomplete sstables can be recognized and removed on startup.
      */
-    public String getFlushPath()
+    public String getFlushPath(long estimatedSize)
     {
-        long guessedSize = 2L * memsize.value() * 1024*1024; // 2* adds room for keys, column indexes
-        String location = DatabaseDescriptor.getDataFileLocationForTable(table.name, guessedSize);
+        String location = DatabaseDescriptor.getDataFileLocationForTable(table.name, estimatedSize);
         if (location == null)
-            throw new RuntimeException("Insufficient disk space to flush");
+            throw new RuntimeException("Insufficient disk space to flush " + estimatedSize + " bytes");
         return getTempSSTablePath(location);
     }
 
@@ -741,6 +761,22 @@ public class ColumnFamilyStore implement
             writeStats.addNano(System.nanoTime() - start);
         }
 
+        if (DatabaseDescriptor.estimatesRealMemtableSize())
+        {
+            while (true)
+            {
+                long last = liveRatioComputedAt.get();
+                long operations = writeStats.getOpCount();
+                if (operations < 2 * last)
+                    break;
+                if (liveRatioComputedAt.compareAndSet(last, operations))
+                {
+                    logger.debug("computing liveRatio of {} at {} ops", this, operations);
+                    mt.updateLiveRatio();
+                }
+            }
+        }
+
         return flushRequested ? mt : null;
     }
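
To make the recomputation schedule above concrete: liveRatioComputedAt starts at 32 (see the field added earlier in this file), so the first measurement is submitted once the column family has seen roughly 64 writes, the next at about 128, then 256, and so on; the expensive deep-size measurement runs only each time the write count doubles. The compareAndSet ensures that when several writer threads cross the threshold at once, only one of them actually submits the measurement, while the losers re-read the updated counter and fall out of the loop.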
 
@@ -966,12 +1002,20 @@ public class ColumnFamilyStore implement
 
     public long getMemtableColumnsCount()
     {
-        return getMemtableThreadSafe().getCurrentOperations();
+        return getMemtableThreadSafe().getOperations();
     }
 
     public long getMemtableDataSize()
     {
-        return getMemtableThreadSafe().getCurrentThroughput();
+        return getMemtableThreadSafe().getLiveSize();
+    }
+
+    public long getTotalMemtableLiveSize()
+    {
+        long total = 0;
+        for (ColumnFamilyStore cfs : concatWithIndexes())
+            total += cfs.getMemtableThreadSafe().getLiveSize();
+        return total;
     }
 
     public int getMemtableSwitchCount()
@@ -2032,9 +2076,9 @@ public class ColumnFamilyStore implement
         return intern(name);
     }
 
-    public SSTableWriter createFlushWriter(long estimatedRows) throws IOException
+    public SSTableWriter createFlushWriter(long estimatedRows, long estimatedSize) throws IOException
     {
-        return new SSTableWriter(getFlushPath(), estimatedRows, metadata, partitioner);
+        return new SSTableWriter(getFlushPath(estimatedSize), estimatedRows, metadata, partitioner);
     }
 
     public SSTableWriter createCompactionWriter(long estimatedRows, String location) throws IOException
@@ -2047,4 +2091,8 @@ public class ColumnFamilyStore implement
         return Iterables.concat(indexedColumns.values(), Collections.singleton(this));
     }
 
+    public Set<Memtable> getMemtablesPendingFlush()
+    {
+        return data.getMemtablesPendingFlush();
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Apr  7 23:49:06 2011
@@ -25,10 +25,7 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -38,6 +35,7 @@ import com.google.common.collect.Peeking
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.columniterator.SimpleAbstractColumnIterator;
 import org.apache.cassandra.db.filter.AbstractColumnIterator;
@@ -47,13 +45,32 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.WrappedRunnable;
+import org.github.jamm.MemoryMeter;
 
 public class Memtable implements Comparable<Memtable>, IFlushable
 {
     private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
 
-    private volatile boolean isFrozen;
+    // size in memory can never be less than serialized size
+    private static final double MIN_SANE_LIVE_RATIO = 1.0;
+    // max liveRatio seen with 1-byte columns on a 64-bit JVM was 19. If it gets higher than 64, something is probably broken.
+    private static final double MAX_SANE_LIVE_RATIO = 64.0;
+    private static final MemoryMeter meter = new MemoryMeter();
+
+    // we're careful to only allow one count to run at a time because counting is slow
+    // (can be minutes, for a large memtable and a busy server); otherwise we could keep
+    // memtables alive after they're flushed, preventing them from being GC'd.
+    private static final ExecutorService meterExecutor = new ThreadPoolExecutor(1, 1, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>())
+    {
+        @Override
+        protected void afterExecute(Runnable r, Throwable t)
+        {
+            super.afterExecute(r, t);
+            DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
+        }
+    };
 
+    private volatile boolean isFrozen;
     private final AtomicLong currentThroughput = new AtomicLong(0);
     private final AtomicLong currentOperations = new AtomicLong(0);
 
@@ -63,10 +80,10 @@ public class Memtable implements Compara
 
     private final long THRESHOLD;
     private final long THRESHOLD_COUNT;
+    volatile static Memtable activelyMeasuring;
 
     public Memtable(ColumnFamilyStore cfs)
     {
-
         this.cfs = cfs;
         creationTime = System.currentTimeMillis();
         THRESHOLD = cfs.getMemtableThroughputInMB() * 1024L * 1024L;
@@ -90,12 +107,18 @@ public class Memtable implements Compara
     		return 0;
     }
 
-    public long getCurrentThroughput()
+    public long getLiveSize()
+    {
+        // 25% fudge factor
+        return (long) (currentThroughput.get() * cfs.liveRatio * 1.25);
+    }
+
+    public long getSerializedSize()
     {
         return currentThroughput.get();
     }
-    
-    public long getCurrentOperations()
+
+    public long getOperations()
     {
         return currentOperations.get();
     }
@@ -126,6 +149,54 @@ public class Memtable implements Compara
         resolve(key, columnFamily);
     }
 
+    public void updateLiveRatio()
+    {
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                activelyMeasuring = Memtable.this;
+
+                long start = System.currentTimeMillis();
+                // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
+                // To reduce the memory overhead of a measurement, we therefore break it up into row-at-a-time measurements.
+                long deepSize = meter.measure(columnFamilies);
+                int objects = 0;
+                for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
+                {
+                    deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
+                    objects += entry.getValue().getColumnCount();
+                }
+                double newRatio = (double) deepSize / currentThroughput.get();
+
+                if (newRatio < MIN_SANE_LIVE_RATIO)
+                {
+                    logger.warn("setting live ratio to minimum of 1.0 instead of {}", newRatio);
+                    newRatio = MIN_SANE_LIVE_RATIO;
+                }
+                if (newRatio > MAX_SANE_LIVE_RATIO)
+                {
+                    logger.warn("setting live ratio to maximum of 64 instead of {}", newRatio);
+                    newRatio = MAX_SANE_LIVE_RATIO;
+                }
+                cfs.liveRatio = Math.max(cfs.liveRatio, newRatio);
+
+                logger.info("{} liveRatio is {} (just-counted was {}).  calculation took {}ms for {} columns",
+                            new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects });
+                activelyMeasuring = null;
+            }
+        };
+
+        try
+        {
+            meterExecutor.submit(runnable);
+        }
+        catch (RejectedExecutionException e)
+        {
+            logger.debug("Meter thread is busy; skipping liveRatio update for {}", cfs);
+        }
+    }
+
     private void resolve(DecoratedKey key, ColumnFamily cf)
     {
         currentThroughput.addAndGet(cf.size());
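
Putting the pieces together: getLiveSize() above estimates heap usage as serialized bytes * liveRatio * 1.25 (the "25% fudge factor"), so, as a worked example, a memtable holding 64 MB of serialized columns in a column family whose measured liveRatio is 10.0 is treated as roughly 800 MB of heap by MeteredFlusher. The measured ratio is clamped to [1.0, 64.0] and only ever moves upward (Math.max), so a later, lower measurement never shrinks the estimate. Note also that updateLiveRatio() keeps at most one measurement in flight: the single-threaded, zero-queue meterExecutor rejects a second submission, which is caught and logged at debug level.
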
@@ -155,8 +226,10 @@ public class Memtable implements Compara
     private SSTableReader writeSortedContents() throws IOException
     {
         logger.info("Writing " + this);
-        SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size());
+        SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), 2 * getSerializedSize()); // 2* for keys
 
+        // (we can't clear out the map as-we-go to free up memory,
+        //  since the memtable is being used for queries in the "pending flush" category)
         for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
             writer.append(entry.getKey(), entry.getValue());
 
@@ -192,8 +265,8 @@ public class Memtable implements Compara
 
     public String toString()
     {
-        return String.format("Memtable-%s@%s(%s bytes, %s operations)",
-                             cfs.getColumnFamilyName(), hashCode(), currentThroughput, currentOperations);
+        return String.format("Memtable-%s@%s(%s/%s serialized/live bytes, %s ops)",
+                             cfs.getColumnFamilyName(), hashCode(), currentThroughput, getLiveSize(), currentOperations);
     }
 
     /**

Added: cassandra/trunk/src/java/org/apache/cassandra/db/MeteredFlusher.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/MeteredFlusher.java?rev=1090064&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/MeteredFlusher.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/MeteredFlusher.java Thu Apr  7 23:49:06 2011
@@ -0,0 +1,104 @@
+package org.apache.cassandra.db;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+class MeteredFlusher implements Runnable
+{
+    private static Logger logger = LoggerFactory.getLogger(MeteredFlusher.class);
+
+    public void run()
+    {
+        // first, find how much memory non-active memtables are using
+        Memtable activelyMeasuring = Memtable.activelyMeasuring;
+        long flushingBytes = activelyMeasuring == null ? 0 : activelyMeasuring.getLiveSize();
+        flushingBytes += countFlushingBytes();
+
+        // next, flush any CF using more than 1 / (the maximum number of memtables it could have in the flush pipeline)
+        // of the total space allotted.  Then, flush other CFs in order of size if necessary.
+        long liveBytes = 0;
+        try
+        {
+            for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+            {
+                long size = cfs.getTotalMemtableLiveSize();
+                int maxInFlight = (int) Math.ceil((double) (1 // live memtable
+                                                            + 1 // potentially a flushed memtable being counted by jamm
+                                                            + DatabaseDescriptor.getFlushWriters()
+                                                            + DatabaseDescriptor.getFlushQueueSize())
+                                                  / (1 + cfs.getIndexedColumns().size()));
+                if (size > (DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L - flushingBytes) / maxInFlight)
+                {
+                    logger.info("flushing high-traffic column family {}", cfs);
+                    cfs.forceFlush();
+                }
+                else
+                {
+                    liveBytes += size;
+                }
+            }
+
+            if (flushingBytes + liveBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L)
+                return;
+
+            logger.info("estimated {} bytes used by all memtables pre-flush", liveBytes);
+
+            // sort memtables by size
+            List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>();
+            Iterables.addAll(sorted, ColumnFamilyStore.all());
+            Collections.sort(sorted, new Comparator<ColumnFamilyStore>()
+            {
+                public int compare(ColumnFamilyStore o1, ColumnFamilyStore o2)
+                {
+                    long size1 = o1.getTotalMemtableLiveSize();
+                    long size2 = o2.getTotalMemtableLiveSize();
+                    if (size1 < size2)
+                        return -1;
+                    if (size1 > size2)
+                        return 1;
+                    return 0;
+                }
+            });
+
+            // flush largest first until we get below our threshold.
+            // although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish
+            // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
+            while (true)
+            {
+                flushingBytes = countFlushingBytes();
+                if (liveBytes + flushingBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L || sorted.isEmpty())
+                    break;
+
+                ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1);
+                long size = cfs.getTotalMemtableLiveSize();
+                logger.info("flushing {} to free up {} bytes", cfs, size);
+                liveBytes -= size;
+                cfs.forceFlush();
+            }
+        }
+        finally
+        {
+            logger.debug("memtable memory usage is {} bytes with {} live", liveBytes + flushingBytes, liveBytes);
+        }
+    }
+
+    private long countFlushingBytes()
+    {
+        long flushingBytes = 0;
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+        {
+            for (Memtable memtable : cfs.getMemtablesPendingFlush())
+                flushingBytes += memtable.getLiveSize();
+        }
+        return flushingBytes;
+    }
+}
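
To make the flush trigger concrete for a column family with no secondary indexes: maxInFlight counts the live memtable, one memtable potentially being measured by jamm, the flush writers, and the flush queue. Assuming a 2048 MB global cap, a single flush writer and a flush queue of 4 (the last two are assumptions about the node's configuration, not values set by this patch), maxInFlight = ceil((1 + 1 + 1 + 4) / 1) = 7, so the column family is force-flushed once its memtables are estimated at more than about 292 MB while nothing else is flushing. A minimal arithmetic sketch under those assumptions (MeteredFlusherMath is an illustrative name):

    public class MeteredFlusherMath
    {
        public static void main(String[] args)
        {
            long totalSpaceBytes = 2048L * 1048576;    // assumed memtable_total_space_in_mb
            long flushingBytes   = 0;                  // nothing currently flushing
            int  flushWriters    = 1;                  // assumed memtable_flush_writers
            int  flushQueueSize  = 4;                  // assumed memtable_flush_queue_size
            int  indexes         = 0;                  // CF with no secondary indexes

            int maxInFlight = (int) Math.ceil((double) (1 + 1 + flushWriters + flushQueueSize)
                                              / (1 + indexes));
            long perCfThreshold = (totalSpaceBytes - flushingBytes) / maxInFlight;
            // prints "maxInFlight=7, flush above ~292 MB"
            System.out.println("maxInFlight=" + maxInFlight
                               + ", flush above ~" + perCfThreshold / 1048576 + " MB");
        }
    }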

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Apr  7 23:49:06 2011
@@ -2196,7 +2196,7 @@ public class StorageService implements I
             for (ColumnFamilyStore subordinate : cfs.concatWithIndexes())
             {
                 ops += subordinate.getMemtableColumnsCount();
-                throughput = subordinate.getMemtableThroughputInMB();
+                throughput += subordinate.getMemtableDataSize();
             }
 
             if (ops > 0 && (largestByOps == null || ops > largestByOps.getMemtableColumnsCount()))

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamIn.java Thu Apr  7 23:49:06 2011
@@ -80,7 +80,7 @@ public class StreamIn
         // new local sstable
         Table table = Table.open(remotedesc.ksname);
         ColumnFamilyStore cfStore = table.getColumnFamilyStore(remotedesc.cfname);
-        Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath());
+        Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath(remote.size));
 
         return new PendingFile(localdesc, remote);
      }

Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Thu Apr  7 23:49:06 2011
@@ -33,3 +33,4 @@ encryption_options:
     truststore: conf/.truststore
     truststore_password: cassandra
 incremental_backups: true
+flush_largest_memtables_at: 1.0

Added: cassandra/trunk/test/long/org/apache/cassandra/db/MeteredFlusherTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/MeteredFlusherTest.java?rev=1090064&view=auto
==============================================================================
--- cassandra/trunk/test/long/org/apache/cassandra/db/MeteredFlusherTest.java (added)
+++ cassandra/trunk/test/long/org/apache/cassandra/db/MeteredFlusherTest.java Thu Apr  7 23:49:06 2011
@@ -0,0 +1,51 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.migration.AddColumnFamily;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class MeteredFlusherTest extends CleanupHelper
+{
+    @Test
+    public void testManyMemtables() throws IOException, ConfigurationException
+    {
+        Table table = Table.open("Keyspace1");
+        for (int i = 0; i < 100; i++)
+        {
+            CFMetaData metadata = new CFMetaData(table.name, "_CF" + i, ColumnFamilyType.Standard, UTF8Type.instance, null);
+            new AddColumnFamily(metadata).apply();
+        }
+
+        ByteBuffer name = ByteBufferUtil.bytes("c");
+        for (int j = 0; j < 200; j++)
+        {
+            for (int i = 0; i < 100; i++)
+            {
+                RowMutation rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key" + j));
+                ColumnFamily cf = ColumnFamily.create("Keyspace1", "_CF" + i);
+                // don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory
+                ByteBuffer value = ByteBuffer.allocate(100000);
+                cf.addColumn(new Column(name, value));
+                rm.add(cf);
+                rm.applyUnsafe();
+            }
+        }
+
+        int flushes = 0;
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+        {
+            if (cfs.getColumnFamilyName().startsWith("_CF"))
+                flushes += cfs.getMemtableSwitchCount();
+        }
+        assert flushes > 0;
+    }
+}
+
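
For scale: the loops above write 200 rows across 100 column families, each insert carrying a 100,000-byte column value, i.e. roughly 2 GB of serialized column data, which is far more than any plausible memtable allowance for the test JVM; MeteredFlusher therefore has to force at least one flush for the final assertion on getMemtableSwitchCount() to hold.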

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=1090064&r1=1090063&r2=1090064&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Thu Apr  7 23:49:06 2011
@@ -319,7 +319,7 @@ public class DefsTest extends CleanupHel
         ColumnFamilyStore store = Table.open(cfm.ksName).getColumnFamilyStore(cfm.cfName);
         assert store != null;
         store.forceBlockingFlush();
-        store.getFlushPath();
+        store.getFlushPath(1024);
         assert DefsTable.getFiles(cfm.ksName, cfm.cfName).size() > 0;
         
         new DropColumnFamily(ks.name, cfm.cfName).apply();