You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ed...@apache.org on 2021/01/19 23:42:09 UTC
[cassandra] branch cassandra-3.0 updated: Prevent unbounded number
of pending flushing tasks;
Add PendingFlushTasks metric (CASSANDRA-16261) Authored by Ekaterina
Dimitrova;
reviewed by Caleb Rackliffe and Andres de la Pena for CASSANDRA-16261
This is an automated email from the ASF dual-hosted git repository.
edimitrova pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 0a1e900 Prevent unbounded number of pending flushing tasks; Add PendingFlushTasks metric (CASSANDRA-16261) Authored by Ekaterina Dimitrova; reviewed by Caleb Rackliffe and Andres de la Pena for CASSANDRA-16261
0a1e900 is described below
commit 0a1e900a0a042f78d7d5d6625bc98b84eb463e69
Author: Ekaterina Dimitrova <ek...@datastax.com>
AuthorDate: Fri Nov 6 18:46:14 2020 -0500
Prevent unbounded number of pending flushing tasks; Add PendingFlushTasks metric (CASSANDRA-16261)
Authored by Ekaterina Dimitrova; reviewed by Caleb Rackliffe and Andres de la Pena for CASSANDRA-16261
---
CHANGES.txt | 1 +
.../cassandra/concurrent/InfiniteLoopExecutor.java | 8 +
.../cassandra/config/DatabaseDescriptor.java | 39 +--
.../org/apache/cassandra/db/ColumnFamilyStore.java | 373 +++++++++++----------
src/java/org/apache/cassandra/db/Memtable.java | 4 +-
src/java/org/apache/cassandra/db/ReadCommand.java | 3 +-
.../org/apache/cassandra/utils/FBUtilities.java | 12 +
.../apache/cassandra/utils/memory/HeapPool.java | 2 +-
.../cassandra/utils/memory/MemtableAllocator.java | 124 +++++--
.../cassandra/utils/memory/MemtableCleaner.java | 40 +++
.../utils/memory/MemtableCleanerThread.java | 67 +++-
.../cassandra/utils/memory/MemtablePool.java | 35 +-
.../apache/cassandra/utils/memory/NativePool.java | 2 +-
.../apache/cassandra/utils/memory/SlabPool.java | 2 +-
test/unit/org/apache/cassandra/cql3/CQLTester.java | 14 +-
.../utils/memory/MemtableCleanerThreadTest.java | 187 +++++++++++
.../utils/memory/NativeAllocatorTest.java | 204 ++++++-----
17 files changed, 772 insertions(+), 345 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index ee4cf6e..4f3ab1f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.24:
+ * Prevent unbounded number of pending flushing tasks; Add PendingFlushTasks metric (CASSANDRA-16261)
* Improve empty hint file handling during startup (CASSANDRA-16162)
* Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372)
* Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226)
diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
index b54fa3f..8e72d91 100644
--- a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
@@ -23,6 +23,8 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
+
public class InfiniteLoopExecutor
{
private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class);
@@ -81,4 +83,10 @@ public class InfiniteLoopExecutor
thread.join(unit.toMillis(time));
return !thread.isAlive();
}
+
+ @VisibleForTesting
+ public boolean isAlive()
+ {
+ return this.thread.isAlive();
+ }
}
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3f9aa96..52f01b6 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -151,7 +151,7 @@ public class DatabaseDescriptor
String loaderClass = System.getProperty("cassandra.config.loader");
ConfigurationLoader loader = loaderClass == null
? new YamlConfigurationLoader()
- : FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading");
+ : FBUtilities.construct(loaderClass, "configuration loading");
Config config = loader.loadConfig();
if (!hasLoggedConfig)
@@ -214,7 +214,7 @@ public class DatabaseDescriptor
}
catch (UnknownHostException e)
{
- throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false);
+ throw new ConfigurationException("Unknown listen_address '" + config.listen_address + '\'', false);
}
if (listenAddress.isAnyLocalAddress())
@@ -234,7 +234,7 @@ public class DatabaseDescriptor
}
catch (UnknownHostException e)
{
- throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false);
+ throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + '\'', false);
}
if (broadcastAddress.isAnyLocalAddress())
@@ -275,7 +275,7 @@ public class DatabaseDescriptor
}
catch (UnknownHostException e)
{
- throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
+ throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + '\'', false);
}
if (broadcastRpcAddress.isAnyLocalAddress())
@@ -520,18 +520,14 @@ public class DatabaseDescriptor
EndpointSnitchInfo.create();
localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
- localComparator = new Comparator<InetAddress>()
- {
- public int compare(InetAddress endpoint1, InetAddress endpoint2)
- {
- boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
- boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
- if (local1 && !local2)
- return -1;
- if (local2 && !local1)
- return 1;
- return 0;
- }
+ localComparator = (endpoint1, endpoint2) -> {
+ boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
+ boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
+ if (local1 && !local2)
+ return -1;
+ if (local2 && !local1)
+ return 1;
+ return 0;
};
/* Request Scheduler setup */
@@ -592,7 +588,7 @@ public class DatabaseDescriptor
if (conf.commitlog_total_space_in_mb == null)
{
int preferredSize = 8192;
- int minSize = 0;
+ int minSize;
try
{
// use 1/4 of available space. See discussion on #10013 and #10199
@@ -1061,7 +1057,7 @@ public class DatabaseDescriptor
public static Collection<String> tokensFromString(String tokenString)
{
- List<String> tokens = new ArrayList<String>();
+ List<String> tokens = new ArrayList<>();
if (tokenString != null)
for (String token : tokenString.split(","))
tokens.add(token.replaceAll("^\\s+", "").replaceAll("\\s+$", ""));
@@ -1747,7 +1743,7 @@ public class DatabaseDescriptor
public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension)
{
String name = cacheType.toString()
- + (version == null ? "" : "-" + version + "." + extension);
+ + (version == null ? "" : '-' + version + '.' + extension);
return new File(conf.saved_caches_directory, name);
}
@@ -2026,12 +2022,13 @@ public class DatabaseDescriptor
{
long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;
long offHeapLimit = ((long) conf.memtable_offheap_space_in_mb) << 20;
+ final MemtableCleaner cleaner = ColumnFamilyStore::flushLargestMemtable;
switch (conf.memtable_allocation_type)
{
case unslabbed_heap_buffers:
- return new HeapPool(heapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
+ return new HeapPool(heapLimit, conf.memtable_cleanup_threshold, cleaner);
case heap_buffers:
- return new SlabPool(heapLimit, 0, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
+ return new SlabPool(heapLimit, 0, conf.memtable_cleanup_threshold, cleaner);
case offheap_buffers:
throw new ConfigurationException("offheap_buffers are not available in 3.0. They will be re-introduced in a future release, see https://issues.apache.org/jira/browse/CASSANDRA-9472 for details");
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a9c087e..61d60b1 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -23,6 +23,7 @@ import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.*;
+import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -126,41 +127,41 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
- private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
- StageManager.KEEPALIVE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MemtableFlushWriter"),
- "internal");
+ private static final ThreadPoolExecutor flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+ StageManager.KEEPALIVE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("MemtableFlushWriter"),
+ "internal");
// post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
- private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
- StageManager.KEEPALIVE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MemtablePostFlush"),
- "internal");
-
- private static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1,
- StageManager.KEEPALIVE,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(),
- new NamedThreadFactory("MemtableReclaimMemory"),
- "internal");
+ private static final ThreadPoolExecutor postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
+ StageManager.KEEPALIVE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("MemtablePostFlush"),
+ "internal");
+
+ private static final ThreadPoolExecutor reclaimExecutor = new JMXEnabledThreadPoolExecutor(1,
+ StageManager.KEEPALIVE,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new NamedThreadFactory("MemtableReclaimMemory"),
+ "internal");
private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"};
private static final String[] COUNTER_DESCS = new String[]
- { "partition key in raw hex bytes",
- "value of this partition for given sampler",
- "value is within the error bounds plus or minus of this",
- "the partition key turned into a human readable format" };
+ { "partition key in raw hex bytes",
+ "value of this partition for given sampler",
+ "value is within the error bounds plus or minus of this",
+ "the partition key turned into a human readable format" };
private static final CompositeType COUNTER_COMPOSITE_TYPE;
private static final TabularType COUNTER_TYPE;
private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"};
private static final String[] SAMPLER_DESCS = new String[]
- { "cardinality of partitions",
- "list of counter results" };
+ { "cardinality of partitions",
+ "list of counter results" };
private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS";
private static final CompositeType SAMPLING_RESULT;
@@ -234,7 +235,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
ExecutorUtils.shutdownAndWait(timeout, unit, reclaimExecutor, postFlushExecutor, flushExecutor);
}
-
public void reload()
{
// metadata object has been mutated directly. make all the members jibe with new settings.
@@ -271,7 +271,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
logger.trace("scheduling flush in {} ms", period);
WrappedRunnable runnable = new WrappedRunnable()
{
- protected void runMayThrow() throws Exception
+ protected void runMayThrow()
{
synchronized (data)
{
@@ -299,14 +299,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public static Runnable getBackgroundCompactionTaskSubmitter()
{
- return new Runnable()
- {
- public void run()
- {
- for (Keyspace keyspace : Keyspace.all())
- for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
- CompactionManager.instance.submitBackground(cfs);
- }
+ return () -> {
+ for (Keyspace keyspace : Keyspace.all())
+ for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
+ CompactionManager.instance.submitBackground(cfs);
};
}
@@ -360,11 +356,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
private ColumnFamilyStore(Keyspace keyspace,
- String columnFamilyName,
- int generation,
- CFMetaData metadata,
- Directories directories,
- boolean loadSSTables)
+ String columnFamilyName,
+ int generation,
+ CFMetaData metadata,
+ Directories directories,
+ boolean loadSSTables)
{
this(keyspace, columnFamilyName, generation, metadata, directories, loadSSTables, true);
}
@@ -372,15 +368,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
@VisibleForTesting
public ColumnFamilyStore(Keyspace keyspace,
- String columnFamilyName,
- int generation,
- CFMetaData metadata,
- Directories directories,
- boolean loadSSTables,
- boolean registerBookkeeping)
+ String columnFamilyName,
+ int generation,
+ CFMetaData metadata,
+ Directories directories,
+ boolean loadSSTables,
+ boolean registerBookkeeping)
{
assert directories != null;
- assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
+ assert metadata != null : "null metadata for " + keyspace + ':' + columnFamilyName;
this.keyspace = keyspace;
this.metadata = metadata;
@@ -435,8 +431,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
// register the mbean
mbeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s",
- isIndex() ? "IndexTables" : "Tables",
- keyspace.getName(), name);
+ isIndex() ? "IndexTables" : "Tables",
+ keyspace.getName(), name);
oldMBeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,columnfamily=%s",
isIndex() ? "IndexColumnFamilies" : "ColumnFamilies",
keyspace.getName(), name);
@@ -453,24 +449,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
throw new RuntimeException(e);
}
logger.trace("retryPolicy for {} is {}", name, this.metadata.params.speculativeRetry);
- latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable()
- {
- public void run()
+ latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(() -> {
+ SpeculativeRetryParam retryPolicy = ColumnFamilyStore.this.metadata.params.speculativeRetry;
+ switch (retryPolicy.kind())
{
- SpeculativeRetryParam retryPolicy = ColumnFamilyStore.this.metadata.params.speculativeRetry;
- switch (retryPolicy.kind())
- {
- case PERCENTILE:
- // get percentile in nanos
- sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.threshold()));
- break;
- case CUSTOM:
- sampleLatencyNanos = (long) retryPolicy.threshold();
- break;
- default:
- sampleLatencyNanos = Long.MAX_VALUE;
- break;
- }
+ case PERCENTILE:
+ // get percentile in nanos
+ sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.threshold()));
+ break;
+ case CUSTOM:
+ sampleLatencyNanos = (long) retryPolicy.threshold();
+ break;
+ default:
+ sampleLatencyNanos = Long.MAX_VALUE;
+ break;
}
}, DatabaseDescriptor.getReadRpcTimeout(), DatabaseDescriptor.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
}
@@ -575,14 +567,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// get the max generation number, to prevent generation conflicts
Directories directories = new Directories(metadata, initialDirectories);
Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true);
- List<Integer> generations = new ArrayList<Integer>();
+ List<Integer> generations = new ArrayList<>();
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
{
Descriptor desc = entry.getKey();
generations.add(desc.generation);
if (!desc.isCompatible())
throw new RuntimeException(String.format("Incompatible SSTable found. Current version %s is unable to read file: %s. Please run upgradesstables.",
- desc.getFormat().getLatestVersion(), desc));
+ desc.getFormat().getLatestVersion(), desc));
}
Collections.sort(generations);
int value = (generations.size() > 0) ? (generations.get(generations.size() - 1)) : 0;
@@ -599,7 +591,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Directories directories = new Directories(metadata, initialDirectories);
Set<File> cleanedDirectories = new HashSet<>();
- // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357)
+ // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357)
clearEphemeralSnapshots(directories);
directories.removeTemporaryDirectories();
@@ -637,13 +629,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
// cleanup incomplete saved caches
- Pattern tmpCacheFilePattern = Pattern.compile(metadata.ksName + "-" + metadata.cfName + "-(Key|Row)Cache.*\\.tmp$");
+ Pattern tmpCacheFilePattern = Pattern.compile(metadata.ksName + '-' + metadata.cfName + "-(Key|Row)Cache.*\\.tmp$");
File dir = new File(DatabaseDescriptor.getSavedCachesLocation());
if (dir.exists())
{
assert dir.isDirectory();
- for (File file : dir.listFiles())
+ for (File file : Objects.requireNonNull(dir.listFiles()))
if (tmpCacheFilePattern.matcher(file.getName()).matches())
if (!file.delete())
logger.warn("could not delete {}", file.getAbsolutePath());
@@ -666,7 +658,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
public static void loadNewSSTables(String ksName, String cfName)
{
- /** ks/cf existence checks will be done by open and getCFS methods for us */
+ /* ks/cf existence checks will be done by open and getCFS methods for us */
Keyspace keyspace = Keyspace.open(ksName);
keyspace.getColumnFamilyStore(cfName).loadNewSSTables();
}
@@ -693,8 +685,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (!descriptor.isCompatible())
throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s",
- descriptor.getFormat().getLatestVersion(),
- descriptor));
+ descriptor.getFormat().getLatestVersion(),
+ descriptor));
// force foreign sstables to level 0
try
@@ -779,7 +771,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName);
- Set<String> indexes = new HashSet<String>(Arrays.asList(idxNames));
+ Set<String> indexes = new HashSet<>(Arrays.asList(idxNames));
Iterable<SSTableReader> sstables = cfs.getSSTables(SSTableSet.CANONICAL);
try (Refs<SSTableReader> refs = Refs.ref(sstables))
@@ -834,6 +826,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (data.getView().getCurrentMemtable() == memtable)
return switchMemtable();
}
+ logger.debug("Memtable is no longer current, returning future that completes when current flushing operation completes");
return waitForFlushes();
}
@@ -879,7 +872,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
logger.debug("Enqueuing flush of {}: {}", name, String.format("%d (%.0f%%) on-heap, %d (%.0f%%) off-heap",
- onHeapTotal, onHeapRatio * 100, offHeapTotal, offHeapRatio * 100));
+ onHeapTotal, onHeapRatio * 100, offHeapTotal, offHeapRatio * 100));
}
@@ -927,13 +920,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// we grab the current memtable; once any preceding memtables have flushed, we know its
// commitLogLowerBound has been set (as this it is set with the upper bound of the preceding memtable)
final Memtable current = data.getView().getCurrentMemtable();
- ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(new Callable<ReplayPosition>()
- {
- public ReplayPosition call()
- {
- logger.debug("forceFlush requested but everything is clean in {}", name);
- return current.getCommitLogLowerBound();
- }
+ ListenableFutureTask<ReplayPosition> task = ListenableFutureTask.create(() -> {
+ logger.debug("forceFlush requested but everything is clean in {}", name);
+ return current.getCommitLogLowerBound();
});
postFlushExecutor.execute(task);
return task;
@@ -1007,11 +996,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private Flush(boolean truncate)
{
+ if (logger.isTraceEnabled())
+ logger.trace("Creating flush task {}@{}", hashCode(), name);
// if true, we won't flush, we'll just wait for any outstanding writes, switch the memtable, and discard
this.truncate = truncate;
metric.pendingFlushes.inc();
- /**
+ /*
* To ensure correctness of switch without blocking writes, run() needs to wait for all write operations
* started prior to the switch to complete. We do this by creating a Barrier on the writeOrdering
* that all write operations register themselves with, and assigning this barrier to the memtables,
@@ -1020,7 +1011,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* In doing so it also tells the write operations to update the commitLogUpperBound of the memtable, so
* that we know the CL position we are dirty to, which can be marked clean when we complete.
*/
- writeBarrier = keyspace.writeOrder.newBarrier();
+ writeBarrier = Keyspace.writeOrder.newBarrier();
// submit flushes for the memtable for any indexed sub-cfses, and our own
AtomicReference<ReplayPosition> commitLogUpperBound = new AtomicReference<>();
@@ -1044,15 +1035,26 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// replay positions have also completed, i.e. the memtables are done and ready to flush
writeBarrier.issue();
postFlush = new PostFlush(memtables);
+
+ if (logger.isTraceEnabled())
+ logger.trace("Created flush task {}@{}", hashCode(), name);
}
public void run()
{
+ if (logger.isTraceEnabled())
+ logger.trace("Flush task {}@{} starts executing, waiting on barrier", hashCode(), name);
+
+ long start = System.nanoTime();
+
// mark writes older than the barrier as blocking progress, permitting them to exceed our memory limit
// if they are stuck waiting on it, then wait for them all to complete
writeBarrier.markBlocking();
writeBarrier.await();
+ if (logger.isTraceEnabled())
+ logger.trace("Flush task for task {}@{} waited {} ms at the barrier", hashCode(), name, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+
// mark all memtables as flushing, removing them from the live memtable list
for (Memtable memtable : memtables)
memtable.cfs.data.markFlushing(memtable);
@@ -1088,8 +1090,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
finally
{
+ if (logger.isTraceEnabled())
+ logger.trace("Flush task {}@{} signaling post flush task", hashCode(), name);
+
// signal the post-flush we've done our work
postFlush.latch.countDown();
+
+ if (logger.isTraceEnabled())
+ logger.trace("Flush task task {}@{} finished", hashCode(), name);
}
}
@@ -1100,7 +1108,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
readBarrier.issue();
reclaimExecutor.execute(new WrappedRunnable()
{
- public void runMayThrow() throws InterruptedException, ExecutionException
+ public void runMayThrow()
{
readBarrier.await();
memtable.setDiscarded();
@@ -1130,58 +1138,77 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately
* queues it for flushing. If the memtable selected is flushed before this completes, no work is done.
*/
- public static class FlushLargestColumnFamily implements Runnable
+ public static CompletableFuture<Boolean> flushLargestMemtable()
{
- public void run()
+ float largestRatio = 0f;
+ Memtable largest = null;
+ float liveOnHeap = 0, liveOffHeap = 0;
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- float largestRatio = 0f;
- Memtable largest = null;
- float liveOnHeap = 0, liveOffHeap = 0;
- for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
- {
- // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios
- // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only
- // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them
- Memtable current = cfs.getTracker().getView().getCurrentMemtable();
-
- // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF,
- // both on- and off-heap, and select the largest of the two ratios to weight this CF
- float onHeap = 0f, offHeap = 0f;
- onHeap += current.getAllocator().onHeap().ownershipRatio();
- offHeap += current.getAllocator().offHeap().ownershipRatio();
-
- for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores())
- {
- MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator();
- onHeap += allocator.onHeap().ownershipRatio();
- offHeap += allocator.offHeap().ownershipRatio();
- }
+ // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios
+ // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only
+ // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them
+ Memtable current = cfs.getTracker().getView().getCurrentMemtable();
- float ratio = Math.max(onHeap, offHeap);
- if (ratio > largestRatio)
- {
- largest = current;
- largestRatio = ratio;
- }
+ // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF,
+ // both on- and off-heap, and select the largest of the two ratios to weight this CF
+ float onHeap = 0f, offHeap = 0f;
+ onHeap += current.getAllocator().onHeap().ownershipRatio();
+ offHeap += current.getAllocator().offHeap().ownershipRatio();
- liveOnHeap += onHeap;
- liveOffHeap += offHeap;
+ for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores())
+ {
+ MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator();
+ onHeap += allocator.onHeap().ownershipRatio();
+ offHeap += allocator.offHeap().ownershipRatio();
}
- if (largest != null)
+ float ratio = Math.max(onHeap, offHeap);
+ if (ratio > largestRatio)
{
- float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio();
- float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio();
- float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio();
- float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio();
- float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio();
- float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio();
- logger.debug("Flushing largest {} to free up room. Used total: {}, live: {}, flushing: {}, this: {}",
- largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap),
- ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap));
- largest.cfs.switchMemtableIfCurrent(largest);
+ largest = current;
+ largestRatio = ratio;
}
+
+ liveOnHeap += onHeap;
+ liveOffHeap += offHeap;
+ }
+
+ CompletableFuture<Boolean> returnFuture = new CompletableFuture<>();
+
+ if (largest != null)
+ {
+ float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio();
+ float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio();
+ float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio();
+ float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio();
+ float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio();
+ float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio();
+ logger.debug("Flushing largest {} to free up room. Used total: {}, live: {}, flushing: {}, this: {}",
+ largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap),
+ ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap));
+
+ ListenableFuture<ReplayPosition> flushFuture = largest.cfs.switchMemtableIfCurrent(largest);
+ flushFuture.addListener(() -> {
+ try
+ {
+ flushFuture.get();
+ returnFuture.complete(true);
+ }
+ catch (Throwable t)
+ {
+ returnFuture.completeExceptionally(t);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+ else
+ {
+ logger.debug("Flushing of largest memtable, not done, no memtable found");
+
+ returnFuture.complete(false);
}
+
+ return returnFuture;
}
private static String ratio(float onHeap, float offHeap)
@@ -1228,8 +1255,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
catch (RuntimeException e)
{
throw new RuntimeException(e.getMessage()
- + " for ks: "
- + keyspace.getName() + ", table: " + name, e);
+ + " for ks: "
+ + keyspace.getName() + ", table: " + name, e);
}
}
@@ -1510,7 +1537,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
nowInSec,
filter.selectsAllPartition(),
metadata.enforceStrictLiveness()))
- || filter.isFullyCoveredBy(cached);
+ || filter.isFullyCoveredBy(cached);
}
public int gcBefore(int nowInSec)
@@ -1548,7 +1575,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public ViewFragment select(Function<View, Iterable<SSTableReader>> filter)
{
View view = data.getView();
- List<SSTableReader> sstables = Lists.newArrayList(filter.apply(view));
+ List<SSTableReader> sstables = Lists.newArrayList(Objects.requireNonNull(filter.apply(view)));
return new ViewFragment(sstables, view.getAllMemtables());
}
@@ -1578,19 +1605,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException
{
SamplerResult<ByteBuffer> samplerResults = metric.samplers.get(Sampler.valueOf(sampler))
- .finishSampling(count);
+ .finishSampling(count);
TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE);
for (Counter<ByteBuffer> counter : samplerResults.topK)
{
byte[] key = counter.getItem().array();
result.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, COUNTER_NAMES, new Object[] {
- Hex.bytesToHex(key), // raw
- counter.getCount(), // count
- counter.getError(), // error
- metadata.getKeyValidator().getString(ByteBuffer.wrap(key)) })); // string
+ Hex.bytesToHex(key), // raw
+ counter.getCount(), // count
+ counter.getError(), // error
+ metadata.getKeyValidator().getString(ByteBuffer.wrap(key)) })); // string
}
return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[]{
- samplerResults.cardinality, result});
+ samplerResults.cardinality, result});
}
public boolean isCompactionDiskSpaceCheckEnabled()
@@ -1929,20 +1956,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, clustering, column, path), clockAndCount);
}
- public void forceMajorCompaction() throws InterruptedException, ExecutionException
+ public void forceMajorCompaction()
{
forceMajorCompaction(false);
}
- public void forceMajorCompaction(boolean splitOutput) throws InterruptedException, ExecutionException
+ public void forceMajorCompaction(boolean splitOutput)
{
CompactionManager.instance.performMaximal(this, splitOutput);
}
public static Iterable<ColumnFamilyStore> all()
{
- List<Iterable<ColumnFamilyStore>> stores = new ArrayList<Iterable<ColumnFamilyStore>>(Schema.instance.getKeyspaces().size());
+ List<Iterable<ColumnFamilyStore>> stores = new ArrayList<>(Schema.instance.getKeyspaces().size());
for (Keyspace keyspace : Keyspace.all())
{
stores.add(keyspace.getColumnFamilyStores());
@@ -1985,13 +2012,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
for (final ColumnFamilyStore cfs : concatWithIndexes())
{
- cfs.runWithCompactionsDisabled(new Callable<Void>()
- {
- public Void call()
- {
- cfs.data.reset(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs));
- return null;
- }
+ cfs.runWithCompactionsDisabled((Callable<Void>) () -> {
+ cfs.data.reset(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs));
+ return null;
}, true, false);
}
}
@@ -2044,25 +2067,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
now = Math.max(now, sstable.maxDataAge);
truncatedAt = now;
- Runnable truncateRunnable = new Runnable()
- {
- public void run()
- {
- logger.debug("Discarding sstable data for truncated CF + indexes");
- data.notifyTruncated(truncatedAt);
+ Runnable truncateRunnable = () -> {
+ logger.debug("Discarding sstable data for truncated CF + indexes");
+ data.notifyTruncated(truncatedAt);
- if (DatabaseDescriptor.isAutoSnapshot())
- snapshot(Keyspace.getTimestampedSnapshotName(name));
+ if (DatabaseDescriptor.isAutoSnapshot())
+ snapshot(Keyspace.getTimestampedSnapshotName(name));
- discardSSTables(truncatedAt);
+ discardSSTables(truncatedAt);
- indexManager.truncateAllIndexesBlocking(truncatedAt);
- viewManager.truncateBlocking(replayAfter, truncatedAt);
+ indexManager.truncateAllIndexesBlocking(truncatedAt);
+ viewManager.truncateBlocking(replayAfter, truncatedAt);
- SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
- logger.trace("cleaning out row cache");
- invalidateCaches();
- }
+ SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
+ logger.trace("cleaning out row cache");
+ invalidateCaches();
};
runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
@@ -2163,17 +2182,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public LifecycleTransaction markAllCompacting(final OperationType operationType)
{
- Callable<LifecycleTransaction> callable = new Callable<LifecycleTransaction>()
- {
- public LifecycleTransaction call() throws Exception
- {
- assert data.getCompacting().isEmpty() : data.getCompacting();
- Iterable<SSTableReader> sstables = getLiveSSTables();
- sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
- LifecycleTransaction modifier = data.tryModify(sstables, operationType);
- assert modifier != null: "something marked things compacting while compactions are disabled";
- return modifier;
- }
+ Callable<LifecycleTransaction> callable = () -> {
+ assert data.getCompacting().isEmpty() : data.getCompacting();
+ Iterable<SSTableReader> sstables = getLiveSSTables();
+ sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables);
+ LifecycleTransaction modifier = data.tryModify(sstables, operationType);
+ assert modifier != null: "something marked things compacting while compactions are disabled";
+ return modifier;
};
return runWithCompactionsDisabled(callable, false, false);
@@ -2296,7 +2311,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
if (maxThreshold == 0 || minThreshold == 0)
throw new RuntimeException("Disabling compaction by setting min_compaction_threshold or max_compaction_threshold to 0 " +
- "is deprecated, set the compaction strategy option 'enabled' to 'false' instead or use the nodetool command 'disableautocompaction'.");
+ "is deprecated, set the compaction strategy option 'enabled' to 'false' instead or use the nodetool command 'disableautocompaction'.");
}
// End JMX get/set.
@@ -2360,7 +2375,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public List<String> getBuiltIndexes()
{
- return indexManager.getBuiltIndexNames();
+ return indexManager.getBuiltIndexNames();
}
public int getUnleveledSSTables()
@@ -2517,6 +2532,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
public static TableMetrics metricsFor(UUID tableId)
{
- return getIfExists(tableId).metric;
+ return Objects.requireNonNull(getIfExists(tableId)).metric;
}
-}
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 041ac2e..139663e 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -339,7 +339,7 @@ public class Memtable implements Comparable<Memtable>
@VisibleForTesting
public void makeUnflushable()
{
- liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024);
+ liveDataSize.addAndGet((long) 1024 * 1024 * 1024 * 1024 * 1024);
}
private long estimatedSize()
@@ -433,7 +433,7 @@ public class Memtable implements Comparable<Memtable>
return new SSTableTxnWriter(txn,
cfs.createSSTableMultiWriter(Descriptor.fromFilename(filename),
- (long) partitions.size(),
+ partitions.size(),
ActiveRepairService.UNREPAIRED_SSTABLE,
sstableMetadataCollector,
new SerializationHeader(true, cfs.metadata, columns, stats),
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index e135ebb..925708e 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -63,7 +63,6 @@ import org.apache.cassandra.utils.Pair;
public abstract class ReadCommand implements ReadQuery
{
protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class);
-
public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
// For READ verb: will either dispatch on 'serializer' for 3.0 or 'legacyReadCommandSerializer' for earlier version.
@@ -546,7 +545,7 @@ public abstract class ReadCommand implements ReadQuery
Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
}
- };
+ }
return Transformation.apply(iter, new MetricRecording());
}
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index d3633fd..b4418be 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -938,6 +938,18 @@ public class FBUtilities
}
}
+ public static void sleepQuietly(long millis)
+ {
+ try
+ {
+ Thread.sleep(millis);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
@VisibleForTesting
protected static void reset()
{
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
index 57242c4..1b698c9 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -24,7 +24,7 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
public class HeapPool extends MemtablePool
{
- public HeapPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner)
+ public HeapPool(long maxOnHeapMemory, float cleanupThreshold, MemtableCleaner cleaner)
{
super(maxOnHeapMemory, 0, cleanupThreshold, cleaner);
}
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
index 8383ddc..e5a97c6 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
@@ -18,21 +18,27 @@
*/
package org.apache.cassandra.utils.memory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import org.apache.cassandra.config.CFMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.WaitQueue;
public abstract class MemtableAllocator
{
+ private static final Logger logger = LoggerFactory.getLogger(MemtableAllocator.class);
+ private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 5, TimeUnit.SECONDS);
+
private final SubAllocator onHeap;
private final SubAllocator offHeap;
- volatile LifeCycle state = LifeCycle.LIVE;
- static enum LifeCycle
+ enum LifeCycle
{
LIVE, DISCARDING, DISCARDED;
LifeCycle transition(LifeCycle targetState)
@@ -78,10 +84,8 @@ public abstract class MemtableAllocator
*/
public void setDiscarding()
{
- state = state.transition(LifeCycle.DISCARDING);
- // mark the memory owned by this allocator as reclaiming
- onHeap.markAllReclaiming();
- offHeap.markAllReclaiming();
+ onHeap.setDiscarding();
+ offHeap.setDiscarding();
}
/**
@@ -90,15 +94,13 @@ public abstract class MemtableAllocator
*/
public void setDiscarded()
{
- state = state.transition(LifeCycle.DISCARDED);
- // release any memory owned by this allocator; automatically signals waiters
- onHeap.releaseAll();
- offHeap.releaseAll();
+ onHeap.setDiscarded();
+ offHeap.setDiscarded();
}
public boolean isLive()
{
- return state == LifeCycle.LIVE;
+ return onHeap.state == LifeCycle.LIVE || offHeap.state == LifeCycle.LIVE;
}
/** Mark the BB as unused, permitting it to be reclaimed */
@@ -107,6 +109,9 @@ public abstract class MemtableAllocator
// the tracker we are owning memory from
private final MemtablePool.SubPool parent;
+ // the state of the memtable
+ private volatile LifeCycle state;
+
// the amount of memory/resource owned by this object
private volatile long owns;
// the amount of memory we are reporting to collect; this may be inaccurate, but is close
@@ -116,17 +121,44 @@ public abstract class MemtableAllocator
SubAllocator(MemtablePool.SubPool parent)
{
this.parent = parent;
+ this.state = LifeCycle.LIVE;
+ }
+
+ /**
+ * Mark this allocator reclaiming; this will permit any outstanding allocations to temporarily
+ * overshoot the maximum memory limit so that flushing can begin immediately
+ */
+ void setDiscarding()
+ {
+ state = state.transition(LifeCycle.DISCARDING);
+ // mark the memory owned by this allocator as reclaiming
+ updateReclaiming();
}
- // should only be called once we know we will never allocate to the object again.
- // currently no corroboration/enforcement of this is performed.
+ /**
+ * Indicate the memory and resources owned by this allocator are no longer referenced,
+ * and can be reclaimed/reused.
+ */
+ void setDiscarded()
+ {
+ state = state.transition(LifeCycle.DISCARDED);
+ // release any memory owned by this allocator; automatically signals waiters
+ releaseAll();
+ }
+
+ /**
+ * Should only be called once we know we will never allocate to the object again.
+ * Currently no corroboration/enforcement of this is performed.
+ */
void releaseAll()
{
parent.released(ownsUpdater.getAndSet(this, 0));
parent.reclaimed(reclaimingUpdater.getAndSet(this, 0));
}
- // like allocate, but permits allocations to be negative
+ /**
+ * Like allocate, but permits allocations to be negative.
+ */
public void adjust(long size, OpOrder.Group opGroup)
{
if (size <= 0)
@@ -168,28 +200,71 @@ public abstract class MemtableAllocator
}
}
- // retroactively mark an amount allocated and acquired in the tracker, and owned by us
+ /**
+ * Retroactively mark an amount allocated and acquired in the tracker, and owned by us. If the state is discarding,
+ * then also update reclaiming since the flush operation is waiting at the barrier for in-flight writes,
+ * and it will flush this memory too.
+ */
private void allocated(long size)
{
parent.allocated(size);
ownsUpdater.addAndGet(this, size);
+
+ if (state == LifeCycle.DISCARDING)
+ {
+ noSpamLogger.info("Allocated {} bytes whilst discarding", size);
+ updateReclaiming();
+ }
}
- // retroactively mark an amount acquired in the tracker, and owned by us
+ /**
+ * Retroactively mark an amount acquired in the tracker, and owned by us. If the state is discarding,
+ * then also update reclaiming since the flush operation is waiting at the barrier for in-flight writes,
+ * and it will flush this memory too.
+ */
private void acquired(long size)
{
- parent.acquired(size);
+ parent.acquired();
ownsUpdater.addAndGet(this, size);
+
+ if (state == LifeCycle.DISCARDING)
+ {
+ noSpamLogger.info("Acquired {} bytes whilst discarding", size);
+ updateReclaiming();
+ }
}
+ /**
+ * If the state is still live, then we update the memory we own here and in the parent.
+ *
+ * However, if the state is not live, we do not update it because we would have to update
+ * reclaiming too, and it could cause problems to the memtable cleaner algorithm if reclaiming
+ * decreased. If the memtable is flushing, soon enough {@link #releaseAll()} will be called.
+ *
+ * @param size the size that was released
+ */
void released(long size)
{
- parent.released(size);
- ownsUpdater.addAndGet(this, -size);
+ if (state == LifeCycle.LIVE)
+ {
+ parent.released(size);
+ ownsUpdater.addAndGet(this, -size);
+ }
+ else
+ {
+ noSpamLogger.info("Tried to release {} bytes whilst discarding", size);
+ }
}
- // mark everything we currently own as reclaiming, both here and in our parent
- void markAllReclaiming()
+ /**
+ * Mark what we currently own as reclaiming, both here and in our parent.
+ * This method is called for the first time when the memtable is scheduled for flushing,
+ * in which case reclaiming will be zero and we mark everything that we own as reclaiming.
+ * Afterwards, if there are in flight writes that have not completed yet, we also mark any
+ * more memory that is allocated by these writes as reclaiming, since the memtable is waiting
+ * on the barrier for these writes to complete, before it can actually start flushing data.
+ */
+ void updateReclaiming()
{
while (true)
{
@@ -208,6 +283,11 @@ public abstract class MemtableAllocator
return owns;
}
+ public long getReclaiming()
+ {
+ return reclaiming;
+ }
+
public float ownershipRatio()
{
float r = owns / (float) parent.limit;
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java
new file mode 100644
index 0000000..d2cb9c5
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleaner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.memory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The cleaner is used by {@link MemtableCleanerThread} in order to reclaim space from memtables, normally
+ * by flushing the largest memtable.
+ */
+public interface MemtableCleaner
+{
+ /**
+ * This is a function that schedules a cleaning task, normally flushing of the largest memtable.
+ * The future will complete once the operation has completed and it will have a value set to true if
+ * the cleaner was able to execute the cleaning operation or if another thread concurrently executed
+ * the same clean operation. If no operation was even attempted, for example because no memtable was
+ * found, then the value will be false.
+ *
+ * The future will complete with an error if the cleaning operation encounters an error.
+ *
+ */
+ CompletableFuture<Boolean> clean();
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
index b905d2c..f6fccc6 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
@@ -18,6 +18,12 @@
*/
package org.apache.cassandra.utils.memory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
import org.apache.cassandra.utils.concurrent.WaitQueue;
@@ -27,54 +33,84 @@ import org.apache.cassandra.utils.concurrent.WaitQueue;
*/
public class MemtableCleanerThread<P extends MemtablePool> extends InfiniteLoopExecutor
{
- private static class Clean<P extends MemtablePool> implements InterruptibleRunnable
+ private static final Logger logger = LoggerFactory.getLogger(MemtableCleanerThread.class);
+
+ public static class Clean<P extends MemtablePool> implements InterruptibleRunnable
{
+ /** This is incremented when a cleaner is invoked and decremented when a cleaner has completed */
+ final AtomicInteger numPendingTasks = new AtomicInteger(0);
+
/** The pool we're cleaning */
final P pool;
/** should ensure that at least some memory has been marked reclaiming after completion */
- final Runnable cleaner;
+ final MemtableCleaner cleaner;
/** signalled whenever needsCleaning() may return true */
final WaitQueue wait = new WaitQueue();
- private Clean(P pool, Runnable cleaner)
+ private Clean(P pool, MemtableCleaner cleaner)
{
this.pool = pool;
this.cleaner = cleaner;
}
- boolean needsCleaning()
+ /** Return the number of pending tasks */
+ public int numPendingTasks()
{
- return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning();
+ return numPendingTasks.get();
}
@Override
public void run() throws InterruptedException
{
- if (needsCleaning())
- {
- cleaner.run();
- }
- else
+ if (!pool.needsCleaning())
{
final WaitQueue.Signal signal = wait.register();
- if (!needsCleaning())
+ if (!pool.needsCleaning())
signal.await();
else
signal.cancel();
}
+ else
+ {
+ int numPendingTasks = this.numPendingTasks.incrementAndGet();
+
+ if (logger.isTraceEnabled())
+ logger.trace("Invoking cleaner with {} tasks pending", numPendingTasks);
+
+ cleaner.clean().handle(this::apply);
+ }
+ }
+
+ private Boolean apply(Boolean res, Throwable err)
+ {
+ final int tasks = numPendingTasks.decrementAndGet();
+
+ // if the cleaning job was scheduled (res == true) or had an error, trigger again after decrementing the tasks
+ if ((res || err != null) && pool.needsCleaning())
+ wait.signal();
+
+ if (err != null)
+ logger.error("Memtable cleaning tasks failed with an exception and {} pending tasks ", tasks, err);
+ else if (logger.isTraceEnabled())
+ logger.trace("Memtable cleaning task completed ({}), currently pending: {}", res, tasks);
+
+ return res;
}
}
private final Runnable trigger;
+ private final Clean<P> clean;
+
private MemtableCleanerThread(Clean<P> clean)
{
super(clean.pool.getClass().getSimpleName() + "Cleaner", clean);
this.trigger = clean.wait::signal;
+ this.clean = clean;
}
- MemtableCleanerThread(P pool, Runnable cleaner)
+ public MemtableCleanerThread(P pool, MemtableCleaner cleaner)
{
this(new Clean<>(pool, cleaner));
}
@@ -84,4 +120,11 @@ public class MemtableCleanerThread<P extends MemtablePool> extends InfiniteLoopE
{
trigger.run();
}
+
+ /** Return the number of pending tasks */
+ @VisibleForTesting
+ public int numPendingTasks()
+ {
+ return clean.numPendingTasks();
+ }
}
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index 8061566..cd434c5 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@ -23,7 +23,9 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.codahale.metrics.Gauge;
import com.codahale.metrics.Timer;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.metrics.DefaultNameFactory;
@@ -44,18 +46,22 @@ public abstract class MemtablePool
public final SubPool offHeap;
public final Timer blockedOnAllocating;
+ public final Gauge<Long> numPendingTasks;
final WaitQueue hasRoom = new WaitQueue();
- MemtablePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner)
+ MemtablePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, MemtableCleaner cleaner)
{
+ Preconditions.checkArgument(cleaner != null, "Cleaner should not be null");
+
this.onHeap = getSubPool(maxOnHeapMemory, cleanThreshold);
this.offHeap = getSubPool(maxOffHeapMemory, cleanThreshold);
this.cleaner = getCleaner(cleaner);
- blockedOnAllocating = CassandraMetricsRegistry.Metrics.timer(new DefaultNameFactory("MemtablePool")
- .createMetricName("BlockedOnAllocation"));
- if (this.cleaner != null)
- this.cleaner.start();
+ this.cleaner.start();
+ DefaultNameFactory nameFactory = new DefaultNameFactory("MemtablePool");
+ blockedOnAllocating = CassandraMetricsRegistry.Metrics.timer(nameFactory.createMetricName("BlockedOnAllocation"));
+ numPendingTasks = CassandraMetricsRegistry.Metrics.register(nameFactory.createMetricName("PendingFlushTasks"),
+ () -> (long) this.cleaner.numPendingTasks());
}
SubPool getSubPool(long limit, float cleanThreshold)
@@ -63,7 +69,7 @@ public abstract class MemtablePool
return new SubPool(limit, cleanThreshold);
}
- MemtableCleanerThread<?> getCleaner(Runnable cleaner)
+ MemtableCleanerThread<?> getCleaner(MemtableCleaner cleaner)
{
return cleaner == null ? null : new MemtableCleanerThread<>(this, cleaner);
}
@@ -78,6 +84,16 @@ public abstract class MemtablePool
public abstract MemtableAllocator newAllocator();
+ public boolean needsCleaning()
+ {
+ return onHeap.needsCleaning() || offHeap.needsCleaning();
+ }
+
+ public Long getNumPendingtasks()
+ {
+ return numPendingTasks.getValue();
+ }
+
/**
* Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners,
* and acquire() makes shared resources unavailable but still recorded. An Owner must always acquire resources,
@@ -169,7 +185,7 @@ public abstract class MemtablePool
maybeClean();
}
- void acquired(long size)
+ void acquired()
{
maybeClean();
}
@@ -203,6 +219,11 @@ public abstract class MemtablePool
return allocated;
}
+ public long getReclaiming()
+ {
+ return reclaiming;
+ }
+
public float reclaimingRatio()
{
float r = reclaiming / (float) limit;
diff --git a/src/java/org/apache/cassandra/utils/memory/NativePool.java b/src/java/org/apache/cassandra/utils/memory/NativePool.java
index 012867a..29ea8fb 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativePool.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.utils.memory;
public class NativePool extends MemtablePool
{
- public NativePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner)
+ public NativePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, MemtableCleaner cleaner)
{
super(maxOnHeapMemory, maxOffHeapMemory, cleanThreshold, cleaner);
}
diff --git a/src/java/org/apache/cassandra/utils/memory/SlabPool.java b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
index c5c44e1..a779432 100644
--- a/src/java/org/apache/cassandra/utils/memory/SlabPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
@@ -22,7 +22,7 @@ public class SlabPool extends MemtablePool
{
final boolean allocateOnHeap;
- public SlabPool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanupThreshold, Runnable cleaner)
+ public SlabPool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanupThreshold, MemtableCleaner cleaner)
{
super(maxOnHeapMemory, maxOffHeapMemory, cleanupThreshold, cleaner);
this.allocateOnHeap = maxOffHeapMemory == 0;
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index e7216b2..ddeb9da 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -26,7 +26,6 @@ import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -471,16 +470,9 @@ public abstract class CQLTester
public void compact()
{
- try
- {
- ColumnFamilyStore store = getCurrentColumnFamilyStore();
- if (store != null)
- store.forceMajorCompaction();
- }
- catch (InterruptedException | ExecutionException e)
- {
- throw new RuntimeException(e);
- }
+ ColumnFamilyStore store = getCurrentColumnFamilyStore();
+ if (store != null)
+ store.forceMajorCompaction();
}
public void disableCompaction()
diff --git a/test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java b/test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java
new file mode 100644
index 0000000..7100a2a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/memory/MemtableCleanerThreadTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.memory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+public class MemtableCleanerThreadTest
+{
+ private static final long TIMEOUT_SECONDS = 5;
+ private static final long TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS);
+
+ @Mock
+ private MemtablePool pool;
+
+ @Mock
+ private MemtableCleaner cleaner;
+
+ private MemtableCleanerThread<MemtablePool> cleanerThread;
+
+ @Before
+ public void setup()
+ {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ private void startThread()
+ {
+ cleanerThread = new MemtableCleanerThread<>(pool, cleaner);
+ assertNotNull(cleanerThread);
+ cleanerThread.start();
+
+ for (int i = 0; i < TIMEOUT_MILLIS && !cleanerThread.isAlive(); i++)
+ FBUtilities.sleepQuietly(1);
+ }
+
+ private void stopThread() throws InterruptedException
+ {
+ cleanerThread.shutdownNow();
+
+ assertTrue(cleanerThread.awaitTermination(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+ }
+
+ private void waitForPendingTasks()
+ {
+ // wait for a bit because the cleaner latch completes before the pending tasks are decremented
+ FBUtilities.sleepQuietly(TIMEOUT_MILLIS);
+
+ assertEquals(0, cleanerThread.numPendingTasks());
+ }
+
+ @Test
+ public void testCleanerInvoked() throws Exception
+ {
+ CountDownLatch cleanerExecutedLatch = new CountDownLatch(1);
+ CompletableFuture<Boolean> fut = new CompletableFuture<>();
+ AtomicBoolean needsCleaning = new AtomicBoolean(false);
+
+ when(pool.needsCleaning()).thenAnswer(invocation -> needsCleaning.get());
+
+ when(cleaner.clean()).thenAnswer(invocation -> {
+ needsCleaning.set(false);
+ cleanerExecutedLatch.countDown();
+ return fut;
+ });
+
+ // start the thread with needsCleaning returning false, the cleaner should not be invoked
+ needsCleaning.set(false);
+ startThread();
+ assertFalse(cleanerExecutedLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+ assertEquals(1, cleanerExecutedLatch.getCount());
+ assertEquals(0, cleanerThread.numPendingTasks());
+
+ // now invoke the cleaner
+ needsCleaning.set(true);
+ cleanerThread.trigger();
+ assertTrue(cleanerExecutedLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+ assertEquals(0, cleanerExecutedLatch.getCount());
+ assertEquals(1, cleanerThread.numPendingTasks());
+
+ // now complete the cleaning task
+ needsCleaning.set(false);
+ fut.complete(true);
+ waitForPendingTasks();
+
+ stopThread();
+ }
+
+ @Test
+ public void testCleanerError() throws Exception
+ {
+ AtomicReference<CountDownLatch> cleanerLatch = new AtomicReference<>(new CountDownLatch(1));
+ AtomicReference<CompletableFuture<Boolean>> fut = new AtomicReference<>(new CompletableFuture<>());
+ AtomicBoolean needsCleaning = new AtomicBoolean(false);
+ AtomicInteger numTimeCleanerInvoked = new AtomicInteger(0);
+
+ when(pool.needsCleaning()).thenAnswer(invocation -> needsCleaning.get());
+
+ when(cleaner.clean()).thenAnswer(invocation -> {
+ needsCleaning.set(false);
+ numTimeCleanerInvoked.incrementAndGet();
+ cleanerLatch.get().countDown();
+ return fut.get();
+ });
+
+ // start the thread with needsCleaning returning true, the cleaner should be invoked
+ needsCleaning.set(true);
+ startThread();
+ assertTrue(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+ assertEquals(0, cleanerLatch.get().getCount());
+ assertEquals(1, cleanerThread.numPendingTasks());
+ assertEquals(1, numTimeCleanerInvoked.get());
+
+ // complete the cleaning task with an error, no other cleaning task should be invoked
+ cleanerLatch.set(new CountDownLatch(1));
+ CompletableFuture<Boolean> oldFut = fut.get();
+ fut.set(new CompletableFuture<>());
+ needsCleaning.set(false);
+ oldFut.completeExceptionally(new RuntimeException("Test"));
+ assertFalse(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+ assertEquals(1, cleanerLatch.get().getCount());
+ assertEquals(1, numTimeCleanerInvoked.get());
+
+ // now trigger cleaning again and verify that a new task is invoked
+ cleanerLatch.set(new CountDownLatch(1));
+ fut.set(new CompletableFuture<>());
+ needsCleaning.set(true);
+ cleanerThread.trigger();
+ assertTrue(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+ assertEquals(0, cleanerLatch.get().getCount());
+ assertEquals(2, numTimeCleanerInvoked.get());
+
+ // complete the cleaning task with false (nothing should be scheduled)
+ cleanerLatch.set(new CountDownLatch(1));
+ oldFut = fut.get();
+ fut.set(new CompletableFuture<>());
+ needsCleaning.set(false);
+ oldFut.complete(false);
+ assertFalse(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+ assertEquals(1, cleanerLatch.get().getCount());
+ assertEquals(2, numTimeCleanerInvoked.get());
+
+ // now trigger cleaning again and verify that a new task is invoked
+ cleanerLatch.set(new CountDownLatch(1));
+ fut.set(new CompletableFuture<>());
+ needsCleaning.set(true);
+ cleanerThread.trigger();
+ assertTrue(cleanerLatch.get().await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+ assertEquals(0, cleanerLatch.get().getCount());
+ assertEquals(3, numTimeCleanerInvoked.get());
+
+ stopThread();
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
index b636bf7..a7b112c 100644
--- a/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
+++ b/test/unit/org/apache/cassandra/utils/memory/NativeAllocatorTest.java
@@ -22,105 +22,137 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import junit.framework.Assert;
import org.apache.cassandra.utils.concurrent.OpOrder;
public class NativeAllocatorTest
{
+ private ScheduledExecutorService exec;
+ private OpOrder order;
+ private OpOrder.Group group;
+ private CountDownLatch canClean;
+ private CountDownLatch isClean;
+ private AtomicReference<NativeAllocator> allocatorRef;
+ private AtomicReference<OpOrder.Barrier> barrier;
+ private NativePool pool;
+ private NativeAllocator allocator;
+ private Runnable markBlocking;
+
+ @Before
+ public void setUp()
+ {
+ exec = Executors.newScheduledThreadPool(2);
+ order = new OpOrder();
+ group = order.start();
+ canClean = new CountDownLatch(1);
+ isClean = new CountDownLatch(1);
+ allocatorRef = new AtomicReference<>();
+ barrier = new AtomicReference<>();
+ pool = new NativePool(1, 100, 0.75f, () -> {
+ try
+ {
+ canClean.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError();
+ }
+ if (isClean.getCount() > 0)
+ {
+ allocatorRef.get().offHeap().released(80);
+ isClean.countDown();
+ }
+ return CompletableFuture.completedFuture(true);
+ });
+ allocator = new NativeAllocator(pool);
+ allocatorRef.set(allocator);
+ markBlocking = () -> {
+ barrier.set(order.newBarrier());
+ barrier.get().issue();
+ barrier.get().markBlocking();
+ };
+ }
+
+ private void verifyUsedReclaiming(long used, long reclaiming)
+ {
+ Assert.assertEquals(used, allocator.offHeap().owns());
+ Assert.assertEquals(used, pool.offHeap.used());
+ Assert.assertEquals(reclaiming, allocator.offHeap().getReclaiming());
+ Assert.assertEquals(reclaiming, pool.offHeap.getReclaiming());
+ }
@Test
public void testBookKeeping() throws ExecutionException, InterruptedException
{
- {
- final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);
- final OpOrder order = new OpOrder();
- final OpOrder.Group group = order.start();
- final CountDownLatch canClean = new CountDownLatch(1);
- final CountDownLatch isClean = new CountDownLatch(1);
- final AtomicReference<NativeAllocator> allocatorRef = new AtomicReference<>();
- final AtomicReference<OpOrder.Barrier> barrier = new AtomicReference<>();
- final NativeAllocator allocator = new NativeAllocator(new NativePool(1, 100, 0.75f, new Runnable()
+ final Runnable test = () -> {
+ // allocate normal, check accounted and not cleaned
+ allocator.allocate(10, group);
+ verifyUsedReclaiming(10, 0);
+
+ // confirm adjustment works
+ allocator.offHeap().adjust(-10, group);
+ verifyUsedReclaiming(0, 0);
+
+ allocator.offHeap().adjust(10, group);
+ verifyUsedReclaiming(10, 0);
+
+ // confirm we cannot allocate negative
+ boolean success = false;
+ try
{
- public void run()
- {
- try
- {
- canClean.await();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
- if (isClean.getCount() > 0)
- {
- allocatorRef.get().offHeap().released(80);
- isClean.countDown();
- }
- }
- }));
- allocatorRef.set(allocator);
- final Runnable markBlocking = new Runnable()
+ allocator.offHeap().allocate(-10, group);
+ }
+ catch (AssertionError e)
{
+ success = true;
+ }
+
+ Assert.assertTrue(success);
+ Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
+ Assert.assertEquals(1, isClean.getCount());
+
+ // allocate above watermark
+ allocator.allocate(70, group);
+ verifyUsedReclaiming(80, 0);
- public void run()
- {
- barrier.set(order.newBarrier());
- barrier.get().issue();
- barrier.get().markBlocking();
- }
- };
- final Runnable run = new Runnable()
+ // let the cleaner run, it will release 80 bytes
+ canClean.countDown();
+ try
{
- public void run()
- {
- // allocate normal, check accounted and not cleaned
- allocator.allocate(10, group);
- Assert.assertEquals(10, allocator.offHeap().owns());
- // confirm adjustment works
- allocator.offHeap().adjust(-10, group);
- Assert.assertEquals(0, allocator.offHeap().owns());
- allocator.offHeap().adjust(10, group);
- Assert.assertEquals(10, allocator.offHeap().owns());
- // confirm we cannot allocate negative
- boolean success = false;
- try
- {
- allocator.offHeap().allocate(-10, group);
- }
- catch (AssertionError e)
- {
- success = true;
- }
- Assert.assertTrue(success);
- Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
- Assert.assertEquals(1, isClean.getCount());
-
- // allocate above watermark, check cleaned
- allocator.allocate(70, group);
- Assert.assertEquals(80, allocator.offHeap().owns());
- canClean.countDown();
- try
- {
- isClean.await(10L, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
- Assert.assertEquals(0, isClean.getCount());
- Assert.assertEquals(0, allocator.offHeap().owns());
-
- // allocate above limit, check we block until "marked blocking"
- exec.schedule(markBlocking, 10L, TimeUnit.MILLISECONDS);
- allocator.allocate(110, group);
- Assert.assertNotNull(barrier.get());
- Assert.assertEquals(110, allocator.offHeap().owns());
- }
- };
- exec.submit(run).get();
- }
- }
+ isClean.await(10L, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError();
+ }
+ Assert.assertEquals(0, isClean.getCount());
+ verifyUsedReclaiming(0, 0);
+ // allocate, then set discarding, then allocated some more
+ allocator.allocate(30, group);
+ verifyUsedReclaiming(30, 0);
+ allocator.setDiscarding();
+ Assert.assertFalse(allocator.isLive());
+ verifyUsedReclaiming(30, 30);
+ allocator.allocate(50, group);
+ verifyUsedReclaiming(80, 80);
+
+ // allocate above limit, check we block until "marked blocking"
+ exec.schedule(markBlocking, 10L, TimeUnit.MILLISECONDS);
+ allocator.allocate(30, group);
+ Assert.assertNotNull(barrier.get());
+ verifyUsedReclaiming(110, 110);
+
+ // release everything
+ allocator.setDiscarded();
+ Assert.assertFalse(allocator.isLive());
+ verifyUsedReclaiming(0, 0);
+ };
+ exec.submit(test).get();
+ }
}
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org