You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2022/04/04 16:22:10 UTC

[GitHub] [cassandra] adelapena commented on a diff in pull request #1295: CASSANDRA-17034: Memtable API

adelapena commented on code in PR #1295:
URL: https://github.com/apache/cassandra/pull/1295#discussion_r841900441


##########
src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.memtable;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.AbstractBTreePartition;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.AtomicBTreePartition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.IncludingExcludingBounds;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+
+import static org.apache.cassandra.config.CassandraRelevantProperties.MEMTABLE_OVERHEAD_COMPUTE_STEPS;
+import static org.apache.cassandra.config.CassandraRelevantProperties.MEMTABLE_OVERHEAD_SIZE;
+
+public class SkipListMemtable extends AbstractAllocatorMemtable
+{
+    private static final Logger logger = LoggerFactory.getLogger(SkipListMemtable.class);
+
+    public static final Factory FACTORY = SkipListMemtableFactory.INSTANCE;
+
+    private static final int ROW_OVERHEAD_HEAP_SIZE;
+    static
+    {
+        int userDefinedOverhead = MEMTABLE_OVERHEAD_SIZE.getInt(-1);
+        if (userDefinedOverhead > 0)
+            ROW_OVERHEAD_HEAP_SIZE = userDefinedOverhead;
+        else
+            ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(MEMTABLE_OVERHEAD_COMPUTE_STEPS.getInt());
+    }
+
+    // We index the memtable by PartitionPosition only for the purpose of being able
+    // to select key range using Token.KeyBound. However put() ensures that we
+    // actually only store DecoratedKey.
+    private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> partitions = new ConcurrentSkipListMap<>();
+
+    private final AtomicLong liveDataSize = new AtomicLong(0);
+
+    SkipListMemtable(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner)
+    {
+        super(commitLogLowerBound, metadataRef, owner);
+    }
+
+    // Only for testing
+    @VisibleForTesting
+    public SkipListMemtable(TableMetadataRef metadataRef)
+    {
+        this(null, metadataRef, new Owner()
+        {
+            @Override
+            public ListenableFuture<CommitLogPosition> signalFlushRequired(Memtable memtable, ColumnFamilyStore.FlushReason reason)
+            {
+                return null;
+            }
+
+            @Override
+            public Memtable getCurrentMemtable()
+            {
+                return null;
+            }
+
+            @Override
+            public Iterable<Memtable> getIndexMemtables()
+            {
+                return Collections.emptyList();
+            }
+
+            public ShardBoundaries localRangeSplits(int shardCount)
+            {
+                return null; // not implemented
+            }
+        });
+    }
+
+    protected Factory factory()
+    {
+        return FACTORY;
+    }
+
+    @Override
+    public void addMemoryUsageTo(MemoryUsage stats)
+    {
+        super.addMemoryUsageTo(stats);
+    }
+
+    public boolean isClean()
+    {
+        return partitions.isEmpty();
+    }
+
+    /**
+     * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate
+     * OpOrdering.
+     *
+     * commitLogSegmentPosition should only be null if this is a secondary index, in which case it is *expected* to be null
+     */
+    public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup)
+    {
+        AtomicBTreePartition previous = partitions.get(update.partitionKey());
+
+        long initialSize = 0;
+        if (previous == null)
+        {
+            final DecoratedKey cloneKey = allocator.clone(update.partitionKey(), opGroup);
+            AtomicBTreePartition empty = new AtomicBTreePartition(metadata, cloneKey, allocator);
+            // We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
+            previous = partitions.putIfAbsent(cloneKey, empty);
+            if (previous == null)
+            {
+                previous = empty;
+                // allocate the row overhead after the fact; this saves over allocating and having to free after, but
+                // means we can overshoot our declared limit.
+                int overhead = (int) (cloneKey.getToken().getHeapSize() + ROW_OVERHEAD_HEAP_SIZE);
+                allocator.onHeap().allocate(overhead, opGroup);
+                initialSize = 8;
+            }
+        }
+
+        long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer);
+        updateMin(minTimestamp, update.stats().minTimestamp);
+        updateMin(minLocalDeletionTime, update.stats().minLocalDeletionTime);
+        liveDataSize.addAndGet(initialSize + pair[0]);
+        columnsCollector.update(update.columns());
+        statsCollector.update(update.stats());
+        currentOperations.addAndGet(update.operationCount());
+        return pair[1];
+    }
+
+    public long partitionCount()
+    {
+        return partitions.size();
+    }
+
+    public MemtableUnfilteredPartitionIterator partitionIterator(final ColumnFilter columnFilter,
+                                                                 final DataRange dataRange,
+                                                                 SSTableReadsListener readsListener)
+    {
+        AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
+
+        PartitionPosition left = keyRange.left;
+        PartitionPosition right = keyRange.right;
+
+        boolean isBound = keyRange instanceof Bounds;
+        boolean includeLeft = isBound || keyRange instanceof IncludingExcludingBounds;
+        boolean includeRight = isBound || keyRange instanceof Range;
+        Map<PartitionPosition, AtomicBTreePartition> subMap = getPartitionsSubMap(left,
+                                                                                  includeLeft,
+                                                                                  right,
+                                                                                  includeRight);
+
+        return new MemtableUnfilteredPartitionIterator(metadata.get(), subMap, columnFilter, dataRange);
+        // readsListener is ignored as it only accepts sstable signals
+    }
+
+    private Map<PartitionPosition, AtomicBTreePartition> getPartitionsSubMap(PartitionPosition left,
+                                                                             boolean includeLeft,
+                                                                             PartitionPosition right,
+                                                                             boolean includeRight)
+    {
+        if (left != null && left.isMinimum())
+            left = null;
+        if (right != null && right.isMinimum())
+            right = null;
+
+        try
+        {
+            if (left == null)
+                return right == null ? partitions : partitions.headMap(right, includeRight);
+            else
+                return right == null
+                       ? partitions.tailMap(left, includeLeft)
+                       : partitions.subMap(left, includeLeft, right, includeRight);
+        }
+        catch (IllegalArgumentException e)
+        {
+            logger.error("Invalid range requested {} - {}", left, right);
+            throw e;
+        }
+    }
+
+    public Partition getPartition(DecoratedKey key)
+    {
+        return partitions.get(key);
+    }
+
+    public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, SSTableReadsListener listener)
+    {
+        Partition p = getPartition(key);
+        if (p == null)
+            return null;
+        else
+            return p.unfilteredIterator(selectedColumns, slices, reversed);
+    }
+
+    public UnfilteredRowIterator iterator(DecoratedKey key)
+    {
+        Partition p = getPartition(key);
+        return p != null ? p.unfilteredIterator() : null;
+    }
+
+    private static int estimateRowOverhead(final int count)
+    {
+        // calculate row overhead
+        try (final OpOrder.Group group = new OpOrder().start())
+        {
+            int rowOverhead;
+            MemtableAllocator allocator = MEMORY_POOL.newAllocator("");
+            ConcurrentNavigableMap<PartitionPosition, Object> partitions = new ConcurrentSkipListMap<>();
+            final Object val = new Object();
+            for (int i = 0 ; i < count ; i++)
+                partitions.put(allocator.clone(new BufferDecoratedKey(new LongToken(i), ByteBufferUtil.EMPTY_BYTE_BUFFER), group), val);
+            double avgSize = ObjectSizes.measureDeep(partitions) / (double) count;
+            rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
+            rowOverhead -= ObjectSizes.measureDeep(new LongToken(0));
+            rowOverhead += AtomicBTreePartition.EMPTY_SIZE;
+            rowOverhead += AbstractBTreePartition.HOLDER_UNSHARED_HEAP_SIZE;
+            allocator.setDiscarding();
+            allocator.setDiscarded();
+            return rowOverhead;
+        }
+    }
+
+    public FlushCollection<?> getFlushSet(PartitionPosition from, PartitionPosition to)
+    {
+        Map<PartitionPosition, AtomicBTreePartition> toFlush = getPartitionsSubMap(from, true, to, false);
+        long keySize = 0;
+
+        boolean trackContention = logger.isTraceEnabled();
+        if (trackContention)
+        {
+            int heavilyContendedRowCount = 0;
+
+            for (AtomicBTreePartition partition : toFlush.values())
+            {
+                keySize += partition.partitionKey().getKey().remaining();
+                if (trackContention && partition.useLock())

Review Comment:
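   I think `trackContention` is always true here: this check is already inside the `if (trackContention)` block, so the extra condition is redundant.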



##########
src/java/org/apache/cassandra/db/memtable/Memtable.java:
##########
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.memtable;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.UnfilteredSource;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+/**
+ * Memtable interface. This defines the operations the ColumnFamilyStore can perform with memtables.
+ * They are of several types:
+ * - construction factory interface
+ * - write and read operations: put, getPartition and makePartitionIterator
+ * - statistics and features, including partition counts, data size, encoding stats, written columns
+ * - memory usage tracking, including methods of retrieval and of adding extra allocated space (used non-CFS secondary
+ *   indexes)
+ * - flush functionality, preparing the set of partitions to flush for given ranges
+ * - lifecycle management, i.e. operations that prepare and execute switch to a different memtable, together
+ *   with ways of tracking the affected commit log spans
+ */
+public interface Memtable extends Comparable<Memtable>, UnfilteredSource
+{
+    // Construction
+
+    /**
+     * Factory interface for constructing memtables, and querying write durability features.
+     *
+     * The factory is chosen using the MemtableParams class (passed as argument to
+     * {@code CREATE TABLE ... WITH memtable = {...}} or in the memtable options in cassandra.yaml). To make that
+     * possible, implementations must provide either a static {@code FACTORY} field (if they accept no further option)
+     * or a static {@code factory(Map<String, String>)} method. In the latter case, the method should avoid creating
+     * multiple instances of the factory for the same parameters, or factories should at least implement hashCode and
+     * equals.
+     */
+    interface Factory
+    {
+        /**
+         * Create a memtable.
+         *
+         * @param commitLogLowerBound A commit log lower bound for the new memtable. This will be equal to the previous
+         *                            memtable's upper bound and defines the span of positions that any flushed sstable
+         *                            will cover.
+         * @param metadaRef Pointer to the up-to-date table metadata.
+         * @param owner Owning objects that will receive flush requests triggered by the memtable (e.g. on expiration).
+         */
+        Memtable create(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadaRef, Owner owner);
+
+        /**
+         * If the memtable can achieve write durability directly (i.e. using some feature other than the commitlog, e.g.
+         * persistent memory), it can return true here, in which case the commit log will not store mutations in this
+         * table.
+         * Note that doing so will prevent point-in-time restores and changed data capture, thus a durable memtable must
+         * allow the option of turning commit log writing on even if it does not need it.
+         */
+        default boolean writesShouldSkipCommitLog()
+        {
+            return false;
+        }
+
+        /**
+         * This should be true if the memtable can achieve write durability for crash recovery directly (i.e. using some
+         * feature other than the commitlog, e.g. persistent memory).
+         * Setting this flag to true means that the commitlog should not replay mutations for this table on restart,
+         * and that it should not try to preserve segments that contain relevant data.
+         * Unless writesShouldSkipCommitLog() is also true, writes will be recorded in the commit log as they may be
+         * needed for changed data capture or point-in-time restore.
+         */
+        default boolean writesAreDurable()
+        {
+            return false;
+        }
+
+        /**
+         * Normally we can receive streamed sstables directly, skipping the memtable stage (zero-copy-streaming). When
+         * the memtable is the primary data store (e.g. persistent memtables), it will usually prefer to receive the
+         * data instead.
+         *
+         * If this returns true, all streamed sstables's content will be read and replayed as mutations, disabling
+         * zero-copy streaming.
+         */
+        default boolean streamToMemtable()
+        {
+            return false;
+        }
+
+        /**
+         * When we need to stream data, we usually flush and stream the resulting sstables. This will not work correctly
+         * if the memtable does not want to flush for streaming (e.g. persistent memtables acting as primary data
+         * store), because data (not just recent) will be missing from the streamed view. Such memtables must present
+         * their data separately for streaming.
+         * In other words if the memtable returns false on shouldSwitch(STREAMING/REPAIR), its factory must return true
+         * here.
+         *
+         * If this flag returns true, streaming will write the relevant content that resides in the memtable to
+         * temporary sstables, stream these sstables and then delete them.
+         */
+        default boolean streamFromMemtable()
+        {
+            return false;
+        }
+
+        /**
+         * Override this method to include implementation-specific memtable metrics in the table metrics.
+         *
+         * Memtable metrics lifecycle matches table lifecycle. It is the table that owns the metrics and
+         * decides when to release them.
+         */
+        default TableMetrics.ReleasableMetric createMemtableMetrics(TableMetadataRef metadataRef)
+        {
+            return null;
+        }
+    }
+
+    /**
+     * Interface for providing signals back to the owner.

Review Comment:
   Just to ease reading, I would add some details about what the owner is, as in the description of the `owner` parameter of `Memtable.Factory#create`.



##########
src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.memtable;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.AbstractBTreePartition;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.AtomicBTreePartition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.IncludingExcludingBounds;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+
+import static org.apache.cassandra.config.CassandraRelevantProperties.MEMTABLE_OVERHEAD_COMPUTE_STEPS;
+import static org.apache.cassandra.config.CassandraRelevantProperties.MEMTABLE_OVERHEAD_SIZE;
+
+public class SkipListMemtable extends AbstractAllocatorMemtable
+{
+    private static final Logger logger = LoggerFactory.getLogger(SkipListMemtable.class);
+
+    public static final Factory FACTORY = SkipListMemtableFactory.INSTANCE;
+
+    private static final int ROW_OVERHEAD_HEAP_SIZE;
+    static
+    {
+        int userDefinedOverhead = MEMTABLE_OVERHEAD_SIZE.getInt(-1);
+        if (userDefinedOverhead > 0)
+            ROW_OVERHEAD_HEAP_SIZE = userDefinedOverhead;
+        else
+            ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(MEMTABLE_OVERHEAD_COMPUTE_STEPS.getInt());
+    }
+
+    // We index the memtable by PartitionPosition only for the purpose of being able
+    // to select key range using Token.KeyBound. However put() ensures that we
+    // actually only store DecoratedKey.
+    private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> partitions = new ConcurrentSkipListMap<>();
+
+    private final AtomicLong liveDataSize = new AtomicLong(0);
+
+    SkipListMemtable(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner)
+    {
+        super(commitLogLowerBound, metadataRef, owner);
+    }
+
+    // Only for testing
+    @VisibleForTesting
+    public SkipListMemtable(TableMetadataRef metadataRef)
+    {
+        this(null, metadataRef, new Owner()
+        {
+            @Override
+            public ListenableFuture<CommitLogPosition> signalFlushRequired(Memtable memtable, ColumnFamilyStore.FlushReason reason)
+            {
+                return null;
+            }
+
+            @Override
+            public Memtable getCurrentMemtable()
+            {
+                return null;
+            }
+
+            @Override
+            public Iterable<Memtable> getIndexMemtables()
+            {
+                return Collections.emptyList();
+            }
+
+            public ShardBoundaries localRangeSplits(int shardCount)
+            {
+                return null; // not implemented
+            }
+        });
+    }
+
+    protected Factory factory()
+    {
+        return FACTORY;
+    }
+
+    @Override
+    public void addMemoryUsageTo(MemoryUsage stats)
+    {
+        super.addMemoryUsageTo(stats);
+    }
+
+    public boolean isClean()
+    {
+        return partitions.isEmpty();
+    }
+
+    /**
+     * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate
+     * OpOrdering.
+     *
+     * commitLogSegmentPosition should only be null if this is a secondary index, in which case it is *expected* to be null
+     */
+    public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup)
+    {
+        AtomicBTreePartition previous = partitions.get(update.partitionKey());
+
+        long initialSize = 0;
+        if (previous == null)
+        {
+            final DecoratedKey cloneKey = allocator.clone(update.partitionKey(), opGroup);
+            AtomicBTreePartition empty = new AtomicBTreePartition(metadata, cloneKey, allocator);
+            // We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
+            previous = partitions.putIfAbsent(cloneKey, empty);
+            if (previous == null)
+            {
+                previous = empty;
+                // allocate the row overhead after the fact; this saves over allocating and having to free after, but
+                // means we can overshoot our declared limit.
+                int overhead = (int) (cloneKey.getToken().getHeapSize() + ROW_OVERHEAD_HEAP_SIZE);
+                allocator.onHeap().allocate(overhead, opGroup);
+                initialSize = 8;
+            }
+        }
+
+        long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer);
+        updateMin(minTimestamp, update.stats().minTimestamp);
+        updateMin(minLocalDeletionTime, update.stats().minLocalDeletionTime);
+        liveDataSize.addAndGet(initialSize + pair[0]);
+        columnsCollector.update(update.columns());
+        statsCollector.update(update.stats());
+        currentOperations.addAndGet(update.operationCount());
+        return pair[1];
+    }
+
+    public long partitionCount()
+    {
+        return partitions.size();
+    }
+
+    public MemtableUnfilteredPartitionIterator partitionIterator(final ColumnFilter columnFilter,
+                                                                 final DataRange dataRange,
+                                                                 SSTableReadsListener readsListener)
+    {
+        AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();
+
+        PartitionPosition left = keyRange.left;
+        PartitionPosition right = keyRange.right;
+
+        boolean isBound = keyRange instanceof Bounds;
+        boolean includeLeft = isBound || keyRange instanceof IncludingExcludingBounds;
+        boolean includeRight = isBound || keyRange instanceof Range;
+        Map<PartitionPosition, AtomicBTreePartition> subMap = getPartitionsSubMap(left,
+                                                                                  includeLeft,
+                                                                                  right,
+                                                                                  includeRight);
+
+        return new MemtableUnfilteredPartitionIterator(metadata.get(), subMap, columnFilter, dataRange);
+        // readsListener is ignored as it only accepts sstable signals
+    }
+
+    private Map<PartitionPosition, AtomicBTreePartition> getPartitionsSubMap(PartitionPosition left,
+                                                                             boolean includeLeft,
+                                                                             PartitionPosition right,
+                                                                             boolean includeRight)
+    {
+        if (left != null && left.isMinimum())
+            left = null;
+        if (right != null && right.isMinimum())
+            right = null;
+
+        try
+        {
+            if (left == null)
+                return right == null ? partitions : partitions.headMap(right, includeRight);
+            else
+                return right == null
+                       ? partitions.tailMap(left, includeLeft)
+                       : partitions.subMap(left, includeLeft, right, includeRight);
+        }
+        catch (IllegalArgumentException e)
+        {
+            logger.error("Invalid range requested {} - {}", left, right);
+            throw e;
+        }
+    }
+
+    public Partition getPartition(DecoratedKey key)
+    {
+        return partitions.get(key);
+    }
+
+    public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, SSTableReadsListener listener)
+    {
+        Partition p = getPartition(key);
+        if (p == null)
+            return null;
+        else
+            return p.unfilteredIterator(selectedColumns, slices, reversed);
+    }
+
+    public UnfilteredRowIterator iterator(DecoratedKey key)
+    {
+        Partition p = getPartition(key);
+        return p != null ? p.unfilteredIterator() : null;
+    }
+
+    private static int estimateRowOverhead(final int count)
+    {
+        // calculate row overhead
+        try (final OpOrder.Group group = new OpOrder().start())
+        {
+            int rowOverhead;
+            MemtableAllocator allocator = MEMORY_POOL.newAllocator("");
+            ConcurrentNavigableMap<PartitionPosition, Object> partitions = new ConcurrentSkipListMap<>();
+            final Object val = new Object();
+            for (int i = 0 ; i < count ; i++)
+                partitions.put(allocator.clone(new BufferDecoratedKey(new LongToken(i), ByteBufferUtil.EMPTY_BYTE_BUFFER), group), val);
+            double avgSize = ObjectSizes.measureDeep(partitions) / (double) count;
+            rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
+            rowOverhead -= ObjectSizes.measureDeep(new LongToken(0));
+            rowOverhead += AtomicBTreePartition.EMPTY_SIZE;
+            rowOverhead += AbstractBTreePartition.HOLDER_UNSHARED_HEAP_SIZE;
+            allocator.setDiscarding();
+            allocator.setDiscarded();
+            return rowOverhead;
+        }
+    }
+
+    public FlushCollection<?> getFlushSet(PartitionPosition from, PartitionPosition to)
+    {
+        Map<PartitionPosition, AtomicBTreePartition> toFlush = getPartitionsSubMap(from, true, to, false);
+        long keySize = 0;
+
+        boolean trackContention = logger.isTraceEnabled();
+        if (trackContention)
+        {
+            int heavilyContendedRowCount = 0;
+
+            for (AtomicBTreePartition partition : toFlush.values())
+            {
+                keySize += partition.partitionKey().getKey().remaining();
+                if (trackContention && partition.useLock())
+                    heavilyContendedRowCount++;
+            }
+
+            if (heavilyContendedRowCount > 0)
+                logger.trace("High update contention in {}/{} partitions of {} ", heavilyContendedRowCount, toFlush.size(), SkipListMemtable.this);
+        }
+        else
+        {
+            for (PartitionPosition key : toFlush.keySet())
+            {
+                //  make sure we don't write non-sensical keys
+                assert key instanceof DecoratedKey;
+                keySize += ((DecoratedKey) key).getKey().remaining();
+            }
+        }
+        final long partitionKeySize = keySize;
+
+        return new AbstractFlushCollection<AtomicBTreePartition>()
+        {
+            public Memtable memtable()
+            {
+                return SkipListMemtable.this;
+            }
+
+            public PartitionPosition from()
+            {
+                return from;
+            }
+
+            public PartitionPosition to()
+            {
+                return to;
+            }
+
+            public long partitionCount()
+            {
+                return toFlush.size();
+            }
+
+            public Iterator<AtomicBTreePartition> iterator()
+            {
+                return toFlush.values().iterator();
+            }
+
+            public long partitionKeySize()
+            {
+                return partitionKeySize;
+            }
+        };
+    }
+
+
+    public static class MemtableUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator implements UnfilteredPartitionIterator
+    {
+        private final TableMetadata metadata;
+        private final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter;
+        private final Map<PartitionPosition, AtomicBTreePartition> source;

Review Comment:
   `source` is only used by the constructor to get the iterator, so we don't need to keep it as an attribute.



##########
src/java/org/apache/cassandra/streaming/StreamSession.java:
##########
@@ -33,6 +33,7 @@
 import io.netty.util.concurrent.Future;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;

Review Comment:
   Nit: unused import



##########
src/java/org/apache/cassandra/schema/TableParams.java:
##########
@@ -178,6 +182,9 @@ public void validate()
 
         if (memtableFlushPeriodInMs < 0)
             fail("%s must be greater than or equal to 0 (got %s)", Option.MEMTABLE_FLUSH_PERIOD_IN_MS, memtableFlushPeriodInMs);
+
+        if (cdc && memtable.factory.writesShouldSkipCommitLog())
+            fail("CDC cannot work if writes skip the commit log. Check your memtable configuration.");

Review Comment:
   ```suggestion
               fail("CDC cannot work if memtable writes skip the commit log. Check your memtable configuration.");
   ```



##########
src/java/org/apache/cassandra/db/memtable/Memtable.java:
##########
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.memtable;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.UnfilteredSource;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+/**
+ * Memtable interface. This defines the operations the ColumnFamilyStore can perform with memtables.
+ * They are of several types:
+ * - construction factory interface
+ * - write and read operations: put, getPartition and makePartitionIterator
+ * - statistics and features, including partition counts, data size, encoding stats, written columns
+ * - memory usage tracking, including methods of retrieval and of adding extra allocated space (used non-CFS secondary
+ *   indexes)
+ * - flush functionality, preparing the set of partitions to flush for given ranges
+ * - lifecycle management, i.e. operations that prepare and execute switch to a different memtable, together
+ *   with ways of tracking the affected commit log spans
+ */
+public interface Memtable extends Comparable<Memtable>, UnfilteredSource
+{
+    // Construction
+
+    /**
+     * Factory interface for constructing memtables, and querying write durability features.
+     *
+     * The factory is chosen using the MemtableParams class (passed as argument to
+     * {@code CREATE TABLE ... WITH memtable = {...}} or in the memtable options in cassandra.yaml). To make that
+     * possible, implementations must provide either a static {@code FACTORY} field (if they accept no further option)
+     * or a static {@code factory(Map<String, String>)} method. In the latter case, the method should avoid creating
+     * multiple instances of the factory for the same parameters, or factories should at least implement hashCode and
+     * equals.
+     */
+    interface Factory
+    {
+        /**
+         * Create a memtable.
+         *
+         * @param commitLogLowerBound A commit log lower bound for the new memtable. This will be equal to the previous
+         *                            memtable's upper bound and defines the span of positions that any flushed sstable
+         *                            will cover.
+         * @param metadaRef Pointer to the up-to-date table metadata.
+         * @param owner Owning objects that will receive flush requests triggered by the memtable (e.g. on expiration).
+         */
+        Memtable create(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadaRef, Owner owner);
+
+        /**
+         * If the memtable can achieve write durability directly (i.e. using some feature other than the commitlog, e.g.
+         * persistent memory), it can return true here, in which case the commit log will not store mutations in this
+         * table.
+         * Note that doing so will prevent point-in-time restores and changed data capture, thus a durable memtable must
+         * allow the option of turning commit log writing on even if it does not need it.
+         */
+        default boolean writesShouldSkipCommitLog()
+        {
+            return false;
+        }
+
+        /**
+         * This should be true if the memtable can achieve write durability for crash recovery directly (i.e. using some
+         * feature other than the commitlog, e.g. persistent memory).
+         * Setting this flag to true means that the commitlog should not replay mutations for this table on restart,
+         * and that it should not try to preserve segments that contain relevant data.
+         * Unless writesShouldSkipCommitLog() is also true, writes will be recorded in the commit log as they may be
+         * needed for changed data capture or point-in-time restore.
+         */
+        default boolean writesAreDurable()
+        {
+            return false;
+        }
+
+        /**
+         * Normally we can receive streamed sstables directly, skipping the memtable stage (zero-copy-streaming). When
+         * the memtable is the primary data store (e.g. persistent memtables), it will usually prefer to receive the
+         * data instead.
+         *
+         * If this returns true, all streamed sstables's content will be read and replayed as mutations, disabling
+         * zero-copy streaming.
+         */
+        default boolean streamToMemtable()
+        {
+            return false;
+        }
+
+        /**
+         * When we need to stream data, we usually flush and stream the resulting sstables. This will not work correctly
+         * if the memtable does not want to flush for streaming (e.g. persistent memtables acting as primary data
+         * store), because data (not just recent) will be missing from the streamed view. Such memtables must present
+         * their data separately for streaming.
+         * In other words if the memtable returns false on shouldSwitch(STREAMING/REPAIR), its factory must return true
+         * here.
+         *
+         * If this flag returns true, streaming will write the relevant content that resides in the memtable to
+         * temporary sstables, stream these sstables and then delete them.
+         */
+        default boolean streamFromMemtable()
+        {
+            return false;
+        }
+
+        /**
+         * Override this method to include implementation-specific memtable metrics in the table metrics.
+         *
+         * Memtable metrics lifecycle matches table lifecycle. It is the table that owns the metrics and
+         * decides when to release them.
+         */
+        default TableMetrics.ReleasableMetric createMemtableMetrics(TableMetadataRef metadataRef)
+        {
+            return null;
+        }
+    }
+
+    /**
+     * Interface for providing signals back to the owner.
+     */
+    interface Owner
+    {
+        /** Signal to the owner that a flush is required (e.g. in response to hitting space limits) */
+        ListenableFuture<CommitLogPosition> signalFlushRequired(Memtable memtable, ColumnFamilyStore.FlushReason reason);
+
+        /** Get the current memtable for this owner. Used to avoid capturing memtable in scheduled flush tasks. */
+        Memtable getCurrentMemtable();
+
+        /**
+         * Collect the index memtables flushed together with this. Used to accurately calculate memory that would be
+         * freed by a flush.
+         */
+        Iterable<Memtable> getIndexMemtables();
+
+        /**
+         * Construct a list of boundaries that split the locally-owned ranges into the given number of shards,
+         * splitting the owned space evenly. It is up to the memtable to use this information.
+         * Any changes in the ring structure (e.g. added or removed nodes) will invalidate the splits; in such a case
+         * the memtable will be sent a shouldSwitch(OWNED_RANGES_CHANGE) and, should that return false, a
+         * localRangesChanged() call.
+         */
+        ShardBoundaries localRangeSplits(int shardCount);
+    }
+
+    // Main write and read operations
+
+    /**
+     * Put new data in the memtable. This operation may block until enough memory is available in the memory pool.
+     *
+     * @param update the partition update, may be a new partition or an update to an existing one
+     * @param indexer receives information about the update's effect
+     * @param opGroup write operation group, used to permit the operation to complete if it is needed to complete a
+     *                flush to free space.
+     *
+     * @return the smallest timestamp delta between corresponding rows from existing and update. A
+     * timestamp delta being computed as the difference between the cells and DeletionTimes from any existing partition
+     * and those in {@code update}. See CASSANDRA-7979.
+     */
+    long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup);
+
+    // Read operations are provided by the UnfilteredSource interface.
+
+    // Statistics
+
+    /** Number of partitions stored in the memtable */
+    long partitionCount();
+
+    /** Size of the data not accounting for any metadata / mapping overheads */
+    long getLiveDataSize();
+
+    /**
+     * Number of "operations" (in the sense defined in {@link PartitionUpdate#operationCount()}) the memtable has
+     * executed.
+     */
+    long getOperations();

Review Comment:
   Nit: this method name format seems inconsistent with `#partitionCount` and with `PartitionUpdate#operationCount()`, perhaps we could call it `operationCount`.



##########
src/java/org/apache/cassandra/db/memtable/Memtable.java:
##########
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.memtable;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.UnfilteredSource;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+/**
+ * Memtable interface. This defines the operations the ColumnFamilyStore can perform with memtables.
+ * They are of several types:
+ * - construction factory interface
+ * - write and read operations: put, getPartition and makePartitionIterator

Review Comment:
   The mentioned read methods (`getPartition` and `makePartitionIterator`) don't exist on `UnfilteredSource`.



##########
src/java/org/apache/cassandra/db/memtable/Memtable.java:
##########
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.memtable;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.UnfilteredSource;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+/**
+ * Memtable interface. This defines the operations the ColumnFamilyStore can perform with memtables.
+ * They are of several types:
+ * - construction factory interface
+ * - write and read operations: put, getPartition and makePartitionIterator
+ * - statistics and features, including partition counts, data size, encoding stats, written columns
+ * - memory usage tracking, including methods of retrieval and of adding extra allocated space (used non-CFS secondary
+ *   indexes)
+ * - flush functionality, preparing the set of partitions to flush for given ranges
+ * - lifecycle management, i.e. operations that prepare and execute switch to a different memtable, together
+ *   with ways of tracking the affected commit log spans
+ */
+public interface Memtable extends Comparable<Memtable>, UnfilteredSource
+{
+    // Construction
+
+    /**
+     * Factory interface for constructing memtables, and querying write durability features.
+     *
+     * The factory is chosen using the MemtableParams class (passed as argument to
+     * {@code CREATE TABLE ... WITH memtable = {...}} or in the memtable options in cassandra.yaml). To make that
+     * possible, implementations must provide either a static {@code FACTORY} field (if they accept no further option)
+     * or a static {@code factory(Map<String, String>)} method. In the latter case, the method should avoid creating
+     * multiple instances of the factory for the same parameters, or factories should at least implement hashCode and
+     * equals.
+     */
+    interface Factory
+    {
+        /**
+         * Create a memtable.
+         *
+         * @param commitLogLowerBound A commit log lower bound for the new memtable. This will be equal to the previous
+         *                            memtable's upper bound and defines the span of positions that any flushed sstable
+         *                            will cover.
+         * @param metadaRef Pointer to the up-to-date table metadata.
+         * @param owner Owning objects that will receive flush requests triggered by the memtable (e.g. on expiration).
+         */
+        Memtable create(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadaRef, Owner owner);
+
+        /**
+         * If the memtable can achieve write durability directly (i.e. using some feature other than the commitlog, e.g.
+         * persistent memory), it can return true here, in which case the commit log will not store mutations in this
+         * table.
+         * Note that doing so will prevent point-in-time restores and changed data capture, thus a durable memtable must
+         * allow the option of turning commit log writing on even if it does not need it.
+         */
+        default boolean writesShouldSkipCommitLog()
+        {
+            return false;
+        }
+
+        /**
+         * This should be true if the memtable can achieve write durability for crash recovery directly (i.e. using some
+         * feature other than the commitlog, e.g. persistent memory).
+         * Setting this flag to true means that the commitlog should not replay mutations for this table on restart,
+         * and that it should not try to preserve segments that contain relevant data.
+         * Unless writesShouldSkipCommitLog() is also true, writes will be recorded in the commit log as they may be
+         * needed for changed data capture or point-in-time restore.
+         */
+        default boolean writesAreDurable()
+        {
+            return false;
+        }
+
+        /**
+         * Normally we can receive streamed sstables directly, skipping the memtable stage (zero-copy-streaming). When
+         * the memtable is the primary data store (e.g. persistent memtables), it will usually prefer to receive the
+         * data instead.
+         *
+         * If this returns true, all streamed sstables's content will be read and replayed as mutations, disabling
+         * zero-copy streaming.
+         */
+        default boolean streamToMemtable()
+        {
+            return false;
+        }
+
+        /**
+         * When we need to stream data, we usually flush and stream the resulting sstables. This will not work correctly
+         * if the memtable does not want to flush for streaming (e.g. persistent memtables acting as primary data
+         * store), because data (not just recent) will be missing from the streamed view. Such memtables must present
+         * their data separately for streaming.
+         * In other words if the memtable returns false on shouldSwitch(STREAMING/REPAIR), its factory must return true
+         * here.
+         *
+         * If this flag returns true, streaming will write the relevant content that resides in the memtable to
+         * temporary sstables, stream these sstables and then delete them.
+         */
+        default boolean streamFromMemtable()
+        {
+            return false;
+        }
+
+        /**
+         * Override this method to include implementation-specific memtable metrics in the table metrics.
+         *
+         * Memtable metrics lifecycle matches table lifecycle. It is the table that owns the metrics and
+         * decides when to release them.
+         */
+        default TableMetrics.ReleasableMetric createMemtableMetrics(TableMetadataRef metadataRef)
+        {
+            return null;
+        }
+    }
+
+    /**
+     * Interface for providing signals back to the owner.
+     */
+    interface Owner
+    {
+        /** Signal to the owner that a flush is required (e.g. in response to hitting space limits) */
+        ListenableFuture<CommitLogPosition> signalFlushRequired(Memtable memtable, ColumnFamilyStore.FlushReason reason);
+
+        /** Get the current memtable for this owner. Used to avoid capturing memtable in scheduled flush tasks. */
+        Memtable getCurrentMemtable();
+
+        /**
+         * Collect the index memtables flushed together with this. Used to accurately calculate memory that would be
+         * freed by a flush.
+         */
+        Iterable<Memtable> getIndexMemtables();
+
+        /**
+         * Construct a list of boundaries that split the locally-owned ranges into the given number of shards,
+         * splitting the owned space evenly. It is up to the memtable to use this information.
+         * Any changes in the ring structure (e.g. added or removed nodes) will invalidate the splits; in such a case
+         * the memtable will be sent a shouldSwitch(OWNED_RANGES_CHANGE) and, should that return false, a
+         * localRangesChanged() call.
+         */
+        ShardBoundaries localRangeSplits(int shardCount);
+    }
+
+    // Main write and read operations
+
+    /**
+     * Put new data in the memtable. This operation may block until enough memory is available in the memory pool.
+     *
+     * @param update the partition update, may be a new partition or an update to an existing one
+     * @param indexer receives information about the update's effect
+     * @param opGroup write operation group, used to permit the operation to complete if it is needed to complete a
+     *                flush to free space.
+     *
+     * @return the smallest timestamp delta between corresponding rows from existing and update. A
+     * timestamp delta being computed as the difference between the cells and DeletionTimes from any existing partition
+     * and those in {@code update}. See CASSANDRA-7979.
+     */
+    long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup);
+
+    // Read operations are provided by the UnfilteredSource interface.
+
+    // Statistics
+
+    /** Number of partitions stored in the memtable */
+    long partitionCount();
+
+    /** Size of the data not accounting for any metadata / mapping overheads */
+    long getLiveDataSize();
+
+    /**
+     * Number of "operations" (in the sense defined in {@link PartitionUpdate#operationCount()}) the memtable has
+     * executed.
+     */
+    long getOperations();
+
+    /**
+     * The table's definition metadata.
+     *
+     * Note that this tracks the current state of the table and is not necessarily the same as what was used to create
+     * the memtable.
+     */
+    TableMetadata metadata();
+
+
+    // Memory usage tracking
+
+    /**
+     * Add this memtable's used memory to the given usage object. This can be used to retrieve a single memtable's usage
+     * as well as to combine the ones of related sstables (e.g. a table and its table-based secondary indexes).
+     */
+    void addMemoryUsageTo(MemoryUsage usage);
+
+
+    /**
+     * Creates a holder for memory usage collection.
+     *
+     * This is used to track on- and off-heap memory, as well as the ratio to the total permitted memtable memory.
+     */
+    static MemoryUsage newMemoryUsage()
+    {
+        return new MemoryUsage();
+    }
+
+    /**
+     * Shorthand for the getting a given table's memory usage.
+     * Implemented as a static to prevent implementations altering expectations by e.g. returning a cached object.
+     */
+    static MemoryUsage getMemoryUsage(Memtable memtable)
+    {
+        MemoryUsage usage = newMemoryUsage();
+        memtable.addMemoryUsageTo(usage);
+        return usage;
+    }
+
+    class MemoryUsage
+    {
+        /** On-heap memory used in bytes */
+        public long ownsOnHeap = 0;
+        /** Off-heap memory used in bytes */
+        public long ownsOffHeap = 0;
+        /** On-heap memory as ratio to permitted memtable space */
+        public float ownershipRatioOnHeap = 0.0f;
+        /** Off-heap memory as ratio to permitted memtable space */
+        public float ownershipRatioOffHeap = 0.0f;
+
+        public String toString()
+        {
+            return String.format("%s (%.0f%%) on-heap, %s (%.0f%%) off-heap",
+                                 FBUtilities.prettyPrintMemory(ownsOnHeap),
+                                 ownershipRatioOnHeap * 100,
+                                 FBUtilities.prettyPrintMemory(ownsOffHeap),
+                                 ownershipRatioOffHeap * 100);
+        }
+    }
+
+    /**
+     * Adjust the used on-heap space by the given size (e.g. to reflect memory used by a non-table-based index).
+     * This operation may block until enough memory is available in the memory pool.
+     *
+     * @param additionalSpace the number of allocated bytes
+     * @param opGroup write operation group, used to permit the operation to complete if it is needed to complete a
+     *                flush to free space.
+     */
+    void markExtraOnHeapUsed(long additionalSpace, OpOrder.Group opGroup);
+
+    /**
+     * Adjust the used off-heap space by the given size (e.g. to reflect memory used by a non-table-based index).
+     * This operation may block until enough memory is available in the memory pool.
+     *
+     * @param additionalSpace the number of allocated bytes
+     * @param opGroup write operation group, used to permit the operation to complete if it is needed to complete a
+     *                flush to free space.
+     */
+    void markExtraOffHeapUsed(long additionalSpace, OpOrder.Group opGroup);
+
+
+    // Flushing
+
+    /**
+     * Get the collection of data between the given partition boundaries in a form suitable for flushing.
+     */
+    FlushCollection<?> getFlushSet(PartitionPosition from, PartitionPosition to);
+
+    /**
+     * A collection of partitions for flushing plus some information required for writing an sstable.
+     *
+     * Note that the listed entries must conform with the specified metadata. In particular, if the memtable is still
+     * being written to, care must be taken to not list newer items as they may violate the bounds collected by the
+     * encoding stats or refer to columns that don't exist in the collected columns set.
+     */
+    interface FlushCollection<P extends Partition> extends Iterable<P>, SSTableWriter.SSTableSizeParameters
+    {
+        Memtable memtable();
+
+        PartitionPosition from();
+        PartitionPosition to();
+
+        /** The commit log position at the time that this memtable was created */
+        CommitLogPosition commitLogLowerBound();
+        /** The commit log position at the time that this memtable was switched out */
+        CommitLogPosition commitLogUpperBound();
+
+        /** The set of all columns that have been written */
+        RegularAndStaticColumns columns();
+        /** Statistics required for writing an sstable efficiently */
+        EncodingStats encodingStats();
+
+        default TableMetadata metadata()
+        {
+            return memtable().metadata();
+        }
+
+        long partitionCount();
+        default boolean isEmpty()
+        {
+            return partitionCount() > 0;
+        }
+    }
+
+
+    // Lifecycle management
+
+    /**
+     * Called to tell the memtable that it is being switched out and will be flushed (or dropped) and discarded.
+     * Will be followed by a {@link #getFlushSet} call (if the table is not truncated or dropped), and a
+     * {@link #discard}.
+     *
+     * @param writeBarrier The barrier that will signal that all writes to this memtable have completed. That is, the
+     *                     point after which writes cannot be accepted by this memtable (it is permitted for writes
+     *                     before this barrier to go into the next; see {@link #accepts}).
+     * @param commitLogUpperBound The upper commit log position for this memtable. The value may be modified after this
+     *                            call and will match the next memtable's lower commit log bound.
+     */
+    void switchOut(OpOrder.Barrier writeBarrier, AtomicReference<CommitLogPosition> commitLogUpperBound);
+
+    /**
+     * This memtable is no longer in use or required for outstanding flushes or operations.
+     * All held memory must be released.
+     */
+    void discard();
+
+    /**
+     * Decide if this memtable should take a write with the given parameters, or if the write should go to the next
+     * memtable. This enforces that no writes after the barrier set by {@link #switchOut} can be accepted, and
+     * is also used to define a shared commit log bound as the upper for this memtable and lower for the next.
+     */
+    boolean accepts(OpOrder.Group opGroup, CommitLogPosition commitLogPosition);
+
+    /** Approximate commit log lower bound, <= getCommitLogLowerBound, used as a time stamp for ordering */
+    CommitLogPosition getApproximateCommitLogLowerBound();
+
+    /** The commit log position at the time that this memtable was created */
+    CommitLogPosition getCommitLogLowerBound();
+
+    /** The commit log position at the time that this memtable was switched out */
+    CommitLogPosition getCommitLogUpperBound();
+
+    /** True if the memtable can contain any data that was written before the given commit log position */
+    boolean mayContainDataBefore(CommitLogPosition position);
+
+    /** True if the memtable contains no data */
+    boolean isClean();
+
+    /** Order memtables by time as reflected in the commit log position at time of construction */
+    default int compareTo(Memtable that)
+    {
+        return this.getApproximateCommitLogLowerBound().compareTo(that.getApproximateCommitLogLowerBound());
+    }
+
+    /**
+     * Decides whether the memtable should be switched/flushed for the passed reason.
+     * Normally this will return true, but e.g. persistent memtables may choose not to flush. Returning false will
+     * trigger further action for some reasons:
+     * - SCHEMA_CHANGE will be followed by metadataUpdated().
+     * - OWNED_RANGES_CHANGE will be followed by localRangesUpdated().
+     * - SNAPSHOT will be followed by performSnapshot().
+     * - STREAMING/REPAIR will be followed by creating a FlushSet for the streamed/repaired ranges. This data will be
+     *   used to create sstables, which will be streamed and then deleted.
+     * This will not be called if the sstable is switched because of truncation or drop.
+     */
+    boolean shouldSwitch(ColumnFamilyStore.FlushReason reason);
+
+    /**
+     * Called when the table's metadata is updated. The memtable's metadata reference now points to the new version.
+     * This will not be called if shouldSwitch(SCHEMA_CHANGE) returns true, the memtable will be swapped out instead.
+     */
+    void metadataUpdated();
+
+    /**
+     * Called when the known ranges have been updated and owner.localRangeSplits() may return different values.
+     * This will not be called if shouldSwitch(OWNED_RANGES_CHANGE) returns true, the memtable will be swapped out
+     * instead.
+     * TODO: Implement call.

Review Comment:
   This is actually called from `ColumnFamilyStore#invalidateLocalRanges`. Which call does this TODO refer to? Is it the missing call to `owner.localRangeSplits` in the implementation of the method?



##########
src/java/org/apache/cassandra/db/memtable/Memtable.java:
##########
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.memtable;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.UnfilteredSource;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+/**
+ * Memtable interface. This defines the operations the ColumnFamilyStore can perform with memtables.
+ * They are of several types:
+ * - construction factory interface
+ * - write and read operations: put, getPartition and makePartitionIterator
+ * - statistics and features, including partition counts, data size, encoding stats, written columns
+ * - memory usage tracking, including methods of retrieval and of adding extra allocated space (used by non-CFS
+ *   secondary indexes)
+ * - flush functionality, preparing the set of partitions to flush for given ranges
+ * - lifecycle management, i.e. operations that prepare and execute switch to a different memtable, together
+ *   with ways of tracking the affected commit log spans
+ */
+public interface Memtable extends Comparable<Memtable>, UnfilteredSource
+{
+    // Construction
+
+    /**
+     * Factory interface for constructing memtables, and querying write durability features.
+     *
+     * The factory is chosen using the MemtableParams class (passed as argument to
+     * {@code CREATE TABLE ... WITH memtable = {...}} or in the memtable options in cassandra.yaml). To make that
+     * possible, implementations must provide either a static {@code FACTORY} field (if they accept no further option)
+     * or a static {@code factory(Map<String, String>)} method. In the latter case, the method should avoid creating
+     * multiple instances of the factory for the same parameters, or factories should at least implement hashCode and
+     * equals.
+     */
+    interface Factory
+    {
+        /**
+         * Create a memtable.
+         *
+         * @param commitLogLowerBound A commit log lower bound for the new memtable. This will be equal to the previous
+         *                            memtable's upper bound and defines the span of positions that any flushed sstable
+         *                            will cover.
+         * @param metadataRef Pointer to the up-to-date table metadata.
+         * @param owner Owning object that will receive flush requests triggered by the memtable (e.g. on expiration).
+         * @return the newly created memtable
+         */
+        Memtable create(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner);
+
+        /**
+         * If the memtable can achieve write durability directly (i.e. using some feature other than the commitlog, e.g.
+         * persistent memory), it can return true here, in which case the commit log will not store mutations in this
+         * table.
+         * Note that doing so will prevent point-in-time restores and changed data capture, thus a durable memtable must
+         * allow the option of turning commit log writing on even if it does not need it.
+         *
+         * @return {@code false} by default
+         */
+        default boolean writesShouldSkipCommitLog()
+        {
+            return false;
+        }
+
+        /**
+         * This should be true if the memtable can achieve write durability for crash recovery directly (i.e. using some
+         * feature other than the commitlog, e.g. persistent memory).
+         * Setting this flag to true means that the commitlog should not replay mutations for this table on restart,
+         * and that it should not try to preserve segments that contain relevant data.
+         * Unless writesShouldSkipCommitLog() is also true, writes will be recorded in the commit log as they may be
+         * needed for changed data capture or point-in-time restore.
+         *
+         * @return {@code false} by default
+         */
+        default boolean writesAreDurable()
+        {
+            return false;
+        }
+
+        /**
+         * Normally we can receive streamed sstables directly, skipping the memtable stage (zero-copy-streaming). When
+         * the memtable is the primary data store (e.g. persistent memtables), it will usually prefer to receive the
+         * data instead.
+         *
+         * If this returns true, the content of all streamed sstables will be read and replayed as mutations, disabling
+         * zero-copy streaming.
+         *
+         * @return {@code false} by default
+         */
+        default boolean streamToMemtable()
+        {
+            return false;
+        }
+
+        /**
+         * When we need to stream data, we usually flush and stream the resulting sstables. This will not work correctly
+         * if the memtable does not want to flush for streaming (e.g. persistent memtables acting as primary data
+         * store), because data (not just recent) will be missing from the streamed view. Such memtables must present
+         * their data separately for streaming.
+         * In other words if the memtable returns false on shouldSwitch(STREAMING/REPAIR), its factory must return true
+         * here.
+         *
+         * If this flag returns true, streaming will write the relevant content that resides in the memtable to
+         * temporary sstables, stream these sstables and then delete them.
+         *
+         * @return {@code false} by default
+         */
+        default boolean streamFromMemtable()
+        {
+            return false;
+        }
+
+        /**
+         * Override this method to include implementation-specific memtable metrics in the table metrics.
+         *
+         * Memtable metrics lifecycle matches table lifecycle. It is the table that owns the metrics and
+         * decides when to release them.
+         *
+         * @param metadataRef reference to the metadata of the table the metrics are created for
+         * @return the implementation-specific metrics; the default implementation returns {@code null}
+         */
+        default TableMetrics.ReleasableMetric createMemtableMetrics(TableMetadataRef metadataRef)
+        {
+            return null;
+        }
+    }
+
+    /**
+     * Interface for providing signals back to the owner.
+     */
+    interface Owner
+    {
+        /** Signal to the owner that a flush is required (e.g. in response to hitting space limits) */
+        ListenableFuture<CommitLogPosition> signalFlushRequired(Memtable memtable, ColumnFamilyStore.FlushReason reason);
+
+        /** Get the current memtable for this owner. Used to avoid capturing memtable in scheduled flush tasks. */
+        Memtable getCurrentMemtable();
+
+        /**
+         * Collect the index memtables flushed together with this. Used to accurately calculate memory that would be
+         * freed by a flush.
+         */
+        Iterable<Memtable> getIndexMemtables();
+
+        /**
+         * Construct a list of boundaries that split the locally-owned ranges into the given number of shards,
+         * splitting the owned space evenly. It is up to the memtable to use this information.
+         * Any changes in the ring structure (e.g. added or removed nodes) will invalidate the splits; in such a case
+         * the memtable will be sent a shouldSwitch(OWNED_RANGES_CHANGE) and, should that return false, a
+         * localRangesUpdated() call.
+         */
+        ShardBoundaries localRangeSplits(int shardCount);

Review Comment:
   No one is calling this method.



##########
src/java/org/apache/cassandra/db/ColumnFamilyStore.java:
##########
@@ -2382,7 +2500,7 @@ public void run()
     /**
      * Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable.
      */
-    public Future<CommitLogPosition> dumpMemtable()
+    public Future<CommitLogPosition> dumpMemtable(FlushReason reason)

Review Comment:
   The `reason` argument is not used.



##########
src/java/org/apache/cassandra/db/memtable/Memtable.java:
##########
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.memtable;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.commitlog.CommitLogPosition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.UnfilteredSource;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+/**
+ * Memtable interface. This defines the operations the ColumnFamilyStore can perform with memtables.
+ * They are of several types:
+ * - construction factory interface
+ * - write and read operations: put, getPartition and makePartitionIterator
+ * - statistics and features, including partition counts, data size, encoding stats, written columns
+ * - memory usage tracking, including methods of retrieval and of adding extra allocated space (used by non-CFS
+ *   secondary indexes)
+ * - flush functionality, preparing the set of partitions to flush for given ranges
+ * - lifecycle management, i.e. operations that prepare and execute switch to a different memtable, together
+ *   with ways of tracking the affected commit log spans
+ */
+public interface Memtable extends Comparable<Memtable>, UnfilteredSource
+{
+    // Construction
+
+    /**
+     * Factory interface for constructing memtables, and querying write durability features.
+     *
+     * The factory is chosen using the MemtableParams class (passed as argument to
+     * {@code CREATE TABLE ... WITH memtable = {...}} or in the memtable options in cassandra.yaml). To make that
+     * possible, implementations must provide either a static {@code FACTORY} field (if they accept no further option)
+     * or a static {@code factory(Map<String, String>)} method. In the latter case, the method should avoid creating
+     * multiple instances of the factory for the same parameters, or factories should at least implement hashCode and
+     * equals.
+     */
+    interface Factory
+    {
+        /**
+         * Create a memtable.
+         *
+         * @param commitLogLowerBound A commit log lower bound for the new memtable. This will be equal to the previous
+         *                            memtable's upper bound and defines the span of positions that any flushed sstable
+         *                            will cover.
+         * @param metadataRef Pointer to the up-to-date table metadata.
+         * @param owner Owning object that will receive flush requests triggered by the memtable (e.g. on expiration).
+         * @return the newly created memtable
+         */
+        Memtable create(AtomicReference<CommitLogPosition> commitLogLowerBound, TableMetadataRef metadataRef, Owner owner);
+
+        /**
+         * If the memtable can achieve write durability directly (i.e. using some feature other than the commitlog, e.g.
+         * persistent memory), it can return true here, in which case the commit log will not store mutations in this
+         * table.
+         * Note that doing so will prevent point-in-time restores and changed data capture, thus a durable memtable must
+         * allow the option of turning commit log writing on even if it does not need it.
+         *
+         * @return {@code false} by default
+         */
+        default boolean writesShouldSkipCommitLog()
+        {
+            return false;
+        }
+
+        /**
+         * This should be true if the memtable can achieve write durability for crash recovery directly (i.e. using some
+         * feature other than the commitlog, e.g. persistent memory).
+         * Setting this flag to true means that the commitlog should not replay mutations for this table on restart,
+         * and that it should not try to preserve segments that contain relevant data.
+         * Unless writesShouldSkipCommitLog() is also true, writes will be recorded in the commit log as they may be
+         * needed for changed data capture or point-in-time restore.
+         *
+         * @return {@code false} by default
+         */
+        default boolean writesAreDurable()
+        {
+            return false;
+        }
+
+        /**
+         * Normally we can receive streamed sstables directly, skipping the memtable stage (zero-copy-streaming). When
+         * the memtable is the primary data store (e.g. persistent memtables), it will usually prefer to receive the
+         * data instead.
+         *
+         * If this returns true, the content of all streamed sstables will be read and replayed as mutations, disabling
+         * zero-copy streaming.
+         *
+         * @return {@code false} by default
+         */
+        default boolean streamToMemtable()
+        {
+            return false;
+        }
+
+        /**
+         * When we need to stream data, we usually flush and stream the resulting sstables. This will not work correctly
+         * if the memtable does not want to flush for streaming (e.g. persistent memtables acting as primary data
+         * store), because data (not just recent) will be missing from the streamed view. Such memtables must present
+         * their data separately for streaming.
+         * In other words if the memtable returns false on shouldSwitch(STREAMING/REPAIR), its factory must return true
+         * here.
+         *
+         * If this flag returns true, streaming will write the relevant content that resides in the memtable to
+         * temporary sstables, stream these sstables and then delete them.
+         *
+         * @return {@code false} by default
+         */
+        default boolean streamFromMemtable()
+        {
+            return false;
+        }
+
+        /**
+         * Override this method to include implementation-specific memtable metrics in the table metrics.
+         *
+         * Memtable metrics lifecycle matches table lifecycle. It is the table that owns the metrics and
+         * decides when to release them.
+         *
+         * @param metadataRef reference to the metadata of the table the metrics are created for
+         * @return the implementation-specific metrics; the default implementation returns {@code null}
+         */
+        default TableMetrics.ReleasableMetric createMemtableMetrics(TableMetadataRef metadataRef)
+        {
+            return null;
+        }
+    }
+
+    /**
+     * Interface for providing signals back to the owner.
+     */
+    interface Owner
+    {
+        /**
+         * Signal to the owner that a flush is required (e.g. in response to hitting space limits)
+         *
+         * @param memtable the memtable the flush is requested for
+         * @param reason the reason for requesting the flush
+         */
+        ListenableFuture<CommitLogPosition> signalFlushRequired(Memtable memtable, ColumnFamilyStore.FlushReason reason);
+
+        /** Get the current memtable for this owner. Used to avoid capturing memtable in scheduled flush tasks. */
+        Memtable getCurrentMemtable();
+
+        /**
+         * Collect the index memtables flushed together with this. Used to accurately calculate memory that would be
+         * freed by a flush.
+         */
+        Iterable<Memtable> getIndexMemtables();
+
+        /**
+         * Construct a list of boundaries that split the locally-owned ranges into the given number of shards,
+         * splitting the owned space evenly. It is up to the memtable to use this information.
+         * Any changes in the ring structure (e.g. added or removed nodes) will invalidate the splits; in such a case
+         * the memtable will be sent a shouldSwitch(OWNED_RANGES_CHANGE) and, should that return false, a
+         * localRangesUpdated() call.
+         *
+         * @param shardCount the number of shards to split the locally-owned ranges into
+         */
+        ShardBoundaries localRangeSplits(int shardCount);
+    }
+
+    // Main write and read operations
+
+    /**
+     * Put new data in the memtable. This operation may block until enough memory is available in the memory pool.
+     *
+     * @param update the partition update, may be a new partition or an update to an existing one
+     * @param indexer receives information about the update's effect
+     * @param opGroup write operation group, used to permit the operation to complete if it is needed to complete a
+     *                flush to free space.
+     *
+     * @return the smallest timestamp delta between corresponding rows from existing and update. A
+     * timestamp delta being computed as the difference between the cells and DeletionTimes from any existing partition
+     * and those in {@code update}. See CASSANDRA-7979.
+     */
+    long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup);
+
+    // Read operations are provided by the UnfilteredSource interface.
+
+    // Statistics
+
+    /** Number of partitions stored in the memtable */
+    long partitionCount();
+
+    /** Size of the data not accounting for any metadata / mapping overheads */
+    long getLiveDataSize();
+
+    /**
+     * Number of "operations" (in the sense defined in {@link PartitionUpdate#operationCount()}) the memtable has
+     * executed.
+     */
+    long getOperations();
+
+    /**
+     * The table's definition metadata.
+     *
+     * Note that this tracks the current state of the table and is not necessarily the same as what was used to create
+     * the memtable.
+     */
+    TableMetadata metadata();
+
+
+    // Memory usage tracking
+
+    /**
+     * Add this memtable's used memory to the given usage object. This can be used to retrieve a single memtable's usage
+     * as well as to combine the ones of related sstables (e.g. a table and its table-based secondary indexes).
+     */
+    void addMemoryUsageTo(MemoryUsage usage);
+
+
+    /**
+     * Creates a holder for memory usage collection.
+     *
+     * This is used to track on- and off-heap memory, as well as the ratio to the total permitted memtable memory.
+     */
+    static MemoryUsage newMemoryUsage()
+    {
+        return new MemoryUsage();
+    }
+
+    /**
+     * Shorthand for the getting a given table's memory usage.
+     * Implemented as a static to prevent implementations altering expectations by e.g. returning a cached object.
+     */
+    static MemoryUsage getMemoryUsage(Memtable memtable)
+    {
+        MemoryUsage usage = newMemoryUsage();
+        memtable.addMemoryUsageTo(usage);
+        return usage;
+    }
+
+    /**
+     * Mutable holder for memory usage figures. All fields start at zero; memtables add their usage to a holder
+     * through {@code addMemoryUsageTo}.
+     */
+    class MemoryUsage
+    {
+        /** On-heap memory used in bytes */
+        public long ownsOnHeap = 0;
+        /** Off-heap memory used in bytes */
+        public long ownsOffHeap = 0;
+        /** On-heap memory as ratio to permitted memtable space */
+        public float ownershipRatioOnHeap = 0.0f;
+        /** Off-heap memory as ratio to permitted memtable space */
+        public float ownershipRatioOffHeap = 0.0f;
+
+        /**
+         * Formats the on-heap and off-heap figures as pretty-printed sizes, each followed by its ownership ratio
+         * expressed as a whole-number percentage.
+         */
+        @Override
+        public String toString()
+        {
+            return String.format("%s (%.0f%%) on-heap, %s (%.0f%%) off-heap",
+                                 FBUtilities.prettyPrintMemory(ownsOnHeap),
+                                 ownershipRatioOnHeap * 100,
+                                 FBUtilities.prettyPrintMemory(ownsOffHeap),
+                                 ownershipRatioOffHeap * 100);
+        }
+    }
+
+    /**
+     * Adjust the used on-heap space by the given size (e.g. to reflect memory used by a non-table-based index).
+     * This operation may block until enough memory is available in the memory pool.
+     *
+     * @param additionalSpace the number of allocated bytes
+     * @param opGroup write operation group, used to permit the operation to complete if it is needed to complete a
+     *                flush to free space.
+     */
+    void markExtraOnHeapUsed(long additionalSpace, OpOrder.Group opGroup);
+
+    /**
+     * Adjust the used off-heap space by the given size (e.g. to reflect memory used by a non-table-based index).
+     * This operation may block until enough memory is available in the memory pool.
+     *
+     * @param additionalSpace the number of allocated bytes
+     * @param opGroup write operation group, used to permit the operation to complete if it is needed to complete a
+     *                flush to free space.
+     */
+    void markExtraOffHeapUsed(long additionalSpace, OpOrder.Group opGroup);
+
+
+    // Flushing
+
+    /**
+     * Get the collection of data between the given partition boundaries in a form suitable for flushing.
+     */
+    FlushCollection<?> getFlushSet(PartitionPosition from, PartitionPosition to);
+
+    /**
+     * A collection of partitions for flushing plus some information required for writing an sstable.
+     *
+     * Note that the listed entries must conform with the specified metadata. In particular, if the memtable is still
+     * being written to, care must be taken to not list newer items as they may violate the bounds collected by the
+     * encoding stats or refer to columns that don't exist in the collected columns set.
+     */
+    interface FlushCollection<P extends Partition> extends Iterable<P>, SSTableWriter.SSTableSizeParameters
+    {
+        /** The memtable associated with this collection; its metadata is what {@link #metadata()} returns by default */
+        Memtable memtable();
+
+        /** The lower partition boundary of this collection */
+        PartitionPosition from();
+        /** The upper partition boundary of this collection */
+        PartitionPosition to();
+
+        /** The commit log position at the time that this memtable was created */
+        CommitLogPosition commitLogLowerBound();
+        /** The commit log position at the time that this memtable was switched out */
+        CommitLogPosition commitLogUpperBound();
+
+        /** The set of all columns that have been written */
+        RegularAndStaticColumns columns();
+        /** Statistics required for writing an sstable efficiently */
+        EncodingStats encodingStats();
+
+        /** The table metadata, taken from {@link #memtable()} */
+        default TableMetadata metadata()
+        {
+            return memtable().metadata();
+        }
+
+        /** Number of partitions in this collection */
+        long partitionCount();
+
+        /** True if this collection contains no partitions, i.e. {@link #partitionCount()} is zero */
+        default boolean isEmpty()
+        {
+            // The previous condition (partitionCount() > 0) was inverted: it reported non-empty collections as empty.
+            return partitionCount() == 0;
+        }
+    }
+
+
+    // Lifecycle management
+
+    /**
+     * Called to tell the memtable that it is being switched out and will be flushed (or dropped) and discarded.
+     * Will be followed by a {@link #getFlushSet} call (if the table is not truncated or dropped), and a
+     * {@link #discard}.
+     *
+     * @param writeBarrier The barrier that will signal that all writes to this memtable have completed. That is, the
+     *                     point after which writes cannot be accepted by this memtable (it is permitted for writes
+     *                     before this barrier to go into the next; see {@link #accepts}).
+     * @param commitLogUpperBound The upper commit log position for this memtable. The value may be modified after this
+     *                            call and will match the next memtable's lower commit log bound.
+     */
+    void switchOut(OpOrder.Barrier writeBarrier, AtomicReference<CommitLogPosition> commitLogUpperBound);
+
+    /**
+     * This memtable is no longer in use or required for outstanding flushes or operations.
+     * All held memory must be released.
+     */
+    void discard();
+
+    /**
+     * Decide if this memtable should take a write with the given parameters, or if the write should go to the next
+     * memtable. This enforces that no writes after the barrier set by {@link #switchOut} can be accepted, and
+     * is also used to define a shared commit log bound as the upper for this memtable and lower for the next.
+     */
+    boolean accepts(OpOrder.Group opGroup, CommitLogPosition commitLogPosition);
+
+    /** Approximate commit log lower bound, <= getCommitLogLowerBound, used as a time stamp for ordering */
+    CommitLogPosition getApproximateCommitLogLowerBound();
+
+    /** The commit log position at the time that this memtable was created */
+    CommitLogPosition getCommitLogLowerBound();
+
+    /** The commit log position at the time that this memtable was switched out */
+    CommitLogPosition getCommitLogUpperBound();
+
+    /** True if the memtable can contain any data that was written before the given commit log position */
+    boolean mayContainDataBefore(CommitLogPosition position);
+
+    /** True if the memtable contains no data */
+    boolean isClean();
+
+    /**
+     * Orders memtables by time, as reflected in the commit log position at time of construction.
+     *
+     * @param that the memtable to compare with
+     * @return the result of comparing the two approximate commit log lower bounds
+     */
+    default int compareTo(Memtable that)
+    {
+        CommitLogPosition mine = getApproximateCommitLogLowerBound();
+        CommitLogPosition theirs = that.getApproximateCommitLogLowerBound();
+        return mine.compareTo(theirs);
+    }
+
+    /**
+     * Decides whether the memtable should be switched/flushed for the passed reason.
+     * Normally this will return true, but e.g. persistent memtables may choose not to flush. Returning false will
+     * trigger further action for some reasons:
+     * - SCHEMA_CHANGE will be followed by metadataUpdated().
+     * - OWNED_RANGES_CHANGE will be followed by localRangesUpdated().
+     * - SNAPSHOT will be followed by performSnapshot().
+     * - STREAMING/REPAIR will be followed by creating a FlushSet for the streamed/repaired ranges. This data will be
+     *   used to create sstables, which will be streamed and then deleted.
+     * This will not be called if the memtable is switched because of truncation or drop.

Review Comment:
   Indeed this isn't called from `ColumnFamilyStore#dumpMemtable`, but I think it can be called with `TRUNCATE` from `ColumnFamilyStore#truncateBlocking`.



##########
src/java/org/apache/cassandra/schema/MemtableParams.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.schema;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.memtable.SkipListMemtableFactory;
+import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+/**
+ * Memtable types and options are specified with these parameters. Memtable classes must either contain a static FACTORY
+ * field (if they take no arguments other than class), or implement a factory(Map<String, String>) method.
+ *
+ * The latter should consume any further options (using map.remove).
+ *
+ *
+ * CQL: {'class' : 'SkipListMemtable'}
+ */
+public final class MemtableParams
+{
+    public final Memtable.Factory factory;
+    public final ImmutableMap<String, String> options;
+
+    /**
+     * Stores the given memtable factory together with the given options.
+     *
+     * @param factory the memtable factory to expose through {@link #factory}
+     * @param options the options to expose through {@link #options}
+     */
+    private MemtableParams(Memtable.Factory factory, ImmutableMap<String, String> options)
+    {
+        this.factory = factory;
+        this.options = options;
+    }
+
+    private static Memtable.Factory getMemtableFactory(Map<String, String> options)
+    {
+        Map<String, String> copy = new HashMap<>(options);
+        String className = copy.remove(Option.CLASS.toString());
+        if (className.isEmpty() || className == null)

Review Comment:
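   We should check whether `className` is null first; as written, `className.isEmpty()` throws a `NullPointerException` before the null check is ever reached.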



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org