Posted to commits@cassandra.apache.org by ed...@apache.org on 2021/01/19 19:16:48 UTC

[cassandra] branch trunk updated: Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318) Patch by Ekaterina Dimitrova and Branimir Lambov; reviewed by Benjamin Lerer and Branimir Lambov for CASSANDRA-16318

This is an automated email from the ASF dual-hosted git repository.

edimitrova pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 661f1aa  Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318) Patch by Ekaterina Dimitrova and Branimir Lambov; reviewed by Benjamin Lerer and Branimir Lambov for CASSANDRA-16318
661f1aa is described below

commit 661f1aab171dc3ef16075f69581e88ad4a133fae
Author: Branimir Lambov <br...@datastax.com>
AuthorDate: Tue Dec 8 15:37:39 2020 +0200

    Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318)
    Patch by Ekaterina Dimitrova and Branimir Lambov; reviewed by Benjamin Lerer and Branimir Lambov for CASSANDRA-16318
---
 CHANGES.txt                                        |   3 +-
 src/java/org/apache/cassandra/db/Columns.java      |  10 +
 src/java/org/apache/cassandra/db/DeletionTime.java |   3 +
 src/java/org/apache/cassandra/db/LivenessInfo.java |  16 +-
 src/java/org/apache/cassandra/db/Memtable.java     |  33 ++-
 .../apache/cassandra/db/MutableDeletionInfo.java   |   3 +
 .../cassandra/db/RegularAndStaticColumns.java      |  12 +-
 .../db/partitions/AbstractBTreePartition.java      |   2 +
 .../db/partitions/AtomicBTreePartition.java        |   9 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java     |   6 +-
 .../apache/cassandra/db/rows/EncodingStats.java    |  15 +-
 src/java/org/apache/cassandra/db/rows/Row.java     |  12 +-
 test/bin/jmh                                       | 137 ++++++++++++
 test/conf/logback-jmh.xml                          | 101 +++++++++
 .../test/microbench/instance/ReadTest.java         | 233 +++++++++++++++++++++
 .../instance/ReadTestSmallPartitions.java          |  60 ++++++
 .../instance/ReadTestWidePartitions.java           |  96 +++++++++
 .../unit/org/apache/cassandra/ServerTestUtils.java |   5 +-
 test/unit/org/apache/cassandra/cql3/CQLTester.java |   4 +-
 .../apache/cassandra/cql3/MemtableSizeTest.java    | 124 +++++++++++
 20 files changed, 858 insertions(+), 26 deletions(-)
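
The recurring change across Columns, DeletionTime, LivenessInfo, MutableDeletionInfo, EncodingStats and Row.Deletion below is one accounting pattern: measure the shallow size of an instance once with ObjectSizes, and report zero for the shared singleton so that partitions referencing it are not charged for memory they do not own. A condensed sketch of the pattern, using a hypothetical Widget class with a shared NONE instance (illustrative, not part of the patch):

    import org.apache.cassandra.cache.IMeasurableMemory;
    import org.apache.cassandra.utils.ObjectSizes;

    public class Widget implements IMeasurableMemory
    {
        public static final Widget NONE = new Widget();
        // Shallow size of one instance, measured once at class load.
        private static final long EMPTY_SIZE = ObjectSizes.measure(NONE);

        public long unsharedHeapSize()
        {
            // The shared singleton is owned by no single memtable,
            // so it contributes nothing to per-partition accounting.
            return this == NONE ? 0 : EMPTY_SIZE;
        }
    }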

diff --git a/CHANGES.txt b/CHANGES.txt
index 09b02d5..9a304b6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,5 @@
 4.0-beta5
- * Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
- * Too defensive check when picking sstables for preview repair (CASSANDRA-16284)
+ * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318)
  * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376)
  * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
  * SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362)
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index aceb868..9e07375 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.BTreeSearchIterator;
@@ -52,6 +53,7 @@ public class Columns extends AbstractCollection<ColumnMetadata> implements Colle
 {
     public static final Serializer serializer = new Serializer();
     public static final Columns NONE = new Columns(BTree.empty(), 0);
+    static final long EMPTY_SIZE = ObjectSizes.measure(NONE);
 
     public static final ColumnMetadata FIRST_COMPLEX_STATIC =
         new ColumnMetadata("",
@@ -406,6 +408,14 @@ public class Columns extends AbstractCollection<ColumnMetadata> implements Colle
         return Objects.hash(complexIdx, BTree.hashCode(columns));
     }
 
+    public long unsharedHeapSize()
+    {
+    if (this == NONE)
+            return 0;
+
+        return EMPTY_SIZE;
+    }
+
     @Override
     public String toString()
     {
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index f1471fd..d8ac91d 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -157,6 +157,9 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
 
     public long unsharedHeapSize()
     {
+        if (this == LIVE)
+            return 0;
+
         return EMPTY_SIZE;
     }
 
diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java
index b1ea3f6..f3e6daa 100644
--- a/src/java/org/apache/cassandra/db/LivenessInfo.java
+++ b/src/java/org/apache/cassandra/db/LivenessInfo.java
@@ -19,8 +19,10 @@ package org.apache.cassandra.db;
 
 import java.util.Objects;
 
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ObjectSizes;
 
 /**
  * Stores the information relating to the liveness of the primary key columns of a row.
@@ -35,7 +37,7 @@ import org.apache.cassandra.serializers.MarshalException;
  * unaffected (of course, the rest of said row data might be ttl'ed on its own but this is
  * separate).
  */
-public class LivenessInfo
+public class LivenessInfo implements IMeasurableMemory
 {
     public static final long NO_TIMESTAMP = Long.MIN_VALUE;
     public static final int NO_TTL = Cell.NO_TTL;
@@ -49,6 +51,7 @@ public class LivenessInfo
     public static final int NO_EXPIRATION_TIME = Cell.NO_DELETION_TIME;
 
     public static final LivenessInfo EMPTY = new LivenessInfo(NO_TIMESTAMP);
+    private static final long UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY);
 
     protected final long timestamp;
 
@@ -255,6 +258,11 @@ public class LivenessInfo
         return Objects.hash(timestamp(), ttl(), localExpirationTime());
     }
 
+    public long unsharedHeapSize()
+    {
+        return this == EMPTY ? 0 : UNSHARED_HEAP_SIZE;
+    }
+
     /**
      * Effectively acts as a PK tombstone. This is used for Materialized Views to shadow
      * updated entries while co-existing with row tombstones.
@@ -294,6 +302,7 @@ public class LivenessInfo
     {
         private final int ttl;
         private final int localExpirationTime;
+        private static final long UNSHARED_HEAP_SIZE = ObjectSizes.measure(new ExpiringLivenessInfo(-1, -1, -1));
 
         private ExpiringLivenessInfo(long timestamp, int ttl, int localExpirationTime)
         {
@@ -364,5 +373,10 @@ public class LivenessInfo
         {
             return String.format("[ts=%d ttl=%d, let=%d]", timestamp, ttl, localExpirationTime);
         }
+
+        public long unsharedHeapSize()
+        {
+            return UNSHARED_HEAP_SIZE;
+        }
     }
 }
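
Note that the measurement above uses a throwaway ExpiringLivenessInfo built with sentinel arguments (-1, -1, -1); a shallow size does not depend on field values, so any instance of the class will do.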
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index c969616..cdbe163 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -17,38 +17,52 @@
  */
 package org.apache.cassandra.db;
 
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.partitions.AbstractBTreePartition;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.AtomicBTreePartition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.IncludingExcludingBounds;
 import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -529,6 +543,7 @@ public class Memtable implements Comparable<Memtable>
             rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
             rowOverhead -= ObjectSizes.measureDeep(new LongToken(0));
             rowOverhead += AtomicBTreePartition.EMPTY_SIZE;
+            rowOverhead += AbstractBTreePartition.HOLDER_UNSHARED_HEAP_SIZE;
             allocator.setDiscarding();
             allocator.setDiscarded();
             return rowOverhead;
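
Two details in the estimate above: the rounding keeps the per-row overhead integral (an average of, say, 104.03 bytes floors to 104, while 104.20 rounds up to 105), and the new HOLDER_UNSHARED_HEAP_SIZE term charges each partition for the Holder instance it allocates, matching the per-Holder measurement introduced in AbstractBTreePartition below.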
diff --git a/src/java/org/apache/cassandra/db/MutableDeletionInfo.java b/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
index 8544b78..bfe4d4c 100644
--- a/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
@@ -248,6 +248,9 @@ public class MutableDeletionInfo implements DeletionInfo
     @Override
     public long unsharedHeapSize()
     {
+        if (this == LIVE)
+            return 0;
+
         return EMPTY_SIZE + partitionDeletion.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
     }
 
diff --git a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
index fab7730..1501345 100644
--- a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
+++ b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
@@ -22,6 +22,7 @@ import java.util.*;
 import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
 import static java.util.Comparator.naturalOrder;
@@ -33,6 +34,7 @@ import static java.util.Comparator.naturalOrder;
 public class RegularAndStaticColumns implements Iterable<ColumnMetadata>
 {
     public static RegularAndStaticColumns NONE = new RegularAndStaticColumns(Columns.NONE, Columns.NONE);
+    static final long EMPTY_SIZE = ObjectSizes.measure(NONE);
 
     public final Columns statics;
     public final Columns regulars;
@@ -105,11 +107,19 @@ public class RegularAndStaticColumns implements Iterable<ColumnMetadata>
         return regulars.size() + statics.size();
     }
 
+    public long unsharedHeapSize()
+    {
+    if (this == NONE)
+            return 0;
+
+        return EMPTY_SIZE + regulars.unsharedHeapSize() + statics.unsharedHeapSize();
+    }
+
     @Override
     public String toString()
     {
         StringBuilder sb = new StringBuilder();
-        sb.append("[").append(statics).append(" | ").append(regulars).append("]");
+        sb.append('[').append(statics).append(" | ").append(regulars).append(']');
         return sb.toString();
     }
 
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 44dc0b0..1d6603e 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTree;
 
@@ -36,6 +37,7 @@ import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
 public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
 {
     protected static final Holder EMPTY = new Holder(RegularAndStaticColumns.NONE, BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
+    public static final long HOLDER_UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY);
 
     protected final DecoratedKey partitionKey;
 
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index ed635cd..801d9e2 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -135,12 +135,14 @@ public final class AtomicBTreePartition extends AbstractBTreePartition
         }
 
         RegularAndStaticColumns columns = update.columns().mergeTo(current.columns);
+        updater.allocated(columns.unsharedHeapSize() - current.columns.unsharedHeapSize());
         Row newStatic = update.staticRow();
         Row staticRow = newStatic.isEmpty()
                         ? current.staticRow
                         : (current.staticRow.isEmpty() ? updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
         Object[] tree = BTree.update(current.tree, update.metadata().comparator, update, update.rowCount(), updater);
         EncodingStats newStats = current.stats.mergeWith(update.stats());
+        updater.allocated(newStats.unsharedHeapSize() - current.stats.unsharedHeapSize());
 
         if (tree != null && refUpdater.compareAndSet(this, current, new Holder(columns, tree, deletionInfo, staticRow, newStats)))
         {
@@ -274,8 +276,7 @@ public final class AtomicBTreePartition extends AbstractBTreePartition
         if (!writeOp.isOldestLiveGroup())
         {
             Thread.yield();
-            if (!writeOp.isOldestLiveGroup())
-                return false;
+            return writeOp.isOldestLiveGroup();
         }
 
         return true;
@@ -370,7 +371,7 @@ public final class AtomicBTreePartition extends AbstractBTreePartition
             indexer.onInserted(insert);
 
             this.dataSize += data.dataSize();
-            this.heapSize += data.unsharedHeapSizeExcludingData();
+            allocated(data.unsharedHeapSizeExcludingData());
             if (inserted == null)
                 inserted = new ArrayList<>();
             inserted.add(data);
@@ -387,7 +388,7 @@ public final class AtomicBTreePartition extends AbstractBTreePartition
             indexer.onUpdated(existing, reconciled);
 
             dataSize += reconciled.dataSize() - existing.dataSize();
-            heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData();
+            allocated(reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData());
             if (inserted == null)
                 inserted = new ArrayList<>();
             inserted.add(reconciled);
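
All heap adjustments in the updater now funnel through a single allocated(...) hook instead of mutating heapSize in several places, so the column-set and encoding-stats deltas added above are charged the same way as row data. The hook itself is not shown in this diff; a plausible shape, purely illustrative:

    // Accumulate an on-heap delta (may be zero when shared
    // singletons such as Columns.NONE are involved).
    void allocated(long heapSize)
    {
        this.heapSize += heapSize;
    }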
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index e8476dd..b971307 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -494,8 +494,10 @@ public class BTreeRow extends AbstractRow
     public long unsharedHeapSizeExcludingData()
     {
         long heapSize = EMPTY_SIZE
-                      + clustering.unsharedHeapSizeExcludingData()
-                      + BTree.sizeOfStructureOnHeap(btree);
+                        + clustering.unsharedHeapSizeExcludingData()
+                        + primaryKeyLivenessInfo.unsharedHeapSize()
+                        + deletion.unsharedHeapSize()
+                        + BTree.sizeOfStructureOnHeap(btree);
 
         return accumulate((cd, v) -> v + cd.unsharedHeapSizeExcludingData(), heapSize);
     }
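
With LivenessInfo and Row.Deletion now measurable, the row estimate above also covers the primary-key liveness info and the row deletion; rows holding the shared EMPTY and LIVE singletons add nothing for them, since those report a size of zero.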
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index f2fb340..37dd34e 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -21,12 +21,12 @@ import java.io.IOException;
 import java.util.*;
 import java.util.function.Function;
 
-import com.google.common.collect.Iterables;
-
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ObjectSizes;
 
 /**
  * Stats used for the encoding of the rows and tombstones of a given source.
@@ -40,13 +40,14 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  * this shouldn't have too huge an impact on performance) and in fact they will not always be
  * accurate for reasons explained in {@link SerializationHeader#make}.
  */
-public class EncodingStats
+public class EncodingStats implements IMeasurableMemory
 {
     // Default values for the timestamp, deletion time and ttl. We use these both for NO_STATS and to serialize
     // an EncodingStats. Basically, we encode the diff of each value to these epochs, which gives values with better vint encoding.
     public static final long TIMESTAMP_EPOCH;
     private static final int DELETION_TIME_EPOCH;
     private static final int TTL_EPOCH = 0;
+
     static
     {
         // We want a fixed epoch, but one that provides small values when subtracted from our timestamp and deletion time.
@@ -66,6 +67,7 @@ public class EncodingStats
 
     // We should use this sparingly obviously
     public static final EncodingStats NO_STATS = new EncodingStats(TIMESTAMP_EPOCH, DELETION_TIME_EPOCH, TTL_EPOCH);
+    public static long HEAP_SIZE = ObjectSizes.measure(NO_STATS);
 
     public static final Serializer serializer = new Serializer();
 
@@ -152,6 +154,13 @@ public class EncodingStats
         return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL);
     }
 
+    public long unsharedHeapSize()
+    {
+        if (this == NO_STATS)
+            return 0;
+        return HEAP_SIZE;
+    }
+
     @Override
     public String toString()
     {
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index ec43783..5c28cd1 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.utils.BiLongAccumulator;
 import org.apache.cassandra.utils.LongAccumulator;
 import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
@@ -323,6 +324,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
     public static class Deletion
     {
         public static final Deletion LIVE = new Deletion(DeletionTime.LIVE, false);
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0));
 
         private final DeletionTime time;
         private final boolean isShadowable;
@@ -422,6 +424,14 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
             return this.time.equals(that.time) && this.isShadowable == that.isShadowable;
         }
 
+        public long unsharedHeapSize()
+        {
+        if (this == LIVE)
+                return 0;
+
+            return EMPTY_SIZE + time().unsharedHeapSize();
+        }
+
         @Override
         public final int hashCode()
         {
@@ -722,7 +732,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
             // Because some data might have been shadowed by the 'activeDeletion', we could have an empty row
             return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty()
                  ? null
-                 : BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.<ColumnData>noOp()));
+                 : BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.noOp()));
         }
 
         public Clustering<?> mergedClustering()
diff --git a/test/bin/jmh b/test/bin/jmh
new file mode 100755
index 0000000..c56145f
--- /dev/null
+++ b/test/bin/jmh
@@ -0,0 +1,137 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+jvmoptions_variant="-server"
+export CASSANDRA_HOME=`dirname "$0"`/../../
+. $CASSANDRA_HOME/bin/cassandra.in.sh
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -n "$JAVA_HOME" ]; then
+    # Why we can't have nice things: Solaris combines x86 and x86_64
+    # installations in the same tree, using an unconventional path for the
+    # 64bit JVM.  Since we prefer 64bit, search the alternate path first,
+    # (see https://issues.apache.org/jira/browse/CASSANDRA-4638).
+    for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+        if [ -x "$java" ]; then
+            JAVA="$java"
+            break
+        fi
+    done
+else
+    JAVA=java
+fi
+
+if [ -z "$JAVA" ] ; then
+    echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. >&2
+    exit 1;
+fi
+
+# If numactl is available, use it. For Cassandra, the priority is to
+# avoid disk I/O. Even for the purpose of CPU efficiency, we don't
+# really have CPU<->data affinity anyway. Also, empirically test that numactl
+# works before trying to use it (CASSANDRA-3245).
+NUMACTL_ARGS=${NUMACTL_ARGS:-"--localalloc"}
+if which numactl >/dev/null 2>/dev/null && numactl $NUMACTL_ARGS ls / >/dev/null 2>/dev/null
+then
+    NUMACTL="numactl $NUMACTL_ARGS"
+else
+    NUMACTL=""
+fi
+
+if [ -z "$CASSANDRA_CONF" -o -z "$CLASSPATH" ]; then
+    echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2
+    exit 1
+fi
+
+if [ -f "$CASSANDRA_CONF/cassandra-env.sh" ]; then
+    . "$CASSANDRA_CONF/cassandra-env.sh"
+fi
+
+# Special-case path variables.
+case "`uname`" in
+    CYGWIN*)
+        CLASSPATH=`cygpath -p -w "$CLASSPATH"`
+        CASSANDRA_CONF=`cygpath -p -w "$CASSANDRA_CONF"`
+    ;;
+esac
+
+# Cassandra uses an installed jemalloc via LD_PRELOAD / DYLD_INSERT_LIBRARIES by default to improve off-heap
+# memory allocation performance. The following code searches for an installed libjemalloc.dylib/.so/.1.so using
+# Linux and OS-X specific approaches.
+# To specify your own libjemalloc in a different path, configure the fully qualified path in CASSANDRA_LIBJEMALLOC.
+# To disable jemalloc preload at all, set CASSANDRA_LIBJEMALLOC=-
+#
+#CASSANDRA_LIBJEMALLOC=
+#
+find_library()
+{
+    pattern=$1
+    path=$(echo ${2} | tr ":" " ")
+
+    find $path -regex "$pattern" -print 2>/dev/null | head -n 1
+}
+case "`uname -s`" in
+    Linux)
+        if [ -z $CASSANDRA_LIBJEMALLOC ] ; then
+            which ldconfig > /dev/null 2>&1
+            if [ $? = 0 ] ; then
+                # e.g. for CentOS
+                dirs="/lib64 /lib /usr/lib64 /usr/lib `ldconfig -v 2>/dev/null | grep -v '^\s' | sed 's/^\([^:]*\):.*$/\1/'`"
+            else
+                # e.g. for Debian, OpenSUSE
+                dirs="/lib64 /lib /usr/lib64 /usr/lib `cat /etc/ld.so.conf /etc/ld.so.conf.d/*.conf | grep '^/'`"
+            fi
+            dirs=`echo $dirs | tr " " ":"`
+            CASSANDRA_LIBJEMALLOC=$(find_library '.*/libjemalloc\.so\(\.1\)*' $dirs)
+        fi
+        if [ ! -z $CASSANDRA_LIBJEMALLOC ] ; then
+            export JVM_OPTS="$JVM_OPTS -Dcassandra.libjemalloc=$CASSANDRA_LIBJEMALLOC"
+            if [ "-" != "$CASSANDRA_LIBJEMALLOC" ] ; then
+                export LD_PRELOAD=$CASSANDRA_LIBJEMALLOC
+            fi
+        fi
+    ;;
+    Darwin)
+        if [ -z $CASSANDRA_LIBJEMALLOC ] ; then
+            CASSANDRA_LIBJEMALLOC=$(find_library '.*/libjemalloc\.dylib' $DYLD_LIBRARY_PATH:${DYLD_FALLBACK_LIBRARY_PATH-$HOME/lib:/usr/local/lib:/lib:/usr/lib})
+        fi
+        if [ ! -z $CASSANDRA_LIBJEMALLOC ] ; then
+            export JVM_OPTS="$JVM_OPTS -Dcassandra.libjemalloc=$CASSANDRA_LIBJEMALLOC"
+            if [ "-" != "$CASSANDRA_LIBJEMALLOC" ] ; then
+                export DYLD_INSERT_LIBRARIES=$CASSANDRA_LIBJEMALLOC
+            fi
+        fi
+    ;;
+esac
+
+cassandra_parms="-Dlogback.configurationFile=$CASSANDRA_HOME/test/conf/logback-jmh.xml"
+cassandra_parms="$cassandra_parms -Dcassandra.logdir=$CASSANDRA_HOME/logs"
+cassandra_parms="$cassandra_parms -Dcassandra.storagedir=$cassandra_storagedir"
+cassandra_parms="$cassandra_parms -Dcassandra-foreground=yes"
+cassandra_parms="$cassandra_parms -XX:+PreserveFramePointer"
+
+# Create log directory, some tests require that
+mkdir -p $CASSANDRA_HOME/logs
+
+if [ ! -f $CASSANDRA_HOME/build/test/benchmarks.jar ] ; then
+    echo "$CASSANDRA_HOME/build/test/benchmarks.jar does not exist - execute 'ant build-jmh'"
+    exit 1
+fi
+
+exec $NUMACTL "$JAVA" -cp "$CLASSPATH:$CASSANDRA_HOME/build/test/benchmarks.jar:$CASSANDRA_HOME/build/test/deps.jar" org.openjdk.jmh.Main -jvmArgs="$cassandra_parms $JVM_OPTS" $@
+
+# vi:ai sw=4 ts=4 tw=0 et
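
A usage sketch (the benchmark selector and parameter below are illustrative; everything after the script name is passed straight to org.openjdk.jmh.Main):

    ant build-jmh
    test/bin/jmh ReadTestSmallPartitions -p flush=INMEM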
diff --git a/test/conf/logback-jmh.xml b/test/conf/logback-jmh.xml
new file mode 100644
index 0000000..4138f19
--- /dev/null
+++ b/test/conf/logback-jmh.xml
@@ -0,0 +1,101 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!--
+In order to disable debug.log, comment out the ASYNCDEBUGLOG
+appender reference in the root-level section below.
+-->
+
+<configuration scan="false" scanPeriod="60 seconds">
+  <jmxConfigurator />
+  <!-- No shutdown hook; we run it ourselves in StorageService after shutdown -->
+
+  <!-- SYSTEMLOG rolling file appender to system.log (INFO level) -->
+
+  <appender name="SYSTEMLOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+      <level>INFO</level>
+    </filter>
+    <file>${cassandra.logdir}/system.log</file>
+    <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+      <!-- rollover daily -->
+      <fileNamePattern>${cassandra.logdir}/system.log.%d{yyyy-MM-dd}.%i.zip</fileNamePattern>
+      <!-- each file should be at most 50MB, keep 7 days worth of history, but at most 5GB -->
+      <maxFileSize>50MB</maxFileSize>
+      <maxHistory>7</maxHistory>
+      <totalSizeCap>5GB</totalSizeCap>
+    </rollingPolicy>
+    <encoder>
+      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <!-- DEBUGLOG rolling file appender to debug.log (all levels) -->
+
+  <appender name="DEBUGLOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
+    <file>${cassandra.logdir}/debug.log</file>
+    <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+      <!-- rollover daily -->
+      <fileNamePattern>${cassandra.logdir}/debug.log.%d{yyyy-MM-dd}.%i.zip</fileNamePattern>
+      <!-- each file should be at most 50MB, keep 7 days worth of history, but at most 5GB -->
+      <maxFileSize>50MB</maxFileSize>
+      <maxHistory>7</maxHistory>
+      <totalSizeCap>5GB</totalSizeCap>
+    </rollingPolicy>
+    <encoder>
+      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <!-- ASYNCDEBUGLOG asynchronous appender to debug.log (all levels) -->
+  <appender name="ASYNCDEBUGLOG" class="ch.qos.logback.classic.AsyncAppender">
+    <queueSize>1024</queueSize>
+    <discardingThreshold>0</discardingThreshold>
+    <includeCallerData>true</includeCallerData>
+    <appender-ref ref="DEBUGLOG" />
+  </appender>
+
+  <!-- NOTE: For JMH we disable the STDOUT logging to avoid noise in output -->
+  <!-- STDOUT console appender to stdout (INFO level)
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+      <level>INFO</level>
+    </filter>
+    <encoder>
+      <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+    </encoder>
+  </appender-->
+
+  <!-- Uncomment below and the corresponding appender-ref to activate logback metrics
+  <appender name="LogbackMetrics" class="com.codahale.metrics.logback.InstrumentedAppender" />
+   -->
+
+  <root level="INFO">
+    <appender-ref ref="SYSTEMLOG" />
+    <!-- appender-ref ref="STDOUT" /-->
+    <appender-ref ref="ASYNCDEBUGLOG" /> <!-- Comment this line to disable debug.log -->
+    <!--
+    <appender-ref ref="LogbackMetrics" />
+    -->
+  </root>
+
+  <logger name="org.apache.cassandra" level="DEBUG"/>
+  <logger name="com.datastax.bdp.db" level="DEBUG"/>
+</configuration>
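
The async appender above is tuned not to lose events: discardingThreshold 0 disables logback's discarding of TRACE-to-INFO events when the queue fills, and includeCallerData keeps the %F:%L pattern accurate at the cost of capturing caller frames.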
diff --git a/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTest.java b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTest.java
new file mode 100644
index 0000000..789ca00
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench.instance;
+
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.utils.FBUtilities;
+import org.openjdk.jmh.annotations.*;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 15, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@Threads(1)
+@State(Scope.Benchmark)
+public abstract class ReadTest extends CQLTester
+{
+    static String keyspace;
+    String table;
+    ColumnFamilyStore cfs;
+    Random rand;
+
+    @Param({"1000"})
+    int BATCH = 1_000;
+
+    public enum Flush
+    {
+        INMEM, NO, YES
+    }
+
+    @Param({"1000000"})
+    int count = 1_000_000;
+
+    @Param({"INMEM", "YES"})
+    Flush flush = Flush.INMEM;
+
+    public enum Execution
+    {
+        SERIAL,
+        SERIAL_NET,
+        PARALLEL,
+        PARALLEL_NET,
+    }
+
+    @Param({"PARALLEL"})
+    Execution async = Execution.PARALLEL;
+
+    @Setup(Level.Trial)
+    public void setup() throws Throwable
+    {
+        rand = new Random(1);
+        CQLTester.setUpClass();
+        CQLTester.prepareServer();
+        System.err.println("setupClass done.");
+        keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false");
+        table = createTable(keyspace, "CREATE TABLE %s ( userid bigint, picid bigint, commentid bigint, PRIMARY KEY(userid, picid)) with compression = {'enabled': false}");
+        execute("use "+keyspace+";");
+        switch (async)
+        {
+            case SERIAL_NET:
+            case PARALLEL_NET:
+                CQLTester.requireNetwork();
+                executeNet(getDefaultVersion(), "use " + keyspace + ";");
+        }
+        String writeStatement = "INSERT INTO "+table+"(userid,picid,commentid)VALUES(?,?,?)";
+        System.err.println("Prepared, batch " + BATCH + " flush " + flush);
+        System.err.println("Disk access mode " + DatabaseDescriptor.getDiskAccessMode() + " index " + DatabaseDescriptor.getIndexAccessMode());
+
+        cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+        cfs.disableAutoCompaction();
+        cfs.forceBlockingFlush();
+
+        //Warm up
+        System.err.println("Writing " + count);
+        long i;
+        for (i = 0; i <= count - BATCH; i += BATCH)
+            performWrite(writeStatement, i, BATCH);
+        if (i < count)
+            performWrite(writeStatement, i, count - i);
+
+        Memtable memtable = cfs.getTracker().getView().getCurrentMemtable();
+        System.err.format("Memtable in %s mode: %d ops, %s serialized bytes, %s (%.0f%%) on heap, %s (%.0f%%) off-heap\n",
+                          DatabaseDescriptor.getMemtableAllocationType(),
+                          memtable.getOperations(),
+                          FBUtilities.prettyPrintMemory(memtable.getLiveDataSize()),
+                          FBUtilities.prettyPrintMemory(memtable.getAllocator().onHeap().owns()),
+                          100 * memtable.getAllocator().onHeap().ownershipRatio(),
+                          FBUtilities.prettyPrintMemory(memtable.getAllocator().offHeap().owns()),
+                          100 * memtable.getAllocator().offHeap().ownershipRatio());
+
+        switch (flush)
+        {
+        case YES:
+            cfs.forceBlockingFlush();
+            break;
+        case INMEM:
+            if (!cfs.getLiveSSTables().isEmpty())
+                throw new AssertionError("SSTables created for INMEM test.");
+        default:
+            // don't flush
+        }
+
+        // Needed to stabilize sstable count for off-cache sized tests (e.g. count = 100_000_000)
+        while (cfs.getLiveSSTables().size() >= 15)
+        {
+            cfs.enableAutoCompaction(true);
+            cfs.disableAutoCompaction();
+        }
+    }
+
+    abstract Object[] writeArguments(long i);
+
+    public void performWrite(String writeStatement, long ofs, long count) throws Throwable
+    {
+        for (long i = ofs; i < ofs + count; ++i)
+            execute(writeStatement, writeArguments(i));
+    }
+
+
+    @TearDown(Level.Trial)
+    public void teardown() throws InterruptedException
+    {
+        if (flush == Flush.INMEM && !cfs.getLiveSSTables().isEmpty())
+            throw new AssertionError("SSTables created for INMEM test.");
+
+        // do a flush to print sizes
+        cfs.forceBlockingFlush();
+
+        CommitLog.instance.shutdownBlocking();
+        CQLTester.tearDownClass();
+        CQLTester.cleanup();
+    }
+
+    public Object performReadSerial(String readStatement, Supplier<Object[]> supplier) throws Throwable
+    {
+        long sum = 0;
+        for (int i = 0; i < BATCH; ++i)
+            sum += execute(readStatement, supplier.get()).size();
+        return sum;
+    }
+
+    public Object performReadThreads(String readStatement, Supplier<Object[]> supplier) throws Throwable
+    {
+        return IntStream.range(0, BATCH)
+                        .parallel()
+                        .mapToLong(i ->
+                                   {
+                                       try
+                                       {
+                                           return execute(readStatement, supplier.get()).size();
+                                       }
+                                       catch (Throwable throwable)
+                                       {
+                                           throw Throwables.propagate(throwable);
+                                       }
+                                   })
+                        .sum();
+    }
+
+    public Object performReadSerialNet(String readStatement, Supplier<Object[]> supplier) throws Throwable
+    {
+        long sum = 0;
+        for (int i = 0; i < BATCH; ++i)
+            sum += executeNet(getDefaultVersion(), readStatement, supplier.get())
+                           .getAvailableWithoutFetching();
+        return sum;
+    }
+
+    public long performReadThreadsNet(String readStatement, Supplier<Object[]> supplier) throws Throwable
+    {
+        return IntStream.range(0, BATCH)
+                        .parallel()
+                        .mapToLong(i ->
+                                   {
+                                       try
+                                       {
+                                           return executeNet(getDefaultVersion(), readStatement, supplier.get())
+                                                          .getAvailableWithoutFetching();
+                                       }
+                                       catch (Throwable throwable)
+                                       {
+                                           throw Throwables.propagate(throwable);
+                                       }
+                                   })
+                        .sum();
+    }
+
+
+    public Object performRead(String readStatement, Supplier<Object[]> supplier) throws Throwable
+    {
+        switch (async)
+        {
+            case SERIAL:
+                return performReadSerial(readStatement, supplier);
+            case SERIAL_NET:
+                return performReadSerialNet(readStatement, supplier);
+            case PARALLEL:
+                return performReadThreads(readStatement, supplier);
+            case PARALLEL_NET:
+                return performReadThreadsNet(readStatement, supplier);
+        }
+        return null;
+    }
+}
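
Note that the PARALLEL modes fan each batch out with IntStream.parallel(), so the read concurrency is whatever the common ForkJoinPool provides rather than a fixed thread count.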
diff --git a/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestSmallPartitions.java b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestSmallPartitions.java
new file mode 100644
index 0000000..b36cfd1
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestSmallPartitions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench.instance;
+
+
+import org.openjdk.jmh.annotations.Benchmark;
+
+public class ReadTestSmallPartitions extends ReadTest
+{
+    String readStatement()
+    {
+        return "SELECT * from "+table+" where userid=?";
+    }
+
+    @Override
+    public Object[] writeArguments(long i)
+    {
+        return new Object[] { i, i, i };
+    }
+
+    @Benchmark
+    public Object readRandomInside() throws Throwable
+    {
+        return performRead(readStatement(), () -> new Object[] { (long) rand.nextInt(count) });
+    }
+
+    @Benchmark
+    public Object readRandomWOutside() throws Throwable
+    {
+        return performRead(readStatement(), () -> new Object[] { (long) rand.nextInt(count + count / 6) });
+    }
+
+    @Benchmark
+    public Object readFixed() throws Throwable
+    {
+        return performRead(readStatement(), () -> new Object[] { 1234567890123L % count });
+    }
+
+    @Benchmark
+    public Object readOutside() throws Throwable
+    {
+        return performRead(readStatement(), () -> new Object[] { count + 1234567L });
+    }
+}
diff --git a/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestWidePartitions.java b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestWidePartitions.java
new file mode 100644
index 0000000..c36e09f
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestWidePartitions.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench.instance;
+
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+
+public class ReadTestWidePartitions extends ReadTest
+{
+    @Param({"1000", "4"}) // wide and very wide partitions
+    int partitions = 4;
+
+    @Override
+    public Object[] writeArguments(long i)
+    {
+        return new Object[] { i % partitions, i, i };
+    }
+
+    Object[] readArguments(long i, long offset)
+    {
+        return new Object[] { (i + offset) % partitions, i };
+    }
+
+    @Benchmark
+    public Object readRandomInside() throws Throwable
+    {
+        return performRead("SELECT * from " + table + " where userid=? and picid=?",
+                           () -> readArguments(rand.nextInt(count),0));
+    }
+
+    @Benchmark
+    public Object readRandomWOutside() throws Throwable
+    {
+        return performRead("SELECT * from " + table + " where userid=? and picid=?",
+                           () -> readArguments(rand.nextInt(count), rand.nextInt(6) == 1 ? 1 : 0));
+    }
+
+    @Benchmark
+    public Object readFixed() throws Throwable
+    {
+        return performRead("SELECT * from " + table + " where userid=? and picid=?",
+                           () -> readArguments(1234567890123L % count, 0));
+    }
+
+    @Benchmark
+    public Object readOutside() throws Throwable
+    {
+        return performRead("SELECT * from " + table + " where userid=? and picid=?",
+                           () -> readArguments(1234567890123L % count, 1));
+    }
+
+    @Benchmark
+    public Object readGreaterMatch() throws Throwable
+    {
+        return performRead("SELECT * from "+table+" where userid=? and picid>? limit 1",
+                           () -> readArguments(rand.nextInt(count), 0));
+    }
+
+    @Benchmark
+    public Object readReversedMatch() throws Throwable
+    {
+        return performRead("SELECT * from "+table+" where userid=? and picid<? order by picid desc limit 1",
+                           () -> readArguments(rand.nextInt(count), 0));
+    }
+
+    @Benchmark
+    public Object readGreater() throws Throwable
+    {
+        return performRead("SELECT * from "+table+" where userid=? and picid>? limit 1",
+                           () -> readArguments(rand.nextInt(count), 1));
+    }
+
+    @Benchmark
+    public Object readReversed() throws Throwable
+    {
+        return performRead("SELECT * from "+table+" where userid=? and picid<? order by picid desc limit 1",
+                           () -> readArguments(rand.nextInt(count), -1));
+    }
+}
diff --git a/test/unit/org/apache/cassandra/ServerTestUtils.java b/test/unit/org/apache/cassandra/ServerTestUtils.java
index 5794dcb..221a23a 100644
--- a/test/unit/org/apache/cassandra/ServerTestUtils.java
+++ b/test/unit/org/apache/cassandra/ServerTestUtils.java
@@ -149,7 +149,10 @@ public final class ServerTestUtils
     {
         // clean up commitlog
         cleanupDirectory(DatabaseDescriptor.getCommitLogLocation());
-        cleanupDirectory(DatabaseDescriptor.getCDCLogLocation());
+
+        String cdcDir = DatabaseDescriptor.getCDCLogLocation();
+        if (cdcDir != null)
+            cleanupDirectory(cdcDir);
         cleanupDirectory(DatabaseDescriptor.getHintsDirectory());
         cleanupSavedCaches();
 
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 625fbe1..beff1e4 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -632,9 +632,9 @@ public abstract class CQLTester
         this.usePrepared = USE_PREPARED_VALUES;
     }
 
-    protected void disablePreparedReuseForTest()
+    public static void disablePreparedReuseForTest()
     {
-        this.reusePrepared = false;
+        reusePrepared = false;
     }
 
     protected String createType(String query)
diff --git a/test/unit/org/apache/cassandra/cql3/MemtableSizeTest.java b/test/unit/org/apache/cassandra/cql3/MemtableSizeTest.java
new file mode 100644
index 0000000..dbe0498
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/MemtableSizeTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
+
+public class MemtableSizeTest extends CQLTester
+{
+    static String keyspace;
+    String table;
+    ColumnFamilyStore cfs;
+
+    int partitions = 50_000;
+    int rowsPerPartition = 4;
+
+    int deletedPartitions = 10_000;
+    int deletedRows = 5_000;
+
+    // must be within 50 bytes per partition or tombstone of the actual size
+    final int MAX_DIFFERENCE = (partitions + deletedPartitions + deletedRows) * 50;
+
+    @BeforeClass
+    public static void setUp()
+    {
+        CQLTester.setUpClass();
+        CQLTester.prepareServer();
+        CQLTester.disablePreparedReuseForTest();
+        System.err.println("setupClass done.");
+    }
+
+    @Test
+    public void testSize() throws Throwable
+    {
+        keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false");
+        table = createTable(keyspace, "CREATE TABLE %s ( userid bigint, picid bigint, commentid bigint, PRIMARY KEY(userid, picid)) with compression = {'enabled': false}");
+        execute("use " + keyspace + ';');
+
+        String writeStatement = "INSERT INTO "+table+"(userid,picid,commentid)VALUES(?,?,?)";
+
+        cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+        cfs.disableAutoCompaction();
+        cfs.forceBlockingFlush();
+
+        long deepSizeBefore = ObjectSizes.measureDeep(cfs.getTracker().getView().getCurrentMemtable());
+        System.out.printf("Memtable deep size before %s\n%n",
+                          FBUtilities.prettyPrintMemory(deepSizeBefore));
+        long i;
+        long limit = partitions;
+        System.out.println("Writing " + partitions + " partitions of " + rowsPerPartition + " rows");
+        for (i = 0; i < limit; ++i)
+        {
+            for (long j = 0; j < rowsPerPartition; ++j)
+                execute(writeStatement, i, j, i + j);
+        }
+
+        System.out.println("Deleting " + deletedPartitions + " partitions");
+        limit += deletedPartitions;
+        for (; i < limit; ++i)
+        {
+            // no partition exists, but we will create a tombstone
+            execute("DELETE FROM " + table + " WHERE userid = ?", i);
+        }
+
+        System.out.println("Deleting " + deletedRows + " rows");
+        limit += deletedRows;
+        for (; i < limit; ++i)
+        {
+            // no row exists, but we will create a tombstone (and partition)
+            execute("DELETE FROM " + table + " WHERE userid = ? AND picid = ?", i, 0L);
+        }
+
+
+        if (!cfs.getLiveSSTables().isEmpty())
+            System.out.println("Warning: " + cfs.getLiveSSTables().size() + " sstables created.");
+
+        Memtable memtable = cfs.getTracker().getView().getCurrentMemtable();
+        long actualHeap = memtable.getAllocator().onHeap().owns();
+        System.out.printf("Memtable in %s mode: %d ops, %s serialized bytes, %s (%.0f%%) on heap, %s (%.0f%%) off-heap%n",
+                          DatabaseDescriptor.getMemtableAllocationType(),
+                          memtable.getOperations(),
+                          FBUtilities.prettyPrintMemory(memtable.getLiveDataSize()),
+                          FBUtilities.prettyPrintMemory(actualHeap),
+                                         100 * memtable.getAllocator().onHeap().ownershipRatio(),
+                          FBUtilities.prettyPrintMemory(memtable.getAllocator().offHeap().owns()),
+                                         100 * memtable.getAllocator().offHeap().ownershipRatio());
+
+        long deepSizeAfter = ObjectSizes.measureDeep(memtable);
+        System.out.printf("Memtable deep size %s\n%n",
+                          FBUtilities.prettyPrintMemory(deepSizeAfter));
+
+        long expectedHeap = deepSizeAfter - deepSizeBefore;
+        String message = String.format("Expected heap usage close to %s, got %s.\n",
+                                       FBUtilities.prettyPrintMemory(expectedHeap),
+                                       FBUtilities.prettyPrintMemory(actualHeap));
+        System.out.println(message);
+        Assert.assertTrue(message, Math.abs(actualHeap - expectedHeap) <= MAX_DIFFERENCE);
+    }
+
+}
\ No newline at end of file
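
The test's expected value is the growth of ObjectSizes.measureDeep(memtable) across the writes and deletions; the assertion then requires the allocator-tracked on-heap figure to agree with it to within MAX_DIFFERENCE, i.e. 50 bytes for each partition or tombstone written.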

