You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by ed...@apache.org on 2021/01/19 23:42:09 UTC

[cassandra] branch cassandra-3.0 updated: Prevent unbounded number of pending flushing tasks; Add PendingFlushTasks metric (CASSANDRA-16261) Authored by Ekaterina Dimitrova; reviewed by Caleb Rackliffe and Andres de la Pena for CASSANDRA-16261

This is an automated email from the ASF dual-hosted git repository.

edimitrova pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 0a1e900  Prevent unbounded number of pending flushing tasks; Add PendingFlushTasks metric (CASSANDRA-16261) Authored by Ekaterina Dimitrova; reviewed by Caleb Rackliffe and Andres de la Pena for CASSANDRA-16261
0a1e900 is described below

commit 0a1e900a0a042f78d7d5d6625bc98b84eb463e69
Author: Ekaterina Dimitrova <ek...@datastax.com>
AuthorDate: Fri Nov 6 18:46:14 2020 -0500

    Prevent unbounded number of pending flushing tasks; Add PendingFlushTasks metric (CASSANDRA-16261)
    Authored by Ekaterina Dimitrova; reviewed by Caleb Rackliffe and Andres de la Pena for CASSANDRA-16261
---
 CHANGES.txt                                        |   1 +
 .../cassandra/concurrent/InfiniteLoopExecutor.java |   8 +
 .../cassandra/config/DatabaseDescriptor.java       |  39 +--
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 373 +++++++++++----------
 src/java/org/apache/cassandra/db/Memtable.java     |   4 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  |   3 +-
 .../org/apache/cassandra/utils/FBUtilities.java    |  12 +
 .../apache/cassandra/utils/memory/HeapPool.java    |   2 +-
 .../cassandra/utils/memory/MemtableAllocator.java  | 124 +++++--
 .../cassandra/utils/memory/MemtableCleaner.java    |  40 +++
 .../utils/memory/MemtableCleanerThread.java        |  67 +++-
 .../cassandra/utils/memory/MemtablePool.java       |  35 +-
 .../apache/cassandra/utils/memory/NativePool.java  |   2 +-
 .../apache/cassandra/utils/memory/SlabPool.java    |   2 +-
 test/unit/org/apache/cassandra/cql3/CQLTester.java |  14 +-
 .../utils/memory/MemtableCleanerThreadTest.java    | 187 +++++++++++
 .../utils/memory/NativeAllocatorTest.java          | 204 ++++++-----
 17 files changed, 772 insertions(+), 345 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index ee4cf6e..4f3ab1f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.24:
+ * Prevent unbounded number of pending flushing tasks; Add PendingFlushTasks metric (CASSANDRA-16261)
  * Improve empty hint file handling during startup (CASSANDRA-16162)
  * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372)
  * Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226)
diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
index b54fa3f..8e72d91 100644
--- a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
@@ -23,6 +23,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class InfiniteLoopExecutor
 {
     private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class);
@@ -81,4 +83,10 @@ public class InfiniteLoopExecutor
         thread.join(unit.toMillis(time));
         return !thread.isAlive();
     }
+
+    @VisibleForTesting
+    public boolean isAlive()
+    {
+        return this.thread.isAlive();
+    }
 }
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3f9aa96..52f01b6 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -151,7 +151,7 @@ public class DatabaseDescriptor
         String loaderClass = System.getProperty("cassandra.config.loader");
         ConfigurationLoader loader = loaderClass == null
                                      ? new YamlConfigurationLoader()
-                                   : FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading");
+                                   : FBUtilities.construct(loaderClass, "configuration loading");
         Config config = loader.loadConfig();
 
         if (!hasLoggedConfig)
@@ -214,7 +214,7 @@ public class DatabaseDescriptor
             }
             catch (UnknownHostException e)
             {
-                throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false);
+                throw new ConfigurationException("Unknown listen_address '" + config.listen_address + '\'', false);
             }
 
             if (listenAddress.isAnyLocalAddress())
@@ -234,7 +234,7 @@ public class DatabaseDescriptor
             }
             catch (UnknownHostException e)
             {
-                throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false);
+                throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + '\'', false);
             }
 
             if (broadcastAddress.isAnyLocalAddress())
@@ -275,7 +275,7 @@ public class DatabaseDescriptor
             }
             catch (UnknownHostException e)
             {
-                throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
+                throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + '\'', false);
             }
 
             if (broadcastRpcAddress.isAnyLocalAddress())
@@ -520,18 +520,14 @@ public class DatabaseDescriptor
         EndpointSnitchInfo.create();
 
         localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
-        localComparator = new Comparator<InetAddress>()
-        {
-            public int compare(InetAddress endpoint1, InetAddress endpoint2)
-            {
-                boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
-                boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
-                if (local1 && !local2)
-                    return -1;
-                if (local2 && !local1)
-                    return 1;
-                return 0;
-            }
+        localComparator = (endpoint1, endpoint2) -> {
+            boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
+            boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
+            if (local1 && !local2)
+                return -1;
+            if (local2 && !local1)
+                return 1;
+            return 0;
         };
 
         /* Request Scheduler setup */
@@ -592,7 +588,7 @@ public class DatabaseDescriptor
         if (conf.commitlog_total_space_in_mb == null)
         {
             int preferredSize = 8192;
-            int minSize = 0;
+            int minSize;
             try
             {
                 // use 1/4 of available space.  See discussion on #10013 and #10199
@@ -1061,7 +1057,7 @@ public class DatabaseDescriptor
 
     public static Collection<String> tokensFromString(String tokenString)
     {
-        List<String> tokens = new ArrayList<String>();
+        List<String> tokens = new ArrayList<>();
         if (tokenString != null)
             for (String token : tokenString.split(","))
                 tokens.add(token.replaceAll("^\\s+", "").replaceAll("\\s+$", ""));
@@ -1747,7 +1743,7 @@ public class DatabaseDescriptor
     public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension)
     {
         String name = cacheType.toString()
-                + (version == null ? "" : "-" + version + "." + extension);
+                + (version == null ? "" : '-' + version + '.' + extension);
         return new File(conf.saved_caches_directory, name);
     }
 
@@ -2026,12 +2022,13 @@ public class DatabaseDescriptor
     {
         long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;
         long offHeapLimit = ((long) conf.memtable_offheap_space_in_mb) << 20;
+        final MemtableCleaner cleaner = ColumnFamilyStore::flushLargestMemtable;
         switch (conf.memtable_allocation_type)
         {
             case unslabbed_heap_buffers:
-                return new HeapPool(heapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
+                return new HeapPool(heapLimit, conf.memtable_cleanup_threshold, cleaner);
             case heap_buffers:
-                return new SlabPool(heapLimit, 0, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
+                return new SlabPool(heapLimit, 0, conf.memtable_cleanup_threshold, cleaner);
             case offheap_buffers:
                 throw new ConfigurationException("offheap_buffers are not available in 3.0. They will be re-introduced in a future release, see https://issues.apache.org/jira/browse/CASSANDRA-9472 for details");
 
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a9c087e..61d60b1 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -23,6 +23,7 @@ import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.*;
+import java.util.Objects;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -126,41 +127,41 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
-                                                                                          StageManager.KEEPALIVE,
-                                                                                          TimeUnit.SECONDS,
-                                                                                          new LinkedBlockingQueue<Runnable>(),
-                                                                                          new NamedThreadFactory("MemtableFlushWriter"),
-                                                                                          "internal");
+    private static final ThreadPoolExecutor flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+                                                                                             StageManager.KEEPALIVE,
+                                                                                             TimeUnit.SECONDS,
+                                                                                             new LinkedBlockingQueue<>(),
+                                                                                             new NamedThreadFactory("MemtableFlushWriter"),
+                                                                                             "internal");
 
     // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
-    private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
-                                                                                              StageManager.KEEPALIVE,
-                                                                                              TimeUnit.SECONDS,
-                                                                                              new LinkedBlockingQueue<Runnable>(),
-                                                                                              new NamedThreadFactory("MemtablePostFlush"),
-                                                                                              "internal");
-
-    private static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1,
-                                                                                            StageManager.KEEPALIVE,
-                                                                                            TimeUnit.SECONDS,
-                                                                                            new LinkedBlockingQueue<Runnable>(),
-                                                                                            new NamedThreadFactory("MemtableReclaimMemory"),
-                                                                                            "internal");
+    private static final ThreadPoolExecutor postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
+                                                                                                 StageManager.KEEPALIVE,
+                                                                                                 TimeUnit.SECONDS,
+                                                                                                 new LinkedBlockingQueue<>(),
+                                                                                                 new NamedThreadFactory("MemtablePostFlush"),
+                                                                                                 "internal");
+
+    private static final ThreadPoolExecutor reclaimExecutor = new JMXEnabledThreadPoolExecutor(1,
+                                                                                               StageManager.KEEPALIVE,
+                                                                                               TimeUnit.SECONDS,
+                                                                                               new LinkedBlockingQueue<>(),
+                                                                                               new NamedThreadFactory("MemtableReclaimMemory"),
+                                                                                               "internal");
 
     private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"};
     private static final String[] COUNTER_DESCS = new String[]
-    { "partition key in raw hex bytes",
-      "value of this partition for given sampler",
-      "value is within the error bounds plus or minus of this",
-      "the partition key turned into a human readable format" };
+                                                  { "partition key in raw hex bytes",
+                                                    "value of this partition for given sampler",
+                                                    "value is within the error bounds plus or minus of this",
+                                                    "the partition key turned into a human readable format" };
     private static final CompositeType COUNTER_COMPOSITE_TYPE;
     private static final TabularType COUNTER_TYPE;
 
     private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"};
     private static final String[] SAMPLER_DESCS = new String[]
-    { "cardinality of partitions",
-      "list of counter results" };
+                                                  { "cardinality of partitions",
+                                                    "list of counter results" };
 
     private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS";
     private static final CompositeType SAMPLING_RESULT;
@@ -234,7 +235,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         ExecutorUtils.shutdownAndWait(timeout, unit, reclaimExecutor, postFlushExecutor, flushExecutor);
     }
 
-
     public void reload()
     {
         // metadata object has been mutated directly. make all the members jibe with new settings.
@@ -271,7 +271,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             logger.trace("scheduling flush in {} ms", period);
             WrappedRunnable runnable = new WrappedRunnable()
             {
-                protected void runMayThrow() throws Exception
+                protected void runMayThrow()
                 {
                     synchronized (data)
                     {
@@ -299,14 +299,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public static Runnable getBackgroundCompactionTaskSubmitter()
     {
-        return new Runnable()
-        {
-            public void run()
-            {
-                for (Keyspace keyspace : Keyspace.all())
-                    for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
-                        CompactionManager.instance.submitBackground(cfs);
-            }
+        return () -> {
+            for (Keyspace keyspace : Keyspace.all())
+                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+                    CompactionManager.instance.submitBackground(cfs);
         };
     }
 
@@ -360,11 +356,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     private ColumnFamilyStore(Keyspace keyspace,
-                             String columnFamilyName,
-                             int generation,
-                             CFMetaData metadata,
-                             Directories directories,
-                             boolean loadSSTables)
+                              String columnFamilyName,
+                              int generation,
+                              CFMetaData metadata,
+                              Directories directories,
+                              boolean loadSSTables)
     {
         this(keyspace, columnFamilyName, generation, metadata, directories, loadSSTables, true);
     }
@@ -372,15 +368,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     @VisibleForTesting
     public ColumnFamilyStore(Keyspace keyspace,
-                              String columnFamilyName,
-                              int generation,
-                              CFMetaData metadata,
-                              Directories directories,
-                              boolean loadSSTables,
-                              boolean registerBookkeeping)
+                             String columnFamilyName,
+                             int generation,
+                             CFMetaData metadata,
+                             Directories directories,
+                             boolean loadSSTables,
+                             boolean registerBookkeeping)
     {
         assert directories != null;
-        assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
+        assert metadata != null : "null metadata for " + keyspace + ':' + columnFamilyName;
 
         this.keyspace = keyspace;
         this.metadata = metadata;
@@ -435,8 +431,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             // register the mbean
             mbeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s",
-                                         isIndex() ? "IndexTables" : "Tables",
-                                         keyspace.getName(), name);
+                                      isIndex() ? "IndexTables" : "Tables",
+                                      keyspace.getName(), name);
             oldMBeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,columnfamily=%s",
                                          isIndex() ? "IndexColumnFamilies" : "ColumnFamilies",
                                          keyspace.getName(), name);
@@ -453,24 +449,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 throw new RuntimeException(e);
             }
             logger.trace("retryPolicy for {} is {}", name, this.metadata.params.speculativeRetry);
-            latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable()
-            {
-                public void run()
+            latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(() -> {
+                SpeculativeRetryParam retryPolicy = ColumnFamilyStore.this.metadata.params.speculativeRetry;
+                switch (retryPolicy.kind())
                 {
-                    SpeculativeRetryParam retryPolicy = ColumnFamilyStore.this.metadata.params.speculativeRetry;
-                    switch (retryPolicy.kind())
-                    {
-                        case PERCENTILE:
-                            // get percentile in nanos
-                            sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.threshold()));
-                            break;
-                        case CUSTOM:
-                            sampleLatencyNanos = (long) retryPolicy.threshold();
-                            break;
-                        default:
-                            sampleLatencyNanos = Long.MAX_VALUE;
-                            break;
-                    }
+                    case PERCENTILE:
+                        // get percentile in nanos
+                        sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.threshold()));
+                        break;
+                    case CUSTOM:
+                        sampleLatencyNanos = (long) retryPolicy.threshold();
+                        break;
+                    default:
+                        sampleLatencyNanos = Long.MAX_VALUE;
+                        break;
                 }
             }, DatabaseDescriptor.getReadRpcTimeout(), DatabaseDescriptor.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
         }
@@ -575,14 +567,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         // get the max generation number, to prevent generation conflicts
         Directories directories = new Directories(metadata, initialDirectories);
         Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true);
-        List<Integer> generations = new ArrayList<Integer>();
+        List<Integer> generations = new ArrayList<>();
         for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
         {
             Descriptor desc = entry.getKey();
             generations.add(desc.generation);
             if (!desc.isCompatible())
                 throw new RuntimeException(String.format("Incompatible SSTable found. Current version %s is unable to read file: %s. Please run upgradesstables.",
-                        desc.getFormat().getLatestVersion(), desc));
+                                                         desc.getFormat().getLatestVersion(), desc));
         }
         Collections.sort(generations);
         int value = (generations.size() > 0) ? (generations.get(generations.size() - 1)) : 0;
@@ -599,7 +591,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Directories directories = new Directories(metadata, initialDirectories);
         Set<File> cleanedDirectories = new HashSet<>();
 
-         // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357)
+        // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357)
         clearEphemeralSnapshots(directories);
 
         directories.removeTemporaryDirectories();
@@ -637,13 +629,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
 
         // cleanup incomplete saved caches
-        Pattern tmpCacheFilePattern = Pattern.compile(metadata.ksName + "-" + metadata.cfName + "-(Key|Row)Cache.*\\.tmp$");
+        Pattern tmpCacheFilePattern = Pattern.compile(metadata.ksName + '-' + metadata.cfName + "-(Key|Row)Cache.*\\.tmp$");
         File dir = new File(DatabaseDescriptor.getSavedCachesLocation());
 
         if (dir.exists())
         {
             assert dir.isDirectory();
-            for (File file : dir.listFiles())
+            for (File file : Objects.requireNonNull(dir.listFiles()))
                 if (tmpCacheFilePattern.matcher(file.getName()).matches())
                     if (!file.delete())
                         logger.warn("could not delete {}", file.getAbsolutePath());
@@ -666,7 +658,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public static void loadNewSSTables(String ksName, String cfName)
     {
-        /** ks/cf existence checks will be done by open and getCFS methods for us */
+        /* ks/cf existence checks will be done by open and getCFS methods for us */
         Keyspace keyspace = Keyspace.open(ksName);
         keyspace.getColumnFamilyStore(cfName).loadNewSSTables();
     }
@@ -693,8 +685,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             if (!descriptor.isCompatible())
                 throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s",
-                        descriptor.getFormat().getLatestVersion(),
-                        descriptor));
+                                                         descriptor.getFormat().getLatestVersion(),
+                                                         descriptor));
 
             // force foreign sstables to level 0
             try
@@ -779,7 +771,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName);
 
-        Set<String> indexes = new HashSet<String>(Arrays.asList(idxNames));
+        Set<String> indexes = new HashSet<>(Arrays.asList(idxNames));
 
         Iterable<SSTableReader> sstables = cfs.getSSTables(SSTableSet.CANONICAL);
         try (Refs<SSTableReader> refs = Refs.ref(sstables))
@@ -834,6 +826,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             if (data.getView().getCurrentMemtable() == memtable)
                 return switchMemtable();
         }
+        logger.debug("Memtable is no longer current, returning future that completes when current flushing operation completes");
         return waitForFlushes();
     }
 
@@ -879,7 +872,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
 
         logger.debug("Enqueuing flush of {}: {}", name, String.format("%d (%.0f%%) on-heap, %d (%.0f%%) off-heap",
-                                                                     onHeapTotal, onHeapRatio * 100, offHeapTotal, offHeapRatio * 100));
+                                                                      onHeapTotal, onHeapRatio * 100, offHeapTotal, offHeapRatio * 100));
     }
 
 
@@ -927,13 +920,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         // we grab the current memtable; once any preceding memtables have flushed, we know its
         // commitLogLowerBound has been set (as this it is set with the upper bound of the preceding memtable)
         final Memtable current = data.getView().getCurrentMemtable();
-        ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(new Callable<ReplayPosition>()
-        {
-            public ReplayPosition call()
-            {
-                logger.debug("forceFlush requested but everything is clean in {}", name);
-                return current.getCommitLogLowerBound();
-            }
+        ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(() -> {
+            logger.debug("forceFlush requested but everything is clean in {}", name);
+            return current.getCommitLogLowerBound();
         });
         postFlushExecutor.execute(task);
         return task;
@@ -1007,11 +996,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         private Flush(boolean truncate)
         {
+            if (logger.isTraceEnabled())
+                logger.trace("Creating flush task {}@{}", hashCode(), name);
             // if true, we won't flush, we'll just wait for any outstanding writes, switch the memtable, and discard
             this.truncate = truncate;
 
             metric.pendingFlushes.inc();
-            /**
+            /*
              * To ensure correctness of switch without blocking writes, run() needs to wait for all write operations
              * started prior to the switch to complete. We do this by creating a Barrier on the writeOrdering
              * that all write operations register themselves with, and assigning this barrier to the memtables,
@@ -1020,7 +1011,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
              * In doing so it also tells the write operations to update the commitLogUpperBound of the memtable, so
              * that we know the CL position we are dirty to, which can be marked clean when we complete.
              */
-            writeBarrier = keyspace.writeOrder.newBarrier();
+            writeBarrier = Keyspace.writeOrder.newBarrier();
 
             // submit flushes for the memtable for any indexed sub-cfses, and our own
             AtomicReference<ReplayPosition> commitLogUpperBound = new AtomicReference<>();
@@ -1044,15 +1035,26 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // replay positions have also completed, i.e. the memtables are done and ready to flush
             writeBarrier.issue();
             postFlush = new PostFlush(memtables);
+
+            if (logger.isTraceEnabled())
+                logger.trace("Created flush task {}@{}", hashCode(), name);
         }
 
         public void run()
         {
+            if (logger.isTraceEnabled())
+                logger.trace("Flush task {}@{} starts executing, waiting on barrier", hashCode(), name);
+
+            long start = System.nanoTime();
+
             // mark writes older than the barrier as blocking progress, permitting them to exceed our memory limit
             // if they are stuck waiting on it, then wait for them all to complete
             writeBarrier.markBlocking();
             writeBarrier.await();
 
+            if (logger.isTraceEnabled())
+                logger.trace("Flush task for task {}@{} waited {} ms at the barrier", hashCode(), name, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+
             // mark all memtables as flushing, removing them from the live memtable list
             for (Memtable memtable : memtables)
                 memtable.cfs.data.markFlushing(memtable);
@@ -1088,8 +1090,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
             finally
             {
+                if (logger.isTraceEnabled())
+                    logger.trace("Flush task {}@{} signaling post flush task", hashCode(), name);
+
                 // signal the post-flush we've done our work
                 postFlush.latch.countDown();
+
+                if (logger.isTraceEnabled())
+                    logger.trace("Flush task task {}@{} finished", hashCode(), name);
             }
         }
 
@@ -1100,7 +1108,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             readBarrier.issue();
             reclaimExecutor.execute(new WrappedRunnable()
             {
-                public void runMayThrow() throws InterruptedException, ExecutionException
+                public void runMayThrow()
                 {
                     readBarrier.await();
                     memtable.setDiscarded();
@@ -1130,58 +1138,77 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately
      * queues it for flushing. If the memtable selected is flushed before this completes, no work is done.
      */
-    public static class FlushLargestColumnFamily implements Runnable
+    public static CompletableFuture<Boolean> flushLargestMemtable()
     {
-        public void run()
+        float largestRatio = 0f;
+        Memtable largest = null;
+        float liveOnHeap = 0, liveOffHeap = 0;
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            float largestRatio = 0f;
-            Memtable largest = null;
-            float liveOnHeap = 0, liveOffHeap = 0;
-            for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-            {
-                // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios
-                // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only
-                // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them
-                Memtable current = cfs.getTracker().getView().getCurrentMemtable();
-
-                // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF,
-                // both on- and off-heap, and select the largest of the two ratios to weight this CF
-                float onHeap = 0f, offHeap = 0f;
-                onHeap += current.getAllocator().onHeap().ownershipRatio();
-                offHeap += current.getAllocator().offHeap().ownershipRatio();
-
-                for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores())
-                {
-                    MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator();
-                    onHeap += allocator.onHeap().ownershipRatio();
-                    offHeap += allocator.offHeap().ownershipRatio();
-                }
+            // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios
+            // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only
+            // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them
+            Memtable current = cfs.getTracker().getView().getCurrentMemtable();
 
-                float ratio = Math.max(onHeap, offHeap);
-                if (ratio > largestRatio)
-                {
-                    largest = current;
-                    largestRatio = ratio;
-                }
+            // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF,
+            // both on- and off-heap, and select the largest of the two ratios to weight this CF
+            float onHeap = 0f, offHeap = 0f;
+            onHeap += current.getAllocator().onHeap().ownershipRatio();
+            offHeap += current.getAllocator().offHeap().ownershipRatio();
 
-                liveOnHeap += onHeap;
-                liveOffHeap += offHeap;
+            for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores())
+            {
+                MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator();
+                onHeap += allocator.onHeap().ownershipRatio();
+                offHeap += allocator.offHeap().ownershipRatio();
             }
 
-            if (largest != null)
+            float ratio = Math.max(onHeap, offHeap);
+            if (ratio > largestRatio)
             {
-                float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio();
-                float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio();
-                float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio();
-                float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio();
-                float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio();
-                float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio();
-                logger.debug("Flushing largest {} to free up room. Used total: {}, live: {}, flushing: {}, this: {}",
-                            largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap),
-                            ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap));
-                largest.cfs.switchMemtableIfCurrent(largest);
+                largest = current;
+                largestRatio = ratio;
             }
+
+            liveOnHeap += onHeap;
+            liveOffHeap += offHeap;
+        }
+
+        CompletableFuture<Boolean> returnFuture = new CompletableFuture<>();
+
+        if (largest != null)
+        {
+            float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio();
+            float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio();
+            float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio();
+            float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio();
+            float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio();
+            float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio();
+            logger.debug("Flushing largest {} to free up room. Used total: {}, live: {}, flushing: {}, this: {}",
+                         largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap),
+                         ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap));
+
+            ListenableFuture<ReplayPosition> flushFuture = largest.cfs.switchMemtableIfCurrent(largest);
+            flushFuture.addListener(() -> {
+                try
+                {
+                    flushFuture.get();
+                    returnFuture.complete(true);
+                }
+                catch (Throwable t)
+                {
+                    returnFuture.completeExceptionally(t);
+                }
+            }, MoreExecutors.directExecutor());
+        }
+        else
+        {
+            logger.debug("Flushing of largest memtable, not done, no memtable found");
+
+            returnFuture.complete(false);
         }
+
+        return returnFuture;
     }
 
     private static String ratio(float onHeap, float offHeap)
@@ -1228,8 +1255,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         catch (RuntimeException e)
         {
             throw new RuntimeException(e.getMessage()
-                                                     + " for ks: "
-                                                     + keyspace.getName() + ", table: " + name, e);
+                                       + " for ks: "
+                                       + keyspace.getName() + ", table: " + name, e);
         }
 
     }
@@ -1510,7 +1537,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                                   nowInSec,
                                                                   filter.selectsAllPartition(),
                                                                   metadata.enforceStrictLiveness()))
-                || filter.isFullyCoveredBy(cached);
+               || filter.isFullyCoveredBy(cached);
     }
 
     public int gcBefore(int nowInSec)
@@ -1548,7 +1575,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public ViewFragment select(Function<View, Iterable<SSTableReader>> filter)
     {
         View view = data.getView();
-        List<SSTableReader> sstables = Lists.newArrayList(filter.apply(view));
+        List<SSTableReader> sstables = Lists.newArrayList(Objects.requireNonNull(filter.apply(view)));
         return new ViewFragment(sstables, view.getAllMemtables());
     }
 
@@ -1578,19 +1605,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException
     {
         SamplerResult<ByteBuffer> samplerResults = metric.samplers.get(Sampler.valueOf(sampler))
-                .finishSampling(count);
+                                                                  .finishSampling(count);
         TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE);
         for (Counter<ByteBuffer> counter : samplerResults.topK)
         {
             byte[] key = counter.getItem().array();
             result.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, COUNTER_NAMES, new Object[] {
-                    Hex.bytesToHex(key), // raw
-                    counter.getCount(),  // count
-                    counter.getError(),  // error
-                    metadata.getKeyValidator().getString(ByteBuffer.wrap(key)) })); // string
+            Hex.bytesToHex(key), // raw
+            counter.getCount(),  // count
+            counter.getError(),  // error
+            metadata.getKeyValidator().getString(ByteBuffer.wrap(key)) })); // string
         }
         return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[]{
-                samplerResults.cardinality, result});
+        samplerResults.cardinality, result});
     }
 
     public boolean isCompactionDiskSpaceCheckEnabled()
@@ -1929,20 +1956,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, clustering, column, path), clockAndCount);
     }
 
-    public void forceMajorCompaction() throws InterruptedException, ExecutionException
+    public void forceMajorCompaction()
     {
         forceMajorCompaction(false);
     }
 
 
-    public void forceMajorCompaction(boolean splitOutput) throws InterruptedException, ExecutionException
+    public void forceMajorCompaction(boolean splitOutput)
     {
         CompactionManager.instance.performMaximal(this, splitOutput);
     }
 
     public static Iterable<ColumnFamilyStore> all()
     {
-        List<Iterable<ColumnFamilyStore>> stores = new ArrayList<Iterable<ColumnFamilyStore>>(Schema.instance.getKeyspaces().size());
+        List<Iterable<ColumnFamilyStore>> stores = new ArrayList<>(Schema.instance.getKeyspaces().size());
         for (Keyspace keyspace : Keyspace.all())
         {
             stores.add(keyspace.getColumnFamilyStores());
@@ -1985,13 +2012,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         for (final ColumnFamilyStore cfs : concatWithIndexes())
         {
-            cfs.runWithCompactionsDisabled(new Callable<Void>()
-            {
-                public Void call()
-                {
-                    cfs.data.reset(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs));
-                    return null;
-                }
+            cfs.runWithCompactionsDisabled((Callable<Void>) () -> {
+                cfs.data.reset(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs));
+                return null;
             }, true, false);
         }
     }
@@ -2044,25 +2067,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 now = Math.max(now, sstable.maxDataAge);
         truncatedAt = now;
 
-        Runnable truncateRunnable = new Runnable()
-        {
-            public void run()
-            {
-                logger.debug("Discarding sstable data for truncated CF + indexes");
-                data.notifyTruncated(truncatedAt);
+        Runnable truncateRunnable = () -> {
+            logger.debug("Discarding sstable data for truncated CF + indexes");
+            data.notifyTruncated(truncatedAt);
 
-                if (DatabaseDescriptor.isAutoSnapshot())
-                    snapshot(Keyspace.getTimestampedSnapshotName(name));
+            if (DatabaseDescriptor.isAutoSnapshot())
+                snapshot(Keyspace.getTimestampedSnapshotName(name));
 
-                discardSSTables(truncatedAt);
+            discardSSTables(truncatedAt);
 
-                indexManager.truncateAllIndexesBlocking(truncatedAt);
-                viewManager.truncateBlocking(replayAfter, truncatedAt);
+            indexManager.truncateAllIndexesBlocking(truncatedAt);
+            viewManager.truncateBlocking(replayAfter, truncatedAt);
 
-                SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
-                logger.trace("cleaning out row cache");
-                invalidateCaches();
-            }
+            SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
+            logger.trace("cleaning out row cache");
+            invalidateCaches();
         };
 
         runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
@@ -2163,17 +2182,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public LifecycleTransaction markAllCompacting(final OperationType operationType)
     {
-        Callable<LifecycleTransaction> callable = new Callable<LifecycleTransaction>()
-        {
-            public LifecycleTransaction call() throws Exception
-            {
-                assert data.getCompacting().isEmpty() : data.getCompacting();
-                Iterable<SSTableReader> sstables = getLiveSSTables();
-                sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
-                LifecycleTransaction modifier = data.tryModify(sstables, operationType);
-                assert modifier != null: "something marked things compacting while compactions are disabled";
-                return modifier;
-            }
+        Callable<LifecycleTransaction> callable = () -> {
+            assert data.getCompacting().isEmpty() : data.getCompacting();
+            Iterable<SSTableReader> sstables = getLiveSSTables();
+            sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
+            LifecycleTransaction modifier = data.tryModify(sstables, operationType);
+            assert modifier != null: "something marked things compacting while compactions are disabled";
+            return modifier;
         };
 
         return runWithCompactionsDisabled(callable, false, false);
@@ -2296,7 +2311,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         if (maxThreshold == 0 || minThreshold == 0)
             throw new RuntimeException("Disabling compaction by setting min_compaction_threshold or max_compaction_threshold to 0 " +
-                    "is deprecated, set the compaction strategy option 'enabled' to 'false' instead or use the nodetool command 'disableautocompaction'.");
+                                       "is deprecated, set the compaction strategy option 'enabled' to 'false' instead or use the nodetool command 'disableautocompaction'.");
     }
 
     // End JMX get/set.
@@ -2360,7 +2375,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public List<String> getBuiltIndexes()
     {
-       return indexManager.getBuiltIndexNames();
+        return indexManager.getBuiltIndexNames();
     }
 
     public int getUnleveledSSTables()
@@ -2517,6 +2532,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public static TableMetrics metricsFor(UUID tableId)
     {
-        return getIfExists(tableId).metric;
+        return Objects.requireNonNull(getIfExists(tableId)).metric;
     }
 }
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 041ac2e..139663e 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -339,7 +339,7 @@ public class Memtable implements Comparable<Memtable>
     @VisibleForTesting
     public void makeUnflushable()
     {
-        liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024);
+        liveDataSize.addAndGet((long) 1024 * 1024 * 1024 * 1024 * 1024);
     }
 
     private long estimatedSize()
@@ -433,7 +433,7 @@ public class Memtable implements Comparable<Memtable>
 
             return new SSTableTxnWriter(txn,
                                         cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
-                                                                     (long) partitions.size(),
+                                                                     partitions.size(),
                                                                      ActiveRepairService.UNREPAIRED_SSTABLE,
                                                                      sstableMetadataCollector,
                                                                      new SerializationHeader(true, cfs.metadata, columns, stats),
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index e135ebb..925708e 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -63,7 +63,6 @@ import org.apache.cassandra.utils.Pair;
 public abstract class ReadCommand implements ReadQuery
 {
     protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
-
     public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
 
     // For READ verb: will either dispatch on 'serializer' for 3.0 or 'legacyReadCommandSerializer' for earlier version.
@@ -546,7 +545,7 @@ public abstract class ReadCommand implements ReadQuery
 
                 Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
             }
-        };
+        }
 
         return Transformation.apply(iter, new MetricRecording());
     }
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index d3633fd..b4418be 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -938,6 +938,24 @@ public class FBUtilities
         }
     }
 
+    /**
+     * Sleeps for the given number of milliseconds, converting an interruption into an unchecked exception.
+     * The thread's interrupt status is restored before rethrowing so that callers further up the stack
+     * can still observe that the thread was interrupted.
+     */
+    public static void sleepQuietly(long millis)
+    {
+        try
+        {
+            Thread.sleep(millis);
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
     @VisibleForTesting
     protected static void reset()
     {
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
index 57242c4..1b698c9 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class HeapPool extends MemtablePool
 {
-    public HeapPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner)
+    public HeapPool(long maxOnHeapMemory, float cleanupThreshold, MemtableCleaner cleaner)
     {
         super(maxOnHeapMemory, 0, cleanupThreshold, cleaner);
     }
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
index 8383ddc..e5a97c6 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
@@ -18,21 +18,27 @@
  */
 package org.apache.cassandra.utils.memory;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
 public abstract class MemtableAllocator
 {
+    private static final Logger logger = LoggerFactory.getLogger(MemtableAllocator.class);
+    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 5, TimeUnit.SECONDS);
+
     private final SubAllocator onHeap;
     private final SubAllocator offHeap;
-    volatile LifeCycle state = LifeCycle.LIVE;
 
-    static enum LifeCycle
+    enum LifeCycle
     {
         LIVE, DISCARDING, DISCARDED;
         LifeCycle transition(LifeCycle targetState)
@@ -78,10 +84,8 @@ public abstract class MemtableAllocator
      */
     public void setDiscarding()
     {
-        state = state.transition(LifeCycle.DISCARDING);
-        // mark the memory owned by this allocator as reclaiming
-        onHeap.markAllReclaiming();
-        offHeap.markAllReclaiming();
+        onHeap.setDiscarding();
+        offHeap.setDiscarding();
     }
 
     /**
@@ -90,15 +94,13 @@ public abstract class MemtableAllocator
      */
     public void setDiscarded()
     {
-        state = state.transition(LifeCycle.DISCARDED);
-        // release any memory owned by this allocator; automatically signals waiters
-        onHeap.releaseAll();
-        offHeap.releaseAll();
+        onHeap.setDiscarded();
+        offHeap.setDiscarded();
     }
 
     public boolean isLive()
     {
-        return state == LifeCycle.LIVE;
+        return onHeap.state == LifeCycle.LIVE || offHeap.state == LifeCycle.LIVE;
     }
 
     /** Mark the BB as unused, permitting it to be reclaimed */
@@ -107,6 +109,9 @@ public abstract class MemtableAllocator
         // the tracker we are owning memory from
         private final MemtablePool.SubPool parent;
 
+        // the state of the memtable
+        private volatile LifeCycle state;
+
         // the amount of memory/resource owned by this object
         private volatile long owns;
         // the amount of memory we are reporting to collect; this may be inaccurate, but is close
@@ -116,17 +121,44 @@ public abstract class MemtableAllocator
         SubAllocator(MemtablePool.SubPool parent)
         {
             this.parent = parent;
+            this.state = LifeCycle.LIVE;
+        }
+
+        /**
+         * Mark this allocator reclaiming; this will permit any outstanding allocations to temporarily
+         * overshoot the maximum memory limit so that flushing can begin immediately
+         */
+        void setDiscarding()
+        {
+            state = state.transition(LifeCycle.DISCARDING);
+            // mark the memory owned by this allocator as reclaiming
+            updateReclaiming();
         }
 
-        // should only be called once we know we will never allocate to the object again.
-        // currently no corroboration/enforcement of this is performed.
+        /**
+         * Indicate the memory and resources owned by this allocator are no longer referenced,
+         * and can be reclaimed/reused.
+         */
+        void setDiscarded()
+        {
+            state = state.transition(LifeCycle.DISCARDED);
+            // release any memory owned by this allocator; automatically signals waiters
+            releaseAll();
+        }
+
+        /**
+         * Should only be called once we know we will never allocate to the object again.
+         * currently no corroboration/enforcement of this is performed.
+         */
         void releaseAll()
         {
             parent.released(ownsUpdater.getAndSet(this, 0));
             parent.reclaimed(reclaimingUpdater.getAndSet(this, 0));
         }
 
-        // like allocate, but permits allocations to be negative
+        /**
+         * Like allocate, but permits allocations to be negative.
+         */
         public void adjust(long size, OpOrder.Group opGroup)
         {
             if (size <= 0)
@@ -168,28 +200,71 @@ public abstract class MemtableAllocator
             }
         }
 
-        // retroactively mark an amount allocated and acquired in the tracker, and owned by us
+        /**
+         * Retroactively mark an amount allocated and acquired in the tracker, and owned by us. If the state is discarding,
+         * then also update reclaiming since the flush operation is waiting at the barrier for in-flight writes,
+         * and it will flush this memory too.
+         */
         private void allocated(long size)
         {
             parent.allocated(size);
             ownsUpdater.addAndGet(this, size);
+
+            if (state == LifeCycle.DISCARDING)
+            {
+                noSpamLogger.info("Allocated {} bytes whilst discarding", size);
+                updateReclaiming();
+            }
         }
 
-        // retroactively mark an amount acquired in the tracker, and owned by us
+        /**
+         * Retroactively mark an amount acquired in the tracker, and owned by us. If the state is discarding,
+         * then also update reclaiming since the flush operation is waiting at the barrier for in-flight writes,
+         * and it will flush this memory too.
+         */
         private void acquired(long size)
         {
-            parent.acquired(size);
+            parent.acquired();
             ownsUpdater.addAndGet(this, size);
+
+            if (state == LifeCycle.DISCARDING)
+            {
+                noSpamLogger.info("Acquired {} bytes whilst discarding", size);
+                updateReclaiming();
+            }
         }
 
+        /**
+         * If the state is still live, then we update the memory we own here and in the parent.
+         *
+         * However, if the state is not live, we do not update it because we would have to update
+         * reclaiming too, and it could cause problems to the memtable cleaner algorithm if reclaiming
+         * decreased. If the memtable is flushing, soon enough {@link this#releaseAll()} will be called.
+         *
+         * @param size the size that was released
+         */
         void released(long size)
         {
-            parent.released(size);
-            ownsUpdater.addAndGet(this, -size);
+            if (state == LifeCycle.LIVE)
+            {
+                parent.released(size);
+                ownsUpdater.addAndGet(this, -size);
+            }
+            else
+            {
+                noSpamLogger.info("Tried to release {} bytes whilst discarding", size);
+            }
         }
 
-        // mark everything we currently own as reclaiming, both here and in our parent
-        void markAllReclaiming()
+        /**
+         * Mark what we currently own as reclaiming, both here and in our parent.
+         * This method is called for the first time when the memtable is scheduled for flushing,
+         * in which case reclaiming will be zero and we mark everything that we own as reclaiming.
+         * Afterwards, if there are in flight writes that have not completed yet, we also mark any
+         * more memory that is allocated by these writes as reclaiming, since the memtable is waiting
+         * on the barrier for these writes to complete, before it can actually start flushing data.
+         */
+        void updateReclaiming()
         {
             while (true)
             {
@@ -208,6 +283,11 @@ public abstract class MemtableAllocator
             return owns;
         }
 
+        public long getReclaiming()
+        {
+            return reclaiming;
+        }
+
         public float ownershipRatio()
         {
             float r = owns / (float) parent.limit;
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java
new file mode 100644
index 0000000..d2cb9c5
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.memory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The cleaner is used by {@link MemtableCleanerThread} in order to reclaim space from memtables, normally
+ * by flushing the largest memtable.
+ */
+public interface MemtableCleaner
+{
+    /**
+     * Schedules a cleaning task, normally flushing the largest memtable.
+     * The future will complete once the operation has completed and it will have a value set to true if
+     * the cleaner was able to execute the cleaning operation or if another thread concurrently executed
+     * the same clean operation. If no operation was even attempted, for example because no memtable was
+     * found, then the value will be false.
+     *
+     * The future will complete with an error if the cleaning operation encounters an error.
+     *
+     * @return a future completing with whether a cleaning operation was performed
+     */
+    CompletableFuture<Boolean> clean();
+}
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
index b905d2c..f6fccc6 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
@@ -18,6 +18,12 @@
  */
 package org.apache.cassandra.utils.memory;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
@@ -27,54 +33,84 @@ import org.apache.cassandra.utils.concurrent.WaitQueue;
  */
 public class MemtableCleanerThread<P extends MemtablePool> extends InfiniteLoopExecutor
 {
-    private static class Clean<P extends MemtablePool> implements InterruptibleRunnable
+    private static final Logger logger = LoggerFactory.getLogger(MemtableCleanerThread.class);
+
+    public static class Clean<P extends MemtablePool> implements InterruptibleRunnable
     {
+        /** This is incremented when a cleaner is invoked and decremented when a cleaner has completed */
+        final AtomicInteger numPendingTasks = new AtomicInteger(0);
+
         /** The pool we're cleaning */
         final P pool;
 
         /** should ensure that at least some memory has been marked reclaiming after completion */
-        final Runnable cleaner;
+        final MemtableCleaner cleaner;
 
         /** signalled whenever needsCleaning() may return true */
         final WaitQueue wait = new WaitQueue();
 
-        private Clean(P pool, Runnable cleaner)
+        private Clean(P pool, MemtableCleaner cleaner)
         {
             this.pool = pool;
             this.cleaner = cleaner;
         }
 
-        boolean needsCleaning()
+        /** Return the number of pending tasks */
+        public int numPendingTasks()
         {
-            return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning();
+            return numPendingTasks.get();
         }
 
         @Override
         public void run() throws InterruptedException
         {
-            if (needsCleaning())
-            {
-                cleaner.run();
-            }
-            else
+            if (!pool.needsCleaning())
             {
                 final WaitQueue.Signal signal = wait.register();
-                if (!needsCleaning())
+                if (!pool.needsCleaning())
                     signal.await();
                 else
                     signal.cancel();
             }
+            else
+            {
+                int numPendingTasks = this.numPendingTasks.incrementAndGet();
+
+                if (logger.isTraceEnabled())
+                    logger.trace("Invoking cleaner with {} tasks pending", numPendingTasks);
+
+                cleaner.clean().handle(this::apply);
+            }
+        }
+
+        private Boolean apply(Boolean res, Throwable err)
+        {
+            final int tasks = numPendingTasks.decrementAndGet();
+            // handle() passes res == null whenever err != null, so res must never be unboxed directly (NPE);
+            // re-trigger the cleaner if the job was scheduled (res == TRUE) or failed and cleaning is still needed
+            if ((err != null || Boolean.TRUE.equals(res)) && pool.needsCleaning())
+                wait.signal();
+
+            if (err != null)
+                logger.error("Memtable cleaning tasks failed with an exception and {} pending tasks ", tasks, err);
+            else if (logger.isTraceEnabled())
+                logger.trace("Memtable cleaning task completed ({}), currently pending: {}", res, tasks);
+
+            return res;
+        }
     }
 
     private final Runnable trigger;
+    private final Clean<P> clean;
+
     private MemtableCleanerThread(Clean<P> clean)
     {
         super(clean.pool.getClass().getSimpleName() + "Cleaner", clean);
         this.trigger = clean.wait::signal;
+        this.clean = clean;
     }
 
-    MemtableCleanerThread(P pool, Runnable cleaner)
+    public MemtableCleanerThread(P pool, MemtableCleaner cleaner)
     {
         this(new Clean<>(pool, cleaner));
     }
@@ -84,4 +120,11 @@ public class MemtableCleanerThread<P extends MemtablePool> extends InfiniteLoopE
     {
         trigger.run();
     }
+
+    /** Return the number of pending tasks */
+    @VisibleForTesting
+    public int numPendingTasks()
+    {
+        return clean.numPendingTasks();
+    }
 }
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index 8061566..cd434c5 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@ -23,7 +23,9 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
+import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.DefaultNameFactory;
@@ -44,18 +46,22 @@ public abstract class MemtablePool
     public final SubPool offHeap;
 
     public final Timer blockedOnAllocating;
+    public final Gauge<Long> numPendingTasks;
 
     final WaitQueue hasRoom = new WaitQueue();
 
-    MemtablePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner)
+    MemtablePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, MemtableCleaner cleaner)
     {
+        Preconditions.checkArgument(cleaner != null, "Cleaner should not be null");
+
         this.onHeap = getSubPool(maxOnHeapMemory, cleanThreshold);
         this.offHeap = getSubPool(maxOffHeapMemory, cleanThreshold);
         this.cleaner = getCleaner(cleaner);
-        blockedOnAllocating = CassandraMetricsRegistry.Metrics.timer(new DefaultNameFactory("MemtablePool")
-                                                                         .createMetricName("BlockedOnAllocation"));
-        if (this.cleaner != null)
-            this.cleaner.start();
+        this.cleaner.start();
+        DefaultNameFactory nameFactory = new DefaultNameFactory("MemtablePool");
+        blockedOnAllocating = CassandraMetricsRegistry.Metrics.timer(nameFactory.createMetricName("BlockedOnAllocation"));
+        numPendingTasks = CassandraMetricsRegistry.Metrics.register(nameFactory.createMetricName("PendingFlushTasks"),
+                                                                    () -> (long) this.cleaner.numPendingTasks());
     }
 
     SubPool getSubPool(long limit, float cleanThreshold)
@@ -63,7 +69,7 @@ public abstract class MemtablePool
         return new SubPool(limit, cleanThreshold);
     }
 
-    MemtableCleanerThread<?> getCleaner(Runnable cleaner)
+    MemtableCleanerThread<?> getCleaner(MemtableCleaner cleaner)
     {
         return cleaner == null ? null : new MemtableCleanerThread<>(this, cleaner);
     }
@@ -78,6 +84,16 @@ public abstract class MemtablePool
 
     public abstract MemtableAllocator newAllocator();
 
+    public boolean needsCleaning()
+    {
+        return onHeap.needsCleaning() || offHeap.needsCleaning();
+    }
+
+    public Long getNumPendingtasks()
+    {
+        return numPendingTasks.getValue();
+    }
+
     /**
      * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners,
      * and acquire() makes shared resources unavailable but still recorded. An Owner must always acquire resources,
@@ -169,7 +185,7 @@ public abstract class MemtablePool
             maybeClean();
         }
 
-        void acquired(long size)
+        void acquired()
         {
             maybeClean();
         }
@@ -203,6 +219,11 @@ public abstract class MemtablePool
             return allocated;
         }
 
+        public long getReclaiming()
+        {
+            return reclaiming;
+        }
+
         public float reclaimingRatio()
         {
             float r = reclaiming / (float) limit;
diff --git a/src/java/org/apache/cassandra/utils/memory/NativePool.java b/src/java/org/apache/cassandra/utils/memory/NativePool.java
index 012867a..29ea8fb 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativePool.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.utils.memory;
 
 public class NativePool extends MemtablePool
 {
-    public NativePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner)
+    public NativePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, MemtableCleaner cleaner)
     {
         super(maxOnHeapMemory, maxOffHeapMemory, cleanThreshold, cleaner);
     }
diff --git a/src/java/org/apache/cassandra/utils/memory/SlabPool.java b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
index c5c44e1..a779432 100644
--- a/src/java/org/apache/cassandra/utils/memory/SlabPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
@@ -22,7 +22,7 @@ public class SlabPool extends MemtablePool
 {
     final boolean allocateOnHeap;
 
-    public SlabPool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanupThreshold, Runnable cleaner)
+    public SlabPool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanupThreshold, MemtableCleaner cleaner)
     {
         super(maxOnHeapMemory, maxOffHeapMemory, cleanupThreshold, cleaner);
         this.allocateOnHeap = maxOffHeapMemory == 0;
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index e7216b2..ddeb9da 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -26,7 +26,6 @@ import java.net.ServerSocket;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -471,16 +470,9 @@ public abstract class CQLTester
 
     public void compact()
     {
-        try
-        {
-            ColumnFamilyStore store = getCurrentColumnFamilyStore();
-            if (store != null)
-                store.forceMajorCompaction();
-        }
-        catch (InterruptedException | ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
+         ColumnFamilyStore store = getCurrentColumnFamilyStore();
+         if (store != null)
+             store.forceMajorCompaction();
     }
 
     public void disableCompaction()
diff --git a/test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java b/test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java
new file mode 100644
index 0000000..7100a2a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.memory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+public class MemtableCleanerThreadTest
+{
+    private static final long TIMEOUT_SECONDS = 5;
+    private static final long TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS);
+
+    @Mock
+    private MemtablePool pool;
+
+    @Mock
+    private MemtableCleaner cleaner;
+
+    private MemtableCleanerThread<MemtablePool> cleanerThread;
+
+    @Before
+    public void setup()
+    {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    private void startThread()
+    {
+        cleanerThread = new MemtableCleanerThread<>(pool, cleaner);
+        assertNotNull(cleanerThread);
+        cleanerThread.start();
+
+        for (int i = 0; i < TIMEOUT_MILLIS && !cleanerThread.isAlive(); i++)
+            FBUtilities.sleepQuietly(1);
+    }
+
+    private void stopThread() throws InterruptedException
+    {
+        cleanerThread.shutdownNow();
+
+        assertTrue(cleanerThread.awaitTermination(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+    }
+
+    private void waitForPendingTasks()
+    {
+        // the cleaner latch completes before the pending tasks are decremented, so poll (bounded) until they drain
+        for (int i = 0; i < TIMEOUT_MILLIS && cleanerThread.numPendingTasks() > 0; i++)
+            FBUtilities.sleepQuietly(1);
+        assertEquals(0, cleanerThread.numPendingTasks());
+    }
+
+    @Test
+    public void testCleanerInvoked() throws Exception
+    {
+        CountDownLatch cleanerExecutedLatch = new CountDownLatch(1);
+        CompletableFuture<Boolean> fut = new CompletableFuture<>();
+        AtomicBoolean needsCleaning = new AtomicBoolean(false);
+
+        when(pool.needsCleaning()).thenAnswer(invocation -> needsCleaning.get());
+
+        when(cleaner.clean()).thenAnswer(invocation -> {
+            needsCleaning.set(false);
+            cleanerExecutedLatch.countDown();
+            return fut;
+        });
+
+        // start the thread with needsCleaning returning false, the cleaner should not be invoked
+        needsCleaning.set(false);
+        startThread();
+        assertFalse(cleanerExecutedLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(1, cleanerExecutedLatch.getCount());
+        assertEquals(0, cleanerThread.numPendingTasks());
+
+        // now invoke the cleaner
+        needsCleaning.set(true);
+        cleanerThread.trigger();
+        assertTrue(cleanerExecutedLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(0, cleanerExecutedLatch.getCount());
+        assertEquals(1, cleanerThread.numPendingTasks());
+
+        // now complete the cleaning task
+        needsCleaning.set(false);
+        fut.complete(true);
+        waitForPendingTasks();
+
+        stopThread();
+    }
+
+    @Test
+    public void testCleanerError() throws Exception
+    {
+        AtomicReference<CountDownLatch> cleanerLatch = new AtomicReference<>(new CountDownLatch(1));
+        AtomicReference<CompletableFuture<Boolean>> fut = new AtomicReference<>(new CompletableFuture<>());
+        AtomicBoolean needsCleaning = new AtomicBoolean(false);
+        AtomicInteger numTimeCleanerInvoked = new AtomicInteger(0);
+
+        when(pool.needsCleaning()).thenAnswer(invocation -> needsCleaning.get());
+
+        when(cleaner.clean()).thenAnswer(invocation -> {
+            needsCleaning.set(false);
+            numTimeCleanerInvoked.incrementAndGet();
+            cleanerLatch.get().countDown();
+            return fut.get();
+        });
+
+        // start the thread with needsCleaning returning true, the cleaner should be invoked
+        needsCleaning.set(true);
+        startThread();
+        assertTrue(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(0, cleanerLatch.get().getCount());
+        assertEquals(1, cleanerThread.numPendingTasks());
+        assertEquals(1, numTimeCleanerInvoked.get());
+
+        // complete the cleaning task with an error, no other cleaning task should be invoked
+        cleanerLatch.set(new CountDownLatch(1));
+        CompletableFuture<Boolean> oldFut = fut.get();
+        fut.set(new CompletableFuture<>());
+        needsCleaning.set(false);
+        oldFut.completeExceptionally(new RuntimeException("Test"));
+        assertFalse(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(1, cleanerLatch.get().getCount());
+        assertEquals(1, numTimeCleanerInvoked.get());
+
+        // now trigger cleaning again and verify that a new task is invoked
+        cleanerLatch.set(new CountDownLatch(1));
+        fut.set(new CompletableFuture<>());
+        needsCleaning.set(true);
+        cleanerThread.trigger();
+        assertTrue(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(0, cleanerLatch.get().getCount());
+        assertEquals(2, numTimeCleanerInvoked.get());
+
+        //  complete the cleaning task with false (nothing should be scheduled)
+        cleanerLatch.set(new CountDownLatch(1));
+        oldFut = fut.get();
+        fut.set(new CompletableFuture<>());
+        needsCleaning.set(false);
+        oldFut.complete(false);
+        assertFalse(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(1, cleanerLatch.get().getCount());
+        assertEquals(2, numTimeCleanerInvoked.get());
+
+        // now trigger cleaning again and verify that a new task is invoked
+        cleanerLatch.set(new CountDownLatch(1));
+        fut.set(new CompletableFuture<>());
+        needsCleaning.set(true);
+        cleanerThread.trigger();
+        assertTrue(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+        assertEquals(0, cleanerLatch.get().getCount());
+        assertEquals(3, numTimeCleanerInvoked.get());
+
+        stopThread();
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
index b636bf7..a7b112c 100644
--- a/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
+++ b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
@@ -22,105 +22,137 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-import junit.framework.Assert;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class NativeAllocatorTest
 {
+    private ScheduledExecutorService exec;
+    private OpOrder order;
+    private OpOrder.Group group;
+    private CountDownLatch canClean;
+    private CountDownLatch isClean;
+    private AtomicReference<NativeAllocator> allocatorRef;
+    private AtomicReference<OpOrder.Barrier> barrier;
+    private NativePool pool;
+    private NativeAllocator allocator;
+    private Runnable markBlocking;
+
+    @Before
+    public void setUp()
+    {
+        exec = Executors.newScheduledThreadPool(2);
+        order = new OpOrder();
+        group = order.start();
+        canClean = new CountDownLatch(1);
+        isClean = new CountDownLatch(1);
+        allocatorRef = new AtomicReference<>();
+        barrier = new AtomicReference<>();
+        pool = new NativePool(1, 100, 0.75f, () -> {
+            try
+            {
+                canClean.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError();
+            }
+            if (isClean.getCount() > 0)
+            {
+                allocatorRef.get().offHeap().released(80);
+                isClean.countDown();
+            }
+            return CompletableFuture.completedFuture(true);
+        });
+        allocator = new NativeAllocator(pool);
+        allocatorRef.set(allocator);
+        markBlocking = () -> {
+            barrier.set(order.newBarrier());
+            barrier.get().issue();
+            barrier.get().markBlocking();
+        };
+    }
+
+    private void verifyUsedReclaiming(long used, long reclaiming)
+    {
+        Assert.assertEquals(used, allocator.offHeap().owns());
+        Assert.assertEquals(used, pool.offHeap.used());
+        Assert.assertEquals(reclaiming, allocator.offHeap().getReclaiming());
+        Assert.assertEquals(reclaiming, pool.offHeap.getReclaiming());
+    }
 
     @Test
     public void testBookKeeping() throws ExecutionException, InterruptedException
     {
-        {
-            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);
-            final OpOrder order = new OpOrder();
-            final OpOrder.Group group = order.start();
-            final CountDownLatch canClean = new CountDownLatch(1);
-            final CountDownLatch isClean = new CountDownLatch(1);
-            final AtomicReference<NativeAllocator> allocatorRef = new AtomicReference<>();
-            final AtomicReference<OpOrder.Barrier> barrier = new AtomicReference<>();
-            final NativeAllocator allocator = new NativeAllocator(new NativePool(1, 100, 0.75f, new Runnable()
+        final Runnable test = () -> {
+            // allocate normal, check accounted and not cleaned
+            allocator.allocate(10, group);
+            verifyUsedReclaiming(10, 0);
+
+            // confirm adjustment works
+            allocator.offHeap().adjust(-10, group);
+            verifyUsedReclaiming(0, 0);
+
+            allocator.offHeap().adjust(10, group);
+            verifyUsedReclaiming(10, 0);
+
+            // confirm we cannot allocate negative
+            boolean success = false;
+            try
             {
-                public void run()
-                {
-                    try
-                    {
-                        canClean.await();
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError();
-                    }
-                    if (isClean.getCount() > 0)
-                    {
-                        allocatorRef.get().offHeap().released(80);
-                        isClean.countDown();
-                    }
-                }
-            }));
-            allocatorRef.set(allocator);
-            final Runnable markBlocking = new Runnable()
+                allocator.offHeap().allocate(-10, group);
+            }
+            catch (AssertionError e)
             {
+                success = true;
+            }
+
+            Assert.assertTrue(success);
+            Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
+            Assert.assertEquals(1, isClean.getCount());
+
+            // allocate above watermark
+            allocator.allocate(70, group);
+            verifyUsedReclaiming(80, 0);
 
-                public void run()
-                {
-                    barrier.set(order.newBarrier());
-                    barrier.get().issue();
-                    barrier.get().markBlocking();
-                }
-            };
-            final Runnable run = new Runnable()
+            // let the cleaner run, it will release 80 bytes
+            canClean.countDown();
+            try
             {
-                public void run()
-                {
-                    // allocate normal, check accounted and not cleaned
-                    allocator.allocate(10, group);
-                    Assert.assertEquals(10, allocator.offHeap().owns());
-                    // confirm adjustment works
-                    allocator.offHeap().adjust(-10, group);
-                    Assert.assertEquals(0, allocator.offHeap().owns());
-                    allocator.offHeap().adjust(10, group);
-                    Assert.assertEquals(10, allocator.offHeap().owns());
-                    // confirm we cannot allocate negative
-                    boolean success = false;
-                    try
-                    {
-                        allocator.offHeap().allocate(-10, group);
-                    }
-                    catch (AssertionError e)
-                    {
-                        success = true;
-                    }
-                    Assert.assertTrue(success);
-                    Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
-                    Assert.assertEquals(1, isClean.getCount());
-
-                    // allocate above watermark, check cleaned
-                    allocator.allocate(70, group);
-                    Assert.assertEquals(80, allocator.offHeap().owns());
-                    canClean.countDown();
-                    try
-                    {
-                        isClean.await(10L, TimeUnit.MILLISECONDS);
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError();
-                    }
-                    Assert.assertEquals(0, isClean.getCount());
-                    Assert.assertEquals(0, allocator.offHeap().owns());
-
-                    // allocate above limit, check we block until "marked blocking"
-                    exec.schedule(markBlocking, 10L, TimeUnit.MILLISECONDS);
-                    allocator.allocate(110, group);
-                    Assert.assertNotNull(barrier.get());
-                    Assert.assertEquals(110, allocator.offHeap().owns());
-                }
-            };
-            exec.submit(run).get();
-        }
-    }
+                isClean.await(10L, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError();
+            }
+            Assert.assertEquals(0, isClean.getCount());
+            verifyUsedReclaiming(0, 0);
 
+            // allocate, then set discarding, then allocated some more
+            allocator.allocate(30, group);
+            verifyUsedReclaiming(30, 0);
+            allocator.setDiscarding();
+            Assert.assertFalse(allocator.isLive());
+            verifyUsedReclaiming(30, 30);
+            allocator.allocate(50, group);
+            verifyUsedReclaiming(80, 80);
+
+            // allocate above limit, check we block until "marked blocking"
+            exec.schedule(markBlocking, 10L, TimeUnit.MILLISECONDS);
+            allocator.allocate(30, group);
+            Assert.assertNotNull(barrier.get());
+            verifyUsedReclaiming(110, 110);
+
+            // release everything
+            allocator.setDiscarded();
+            Assert.assertFalse(allocator.isLive());
+            verifyUsedReclaiming(0, 0);
+        };
+        exec.submit(test).get();
+    }
 }
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org