You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ed...@apache.org on 2021/01/19 19:16:48 UTC
[cassandra] branch trunk updated: Correct memtable on-heap size
calculations to match actual use (CASSANDRA-16318) Patch by Ekaterina
Dimitrova and Branimir Lambov;
reviewed by Benjamin Lerer and Branimir Lambov for CASSANDRA-16318
This is an automated email from the ASF dual-hosted git repository.
edimitrova pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 661f1aa Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318) Patch by Ekaterina Dimitrova and Branimir Lambov; reviewed by Benjamin Lerer and Branimir Lambov for CASSANDRA-16318
661f1aa is described below
commit 661f1aab171dc3ef16075f69581e88ad4a133fae
Author: Branimir Lambov <br...@datastax.com>
AuthorDate: Tue Dec 8 15:37:39 2020 +0200
Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318)
Patch by Ekaterina Dimitrova and Branimir Lambov; reviewed by Benjamin Lerer and Branimir Lambov for CASSANDRA-16318
---
CHANGES.txt | 3 +-
src/java/org/apache/cassandra/db/Columns.java | 10 +
src/java/org/apache/cassandra/db/DeletionTime.java | 3 +
src/java/org/apache/cassandra/db/LivenessInfo.java | 16 +-
src/java/org/apache/cassandra/db/Memtable.java | 33 ++-
.../apache/cassandra/db/MutableDeletionInfo.java | 3 +
.../cassandra/db/RegularAndStaticColumns.java | 12 +-
.../db/partitions/AbstractBTreePartition.java | 2 +
.../db/partitions/AtomicBTreePartition.java | 9 +-
.../org/apache/cassandra/db/rows/BTreeRow.java | 6 +-
.../apache/cassandra/db/rows/EncodingStats.java | 15 +-
src/java/org/apache/cassandra/db/rows/Row.java | 12 +-
test/bin/jmh | 137 ++++++++++++
test/conf/logback-jmh.xml | 101 +++++++++
.../test/microbench/instance/ReadTest.java | 233 +++++++++++++++++++++
.../instance/ReadTestSmallPartitions.java | 60 ++++++
.../instance/ReadTestWidePartitions.java | 96 +++++++++
.../unit/org/apache/cassandra/ServerTestUtils.java | 5 +-
test/unit/org/apache/cassandra/cql3/CQLTester.java | 4 +-
.../apache/cassandra/cql3/MemtableSizeTest.java | 124 +++++++++++
20 files changed, 858 insertions(+), 26 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 09b02d5..9a304b6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,5 @@
4.0-beta5
- * Fix client notifications in CQL protocol v5 (CASSANDRA-16353)
- * Too defensive check when picking sstables for preview repair (CASSANDRA-16284)
+ * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318)
* Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376)
* Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
* SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362)
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index aceb868..9e07375 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.BTreeSearchIterator;
@@ -52,6 +53,7 @@ public class Columns extends AbstractCollection<ColumnMetadata> implements Colle
{
public static final Serializer serializer = new Serializer();
public static final Columns NONE = new Columns(BTree.empty(), 0);
+ static final long EMPTY_SIZE = ObjectSizes.measure(NONE);
public static final ColumnMetadata FIRST_COMPLEX_STATIC =
new ColumnMetadata("",
@@ -406,6 +408,14 @@ public class Columns extends AbstractCollection<ColumnMetadata> implements Colle
return Objects.hash(complexIdx, BTree.hashCode(columns));
}
+ public long unsharedHeapSize()
+ {
+ if(this == NONE)
+ return 0;
+
+ return EMPTY_SIZE;
+ }
+
@Override
public String toString()
{
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index f1471fd..d8ac91d 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -157,6 +157,9 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
public long unsharedHeapSize()
{
+ if (this == LIVE)
+ return 0;
+
return EMPTY_SIZE;
}
diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java
index b1ea3f6..f3e6daa 100644
--- a/src/java/org/apache/cassandra/db/LivenessInfo.java
+++ b/src/java/org/apache/cassandra/db/LivenessInfo.java
@@ -19,8 +19,10 @@ package org.apache.cassandra.db;
import java.util.Objects;
+import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ObjectSizes;
/**
* Stores the information relating to the liveness of the primary key columns of a row.
@@ -35,7 +37,7 @@ import org.apache.cassandra.serializers.MarshalException;
* unaffected (of course, the rest of said row data might be ttl'ed on its own but this is
* separate).
*/
-public class LivenessInfo
+public class LivenessInfo implements IMeasurableMemory
{
public static final long NO_TIMESTAMP = Long.MIN_VALUE;
public static final int NO_TTL = Cell.NO_TTL;
@@ -49,6 +51,7 @@ public class LivenessInfo
public static final int NO_EXPIRATION_TIME = Cell.NO_DELETION_TIME;
public static final LivenessInfo EMPTY = new LivenessInfo(NO_TIMESTAMP);
+ private static final long UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY);
protected final long timestamp;
@@ -255,6 +258,11 @@ public class LivenessInfo
return Objects.hash(timestamp(), ttl(), localExpirationTime());
}
+ public long unsharedHeapSize()
+ {
+ return this == EMPTY ? 0 : UNSHARED_HEAP_SIZE;
+ }
+
/**
* Effectively acts as a PK tombstone. This is used for Materialized Views to shadow
* updated entries while co-existing with row tombstones.
@@ -294,6 +302,7 @@ public class LivenessInfo
{
private final int ttl;
private final int localExpirationTime;
+ private static final long UNSHARED_HEAP_SIZE = ObjectSizes.measure(new ExpiringLivenessInfo(-1, -1, -1));
private ExpiringLivenessInfo(long timestamp, int ttl, int localExpirationTime)
{
@@ -364,5 +373,10 @@ public class LivenessInfo
{
return String.format("[ts=%d ttl=%d, let=%d]", timestamp, ttl, localExpirationTime);
}
+
+ public long unsharedHeapSize()
+ {
+ return UNSHARED_HEAP_SIZE;
+ }
}
}
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index c969616..cdbe163 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -17,38 +17,52 @@
*/
package org.apache.cassandra.db;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.partitions.AbstractBTreePartition;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.AtomicBTreePartition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.IncludingExcludingBounds;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -529,6 +543,7 @@ public class Memtable implements Comparable<Memtable>
rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
rowOverhead -= ObjectSizes.measureDeep(new LongToken(0));
rowOverhead += AtomicBTreePartition.EMPTY_SIZE;
+ rowOverhead += AbstractBTreePartition.HOLDER_UNSHARED_HEAP_SIZE;
allocator.setDiscarding();
allocator.setDiscarded();
return rowOverhead;
diff --git a/src/java/org/apache/cassandra/db/MutableDeletionInfo.java b/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
index 8544b78..bfe4d4c 100644
--- a/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/MutableDeletionInfo.java
@@ -248,6 +248,9 @@ public class MutableDeletionInfo implements DeletionInfo
@Override
public long unsharedHeapSize()
{
+ if (this == LIVE)
+ return 0;
+
return EMPTY_SIZE + partitionDeletion.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
}
diff --git a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
index fab7730..1501345 100644
--- a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
+++ b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
@@ -22,6 +22,7 @@ import java.util.*;
import com.google.common.collect.Iterators;
import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.btree.BTreeSet;
import static java.util.Comparator.naturalOrder;
@@ -33,6 +34,7 @@ import static java.util.Comparator.naturalOrder;
public class RegularAndStaticColumns implements Iterable<ColumnMetadata>
{
public static RegularAndStaticColumns NONE = new RegularAndStaticColumns(Columns.NONE, Columns.NONE);
+ static final long EMPTY_SIZE = ObjectSizes.measure(NONE);
public final Columns statics;
public final Columns regulars;
@@ -105,11 +107,19 @@ public class RegularAndStaticColumns implements Iterable<ColumnMetadata>
return regulars.size() + statics.size();
}
+ public long unsharedHeapSize()
+ {
+ if(this == NONE)
+ return 0;
+
+ return EMPTY_SIZE + regulars.unsharedHeapSize() + statics.unsharedHeapSize();
+ }
+
@Override
public String toString()
{
StringBuilder sb = new StringBuilder();
- sb.append("[").append(statics).append(" | ").append(regulars).append("]");
+ sb.append('[').append(statics).append(" | ").append(regulars).append(']');
return sb.toString();
}
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index 44dc0b0..1d6603e 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTree;
@@ -36,6 +37,7 @@ import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
{
protected static final Holder EMPTY = new Holder(RegularAndStaticColumns.NONE, BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
+ public static final long HOLDER_UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY);
protected final DecoratedKey partitionKey;
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index ed635cd..801d9e2 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -135,12 +135,14 @@ public final class AtomicBTreePartition extends AbstractBTreePartition
}
RegularAndStaticColumns columns = update.columns().mergeTo(current.columns);
+ updater.allocated(columns.unsharedHeapSize() - current.columns.unsharedHeapSize());
Row newStatic = update.staticRow();
Row staticRow = newStatic.isEmpty()
? current.staticRow
: (current.staticRow.isEmpty() ? updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
Object[] tree = BTree.update(current.tree, update.metadata().comparator, update, update.rowCount(), updater);
EncodingStats newStats = current.stats.mergeWith(update.stats());
+ updater.allocated(newStats.unsharedHeapSize() - current.stats.unsharedHeapSize());
if (tree != null && refUpdater.compareAndSet(this, current, new Holder(columns, tree, deletionInfo, staticRow, newStats)))
{
@@ -274,8 +276,7 @@ public final class AtomicBTreePartition extends AbstractBTreePartition
if (!writeOp.isOldestLiveGroup())
{
Thread.yield();
- if (!writeOp.isOldestLiveGroup())
- return false;
+ return writeOp.isOldestLiveGroup();
}
return true;
@@ -370,7 +371,7 @@ public final class AtomicBTreePartition extends AbstractBTreePartition
indexer.onInserted(insert);
this.dataSize += data.dataSize();
- this.heapSize += data.unsharedHeapSizeExcludingData();
+ allocated(data.unsharedHeapSizeExcludingData());
if (inserted == null)
inserted = new ArrayList<>();
inserted.add(data);
@@ -387,7 +388,7 @@ public final class AtomicBTreePartition extends AbstractBTreePartition
indexer.onUpdated(existing, reconciled);
dataSize += reconciled.dataSize() - existing.dataSize();
- heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData();
+ allocated(reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData());
if (inserted == null)
inserted = new ArrayList<>();
inserted.add(reconciled);
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index e8476dd..b971307 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -494,8 +494,10 @@ public class BTreeRow extends AbstractRow
public long unsharedHeapSizeExcludingData()
{
long heapSize = EMPTY_SIZE
- + clustering.unsharedHeapSizeExcludingData()
- + BTree.sizeOfStructureOnHeap(btree);
+ + clustering.unsharedHeapSizeExcludingData()
+ + primaryKeyLivenessInfo.unsharedHeapSize()
+ + deletion.unsharedHeapSize()
+ + BTree.sizeOfStructureOnHeap(btree);
return accumulate((cd, v) -> v + cd.unsharedHeapSizeExcludingData(), heapSize);
}
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index f2fb340..37dd34e 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -21,12 +21,12 @@ import java.io.IOException;
import java.util.*;
import java.util.function.Function;
-import com.google.common.collect.Iterables;
-
+import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ObjectSizes;
/**
* Stats used for the encoding of the rows and tombstones of a given source.
@@ -40,13 +40,14 @@ import org.apache.cassandra.io.util.DataOutputPlus;
* this shouldn't have too huge an impact on performance) and in fact they will not always be
* accurate for reasons explained in {@link SerializationHeader#make}.
*/
-public class EncodingStats
+public class EncodingStats implements IMeasurableMemory
{
// Default values for the timestamp, deletion time and ttl. We use this both for NO_STATS, but also to serialize
// an EncodingStats. Basically, we encode the diff of each value of to these epoch, which give values with better vint encoding.
public static final long TIMESTAMP_EPOCH;
private static final int DELETION_TIME_EPOCH;
private static final int TTL_EPOCH = 0;
+
static
{
// We want a fixed epoch, but that provide small values when substracted from our timestamp and deletion time.
@@ -66,6 +67,7 @@ public class EncodingStats
// We should use this sparingly obviously
public static final EncodingStats NO_STATS = new EncodingStats(TIMESTAMP_EPOCH, DELETION_TIME_EPOCH, TTL_EPOCH);
+ public static long HEAP_SIZE = ObjectSizes.measure(NO_STATS);
public static final Serializer serializer = new Serializer();
@@ -152,6 +154,13 @@ public class EncodingStats
return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL);
}
+ public long unsharedHeapSize()
+ {
+ if (this == NO_STATS)
+ return 0;
+ return HEAP_SIZE;
+ }
+
@Override
public String toString()
{
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index ec43783..5c28cd1 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.utils.BiLongAccumulator;
import org.apache.cassandra.utils.LongAccumulator;
import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.UpdateFunction;
@@ -323,6 +324,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
public static class Deletion
{
public static final Deletion LIVE = new Deletion(DeletionTime.LIVE, false);
+ private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0));
private final DeletionTime time;
private final boolean isShadowable;
@@ -422,6 +424,14 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
return this.time.equals(that.time) && this.isShadowable == that.isShadowable;
}
+ public long unsharedHeapSize()
+ {
+ if(this == LIVE)
+ return 0;
+
+ return EMPTY_SIZE + time().unsharedHeapSize();
+ }
+
@Override
public final int hashCode()
{
@@ -722,7 +732,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
// Because some data might have been shadowed by the 'activeDeletion', we could have an empty row
return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty()
? null
- : BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.<ColumnData>noOp()));
+ : BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.noOp()));
}
public Clustering<?> mergedClustering()
diff --git a/test/bin/jmh b/test/bin/jmh
new file mode 100755
index 0000000..c56145f
--- /dev/null
+++ b/test/bin/jmh
@@ -0,0 +1,137 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+jvmoptions_variant="-server"
+export CASSANDRA_HOME=`dirname "$0"`/../../
+. $CASSANDRA_HOME/bin/cassandra.in.sh
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -n "$JAVA_HOME" ]; then
+ # Why we can't have nice things: Solaris combines x86 and x86_64
+ # installations in the same tree, using an unconventional path for the
+ # 64bit JVM. Since we prefer 64bit, search the alternate path first,
+ # (see https://issues.apache.org/jira/browse/CASSANDRA-4638).
+ for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
+ if [ -x "$java" ]; then
+ JAVA="$java"
+ break
+ fi
+ done
+else
+ JAVA=java
+fi
+
+if [ -z $JAVA ] ; then
+ echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. >&2
+ exit 1;
+fi
+
+# If numactl is available, use it. For Cassandra, the priority is to
+# avoid disk I/O. Even for the purpose of CPU efficiency, we don't
+# really have CPU<->data affinity anyway. Also, empirically test that numactl
+# works before trying to use it (CASSANDRA-3245).
+NUMACTL_ARGS=${NUMACTL_ARGS:-"--localalloc"}
+if which numactl >/dev/null 2>/dev/null && numactl $NUMACTL_ARGS ls / >/dev/null 2>/dev/null
+then
+ NUMACTL="numactl $NUMACTL_ARGS"
+else
+ NUMACTL=""
+fi
+
+if [ -z "$CASSANDRA_CONF" -o -z "$CLASSPATH" ]; then
+ echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2
+ exit 1
+fi
+
+if [ -f "$CASSANDRA_CONF/cassandra-env.sh" ]; then
+ . "$CASSANDRA_CONF/cassandra-env.sh"
+fi
+
+# Special-case path variables.
+case "`uname`" in
+ CYGWIN*)
+ CLASSPATH=`cygpath -p -w "$CLASSPATH"`
+ CASSANDRA_CONF=`cygpath -p -w "$CASSANDRA_CONF"`
+ ;;
+esac
+
+# Cassandra uses an installed jemalloc via LD_PRELOAD / DYLD_INSERT_LIBRARIES by default to improve off-heap
+# memory allocation performance. The following code searches for an installed libjemalloc.dylib/.so/.1.so using
+# Linux and OS-X specific approaches.
+# To specify your own libjemalloc in a different path, configure the fully qualified path in CASSANDRA_LIBJEMALLOC.
+# To disable jemalloc preload at all, set CASSANDRA_LIBJEMALLOC=-
+#
+#CASSANDRA_LIBJEMALLOC=
+#
+find_library()
+{
+ pattern=$1
+ path=$(echo ${2} | tr ":" " ")
+
+ find $path -regex "$pattern" -print 2>/dev/null | head -n 1
+}
+case "`uname -s`" in
+ Linux)
+ if [ -z $CASSANDRA_LIBJEMALLOC ] ; then
+ which ldconfig > /dev/null 2>&1
+ if [ $? = 0 ] ; then
+ # e.g. for CentOS
+ dirs="/lib64 /lib /usr/lib64 /usr/lib `ldconfig -v 2>/dev/null | grep -v '^\s' | sed 's/^\([^:]*\):.*$/\1/'`"
+ else
+ # e.g. for Debian, OpenSUSE
+ dirs="/lib64 /lib /usr/lib64 /usr/lib `cat /etc/ld.so.conf /etc/ld.so.conf.d/*.conf | grep '^/'`"
+ fi
+ dirs=`echo $dirs | tr " " ":"`
+ CASSANDRA_LIBJEMALLOC=$(find_library '.*/libjemalloc\.so\(\.1\)*' $dirs)
+ fi
+ if [ ! -z $CASSANDRA_LIBJEMALLOC ] ; then
+ export JVM_OPTS="$JVM_OPTS -Dcassandra.libjemalloc=$CASSANDRA_LIBJEMALLOC"
+ if [ "-" != "$CASSANDRA_LIBJEMALLOC" ] ; then
+ export LD_PRELOAD=$CASSANDRA_LIBJEMALLOC
+ fi
+ fi
+ ;;
+ Darwin)
+ if [ -z $CASSANDRA_LIBJEMALLOC ] ; then
+ CASSANDRA_LIBJEMALLOC=$(find_library '.*/libjemalloc\.dylib' $DYLD_LIBRARY_PATH:${DYLD_FALLBACK_LIBRARY_PATH-$HOME/lib:/usr/local/lib:/lib:/usr/lib})
+ fi
+ if [ ! -z $CASSANDRA_LIBJEMALLOC ] ; then
+ export JVM_OPTS="$JVM_OPTS -Dcassandra.libjemalloc=$CASSANDRA_LIBJEMALLOC"
+ if [ "-" != "$CASSANDRA_LIBJEMALLOC" ] ; then
+ export DYLD_INSERT_LIBRARIES=$CASSANDRA_LIBJEMALLOC
+ fi
+ fi
+ ;;
+esac
+
+cassandra_parms="-Dlogback.configurationFile=$CASSANDRA_HOME/test/conf/logback-jmh.xml"
+cassandra_parms="$cassandra_parms -Dcassandra.logdir=$CASSANDRA_HOME/logs"
+cassandra_parms="$cassandra_parms -Dcassandra.storagedir=$cassandra_storagedir"
+cassandra_parms="$cassandra_parms -Dcassandra-foreground=yes"
+cassandra_parms="$cassandra_parms -XX:+PreserveFramePointer"
+
+# Create log directory, some tests require that
+mkdir -p $CASSANDRA_HOME/logs
+
+if [ ! -f $CASSANDRA_HOME/build/test/benchmarks.jar ] ; then
+ echo "$CASSANDRA_HOME/build/test/benchmarks.jar does not exist - execute 'ant build-jmh'"
+ exit 1
+fi
+
+exec $NUMACTL "$JAVA" -cp "$CLASSPATH:$CASSANDRA_HOME/build/test/benchmarks.jar:$CASSANDRA_HOME/build/test/deps.jar" org.openjdk.jmh.Main -jvmArgs="$cassandra_parms $JVM_OPTS" $@
+
+# vi:ai sw=4 ts=4 tw=0 et
diff --git a/test/conf/logback-jmh.xml b/test/conf/logback-jmh.xml
new file mode 100644
index 0000000..4138f19
--- /dev/null
+++ b/test/conf/logback-jmh.xml
@@ -0,0 +1,101 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!--
+In order to disable debug.log, comment-out the ASYNCDEBUGLOG
+appender reference in the root level section below.
+-->
+
+<configuration scan="false" scanPeriod="60 seconds">
+ <jmxConfigurator />
+ <!-- No shutdown hook; we run it ourselves in StorageService after shutdown -->
+
+ <!-- SYSTEMLOG rolling file appender to system.log (INFO level) -->
+
+ <appender name="SYSTEMLOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ <file>${cassandra.logdir}/system.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <!-- rollover daily -->
+ <fileNamePattern>${cassandra.logdir}/system.log.%d{yyyy-MM-dd}.%i.zip</fileNamePattern>
+ <!-- each file should be at most 50MB, keep 7 days worth of history, but at most 5GB -->
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>7</maxHistory>
+ <totalSizeCap>5GB</totalSizeCap>
+ </rollingPolicy>
+ <encoder>
+ <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- DEBUGLOG rolling file appender to debug.log (all levels) -->
+
+ <appender name="DEBUGLOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${cassandra.logdir}/debug.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <!-- rollover daily -->
+ <fileNamePattern>${cassandra.logdir}/debug.log.%d{yyyy-MM-dd}.%i.zip</fileNamePattern>
+ <!-- each file should be at most 50MB, keep 7 days worth of history, but at most 5GB -->
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>7</maxHistory>
+ <totalSizeCap>5GB</totalSizeCap>
+ </rollingPolicy>
+ <encoder>
+ <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <!-- ASYNCDEBUGLOG asynchronous appender to debug.log (all levels) -->
+ <appender name="ASYNCDEBUGLOG" class="ch.qos.logback.classic.AsyncAppender">
+ <queueSize>1024</queueSize>
+ <discardingThreshold>0</discardingThreshold>
+ <includeCallerData>true</includeCallerData>
+ <appender-ref ref="DEBUGLOG" />
+ </appender>
+
+ <!-- NOTE: For JMH we disable the STDOUT logging to avoid noise in output -->
+ <!-- STDOUT console appender to stdout (INFO level)
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ <encoder>
+ <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+ </encoder>
+ </appender-->
+
+ <!-- Uncomment below and the corresponding appender-ref to activate logback metrics
+ <appender name="LogbackMetrics" class="com.codahale.metrics.logback.InstrumentedAppender" />
+ -->
+
+ <root level="INFO">
+ <appender-ref ref="SYSTEMLOG" />
+ <!-- appender-ref ref="STDOUT" /-->
+ <appender-ref ref="ASYNCDEBUGLOG" /> <!-- Comment this line to disable debug.log -->
+ <!--
+ <appender-ref ref="LogbackMetrics" />
+ -->
+ </root>
+
+ <logger name="org.apache.cassandra" level="DEBUG"/>
+ <logger name="com.datastax.bdp.db" level="DEBUG"/>
+</configuration>
diff --git a/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTest.java b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTest.java
new file mode 100644
index 0000000..789ca00
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench.instance;
+
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.utils.FBUtilities;
+import org.openjdk.jmh.annotations.*;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Warmup(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 15, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1)
+@Threads(1)
+@State(Scope.Benchmark)
+public abstract class ReadTest extends CQLTester
+{
+ static String keyspace;
+ String table;
+ ColumnFamilyStore cfs;
+ Random rand;
+
+ @Param({"1000"})
+ int BATCH = 1_000;
+
+ public enum Flush
+ {
+ INMEM, NO, YES
+ }
+
+ @Param({"1000000"})
+ int count = 1_000_000;
+
+ @Param({"INMEM", "YES"})
+ Flush flush = Flush.INMEM;
+
+ public enum Execution
+ {
+ SERIAL,
+ SERIAL_NET,
+ PARALLEL,
+ PARALLEL_NET,
+ }
+
+ @Param({"PARALLEL"})
+ Execution async = Execution.PARALLEL;
+
+ @Setup(Level.Trial)
+ public void setup() throws Throwable
+ {
+ rand = new Random(1);
+ CQLTester.setUpClass();
+ CQLTester.prepareServer();
+ System.err.println("setupClass done.");
+ keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false");
+ table = createTable(keyspace, "CREATE TABLE %s ( userid bigint, picid bigint, commentid bigint, PRIMARY KEY(userid, picid)) with compression = {'enabled': false}");
+ execute("use "+keyspace+";");
+ switch (async)
+ {
+ case SERIAL_NET:
+ case PARALLEL_NET:
+ CQLTester.requireNetwork();
+ executeNet(getDefaultVersion(), "use " + keyspace + ";");
+ }
+ String writeStatement = "INSERT INTO "+table+"(userid,picid,commentid)VALUES(?,?,?)";
+ System.err.println("Prepared, batch " + BATCH + " flush " + flush);
+ System.err.println("Disk access mode " + DatabaseDescriptor.getDiskAccessMode() + " index " + DatabaseDescriptor.getIndexAccessMode());
+
+ cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+ cfs.disableAutoCompaction();
+ cfs.forceBlockingFlush();
+
+ //Warm up
+ System.err.println("Writing " + count);
+ long i;
+ for (i = 0; i <= count - BATCH; i += BATCH)
+ performWrite(writeStatement, i, BATCH);
+ if (i < count)
+ performWrite(writeStatement, i, count - i);
+
+ Memtable memtable = cfs.getTracker().getView().getCurrentMemtable();
+ System.err.format("Memtable in %s mode: %d ops, %s serialized bytes, %s (%.0f%%) on heap, %s (%.0f%%) off-heap\n",
+ DatabaseDescriptor.getMemtableAllocationType(),
+ memtable.getOperations(),
+ FBUtilities.prettyPrintMemory(memtable.getLiveDataSize()),
+ FBUtilities.prettyPrintMemory(memtable.getAllocator().onHeap().owns()),
+ 100 * memtable.getAllocator().onHeap().ownershipRatio(),
+ FBUtilities.prettyPrintMemory(memtable.getAllocator().offHeap().owns()),
+ 100 * memtable.getAllocator().offHeap().ownershipRatio());
+
+ switch (flush)
+ {
+ case YES:
+ cfs.forceBlockingFlush();
+ break;
+ case INMEM:
+ if (!cfs.getLiveSSTables().isEmpty())
+ throw new AssertionError("SSTables created for INMEM test.");
+ default:
+ // don't flush
+ }
+
+ // Needed to stabilize sstable count for off-cache sized tests (e.g. count = 100_000_000)
+ while (cfs.getLiveSSTables().size() >= 15)
+ {
+ cfs.enableAutoCompaction(true);
+ cfs.disableAutoCompaction();
+ }
+ }
+
+ abstract Object[] writeArguments(long i);
+
+ public void performWrite(String writeStatement, long ofs, long count) throws Throwable
+ {
+ for (long i = ofs; i < ofs + count; ++i)
+ execute(writeStatement, writeArguments(i));
+ }
+
+
+ @TearDown(Level.Trial)
+ public void teardown() throws InterruptedException
+ {
+ if (flush == Flush.INMEM && !cfs.getLiveSSTables().isEmpty())
+ throw new AssertionError("SSTables created for INMEM test.");
+
+ // do a flush to print sizes
+ cfs.forceBlockingFlush();
+
+ CommitLog.instance.shutdownBlocking();
+ CQLTester.tearDownClass();
+ CQLTester.cleanup();
+ }
+
+ public Object performReadSerial(String readStatement, Supplier<Object[]> supplier) throws Throwable
+ {
+ long sum = 0;
+ for (int i = 0; i < BATCH; ++i)
+ sum += execute(readStatement, supplier.get()).size();
+ return sum;
+ }
+
+ public Object performReadThreads(String readStatement, Supplier<Object[]> supplier) throws Throwable
+ {
+ return IntStream.range(0, BATCH)
+ .parallel()
+ .mapToLong(i ->
+ {
+ try
+ {
+ return execute(readStatement, supplier.get()).size();
+ }
+ catch (Throwable throwable)
+ {
+ throw Throwables.propagate(throwable);
+ }
+ })
+ .sum();
+ }
+
+ public Object performReadSerialNet(String readStatement, Supplier<Object[]> supplier) throws Throwable
+ {
+ long sum = 0;
+ for (int i = 0; i < BATCH; ++i)
+ sum += executeNet(getDefaultVersion(), readStatement, supplier.get())
+ .getAvailableWithoutFetching();
+ return sum;
+ }
+
+ public long performReadThreadsNet(String readStatement, Supplier<Object[]> supplier) throws Throwable
+ {
+ return IntStream.range(0, BATCH)
+ .parallel()
+ .mapToLong(i ->
+ {
+ try
+ {
+ return executeNet(getDefaultVersion(), readStatement, supplier.get())
+ .getAvailableWithoutFetching();
+ }
+ catch (Throwable throwable)
+ {
+ throw Throwables.propagate(throwable);
+ }
+ })
+ .sum();
+ }
+
+
+ public Object performRead(String readStatement, Supplier<Object[]> supplier) throws Throwable
+ {
+ switch (async)
+ {
+ case SERIAL:
+ return performReadSerial(readStatement, supplier);
+ case SERIAL_NET:
+ return performReadSerialNet(readStatement, supplier);
+ case PARALLEL:
+ return performReadThreads(readStatement, supplier);
+ case PARALLEL_NET:
+ return performReadThreadsNet(readStatement, supplier);
+ }
+ return null;
+ }
+}
diff --git a/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestSmallPartitions.java b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestSmallPartitions.java
new file mode 100644
index 0000000..b36cfd1
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestSmallPartitions.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench.instance;
+
+
+import org.openjdk.jmh.annotations.Benchmark;
+
+/**
+ * Read benchmarks against a table where every partition holds exactly one row: row {@code i} is written with
+ * userid = picid = commentid = {@code i}.
+ */
+public class ReadTestSmallPartitions extends ReadTest
+{
+    /** Point lookup of a whole partition by its key. */
+    String readStatement()
+    {
+        return "SELECT * from " + table + " where userid=?";
+    }
+
+    @Override
+    public Object[] writeArguments(long i)
+    {
+        final Object[] values = { i, i, i };
+        return values;
+    }
+
+    /** Bind values for {@link #readStatement()} selecting partition {@code userId}. */
+    private static Object[] partitionKey(long userId)
+    {
+        return new Object[] { userId };
+    }
+
+    /** Random lookups that always hit an existing partition. */
+    @Benchmark
+    public Object readRandomInside() throws Throwable
+    {
+        return performRead(readStatement(), () -> partitionKey(rand.nextInt(count)));
+    }
+
+    /** Random lookups over a key range count/6 wider than the data, so part of them miss. */
+    @Benchmark
+    public Object readRandomWOutside() throws Throwable
+    {
+        return performRead(readStatement(), () -> partitionKey(rand.nextInt(count + count / 6)));
+    }
+
+    /** Repeated lookups of one fixed, existing partition. */
+    @Benchmark
+    public Object readFixed() throws Throwable
+    {
+        return performRead(readStatement(), () -> partitionKey(1234567890123L % count));
+    }
+
+    /** Repeated lookups of one key beyond the written range. */
+    @Benchmark
+    public Object readOutside() throws Throwable
+    {
+        return performRead(readStatement(), () -> partitionKey(count + 1234567L));
+    }
+}
diff --git a/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestWidePartitions.java b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestWidePartitions.java
new file mode 100644
index 0000000..c36e09f
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/instance/ReadTestWidePartitions.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench.instance;
+
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+
+public class ReadTestWidePartitions extends ReadTest
+{
+ @Param({"1000", "4"}) // wide and very wide partitions
+ int partitions = 4;
+
+ @Override
+ public Object[] writeArguments(long i)
+ {
+ return new Object[] { i % partitions, i, i };
+ }
+
+ Object[] readArguments(long i, long offset)
+ {
+ return new Object[] { (i + offset) % partitions, i };
+ }
+
+ @Benchmark
+ public Object readRandomInside() throws Throwable
+ {
+ return performRead("SELECT * from " + table + " where userid=? and picid=?",
+ () -> readArguments(rand.nextInt(count),0));
+ }
+
+ @Benchmark
+ public Object readRandomWOutside() throws Throwable
+ {
+ return performRead("SELECT * from " + table + " where userid=? and picid=?",
+ () -> readArguments(rand.nextInt(count), rand.nextInt(6) == 1 ? 1 : 0));
+ }
+
+ @Benchmark
+ public Object readFixed() throws Throwable
+ {
+ return performRead("SELECT * from " + table + " where userid=? and picid=?",
+ () -> readArguments(1234567890123L % count, 0));
+ }
+
+ @Benchmark
+ public Object readOutside() throws Throwable
+ {
+ return performRead("SELECT * from " + table + " where userid=? and picid=?",
+ () -> readArguments(1234567890123L % count, 1));
+ }
+
+ @Benchmark
+ public Object readGreaterMatch() throws Throwable
+ {
+ return performRead("SELECT * from "+table+" where userid=? and picid>? limit 1",
+ () -> readArguments(rand.nextInt(count), 0));
+ }
+
+ @Benchmark
+ public Object readReversedMatch() throws Throwable
+ {
+ return performRead("SELECT * from "+table+" where userid=? and picid<? order by picid desc limit 1",
+ () -> readArguments(rand.nextInt(count), 0));
+ }
+
+ @Benchmark
+ public Object readGreater() throws Throwable
+ {
+ return performRead("SELECT * from "+table+" where userid=? and picid>? limit 1",
+ () -> readArguments(rand.nextInt(count), 1));
+ }
+
+ @Benchmark
+ public Object readReversed() throws Throwable
+ {
+ return performRead("SELECT * from "+table+" where userid=? and picid<? order by picid desc limit 1",
+ () -> readArguments(rand.nextInt(count), -1));
+ }
+}
diff --git a/test/unit/org/apache/cassandra/ServerTestUtils.java b/test/unit/org/apache/cassandra/ServerTestUtils.java
index 5794dcb..221a23a 100644
--- a/test/unit/org/apache/cassandra/ServerTestUtils.java
+++ b/test/unit/org/apache/cassandra/ServerTestUtils.java
@@ -149,7 +149,10 @@ public final class ServerTestUtils
{
// clean up commitlog
cleanupDirectory(DatabaseDescriptor.getCommitLogLocation());
- cleanupDirectory(DatabaseDescriptor.getCDCLogLocation());
+
+ String cdcDir = DatabaseDescriptor.getCDCLogLocation();
+ if (cdcDir != null)
+ cleanupDirectory(cdcDir);
cleanupDirectory(DatabaseDescriptor.getHintsDirectory());
cleanupSavedCaches();
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 625fbe1..beff1e4 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -632,9 +632,9 @@ public abstract class CQLTester
this.usePrepared = USE_PREPARED_VALUES;
}
- protected void disablePreparedReuseForTest()
+ public static void disablePreparedReuseForTest()
{
- this.reusePrepared = false;
+ reusePrepared = false;
}
protected String createType(String query)
diff --git a/test/unit/org/apache/cassandra/cql3/MemtableSizeTest.java b/test/unit/org/apache/cassandra/cql3/MemtableSizeTest.java
new file mode 100644
index 0000000..dbe0498
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/MemtableSizeTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * Checks that the memtable allocator's on-heap accounting stays close to the memtable's measured deep size after
+ * writing live rows, partition deletions and row deletions.
+ */
+public class MemtableSizeTest extends CQLTester
+{
+    static String keyspace;
+    String table;
+    ColumnFamilyStore cfs;
+
+    int partitions = 50_000;
+    int rowsPerPartition = 4;
+
+    int deletedPartitions = 10_000;
+    int deletedRows = 5_000;
+
+    // Allowed error: 50 bytes for each partition written below (live partitions, partition tombstones, and the
+    // partitions created by row tombstones).
+    final int MAX_DIFFERENCE = (partitions + deletedPartitions + deletedRows) * 50;
+
+    @BeforeClass
+    public static void setUp()
+    {
+        CQLTester.setUpClass();
+        CQLTester.prepareServer();
+        CQLTester.disablePreparedReuseForTest();
+        System.err.println("setupClass done.");
+    }
+
+    @Test
+    public void testSize() throws Throwable
+    {
+        keyspace = createKeyspace("CREATE KEYSPACE %s with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } and durable_writes = false");
+        table = createTable(keyspace, "CREATE TABLE %s ( userid bigint, picid bigint, commentid bigint, PRIMARY KEY(userid, picid)) with compression = {'enabled': false}");
+        execute("use " + keyspace + ';');
+
+        String writeStatement = "INSERT INTO "+table+"(userid,picid,commentid)VALUES(?,?,?)";
+
+        cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+        cfs.disableAutoCompaction();
+        cfs.forceBlockingFlush();
+
+        // Baseline: deep size of the freshly flushed (empty) memtable, subtracted from the final measurement.
+        long deepSizeBefore = ObjectSizes.measureDeep(cfs.getTracker().getView().getCurrentMemtable());
+        System.out.printf("Memtable deep size before %s%n",
+                          FBUtilities.prettyPrintMemory(deepSizeBefore));
+        long i;
+        long limit = partitions;
+        System.out.println("Writing " + partitions + " partitions of " + rowsPerPartition + " rows");
+        for (i = 0; i < limit; ++i)
+        {
+            for (long j = 0; j < rowsPerPartition; ++j)
+                execute(writeStatement, i, j, i + j);
+        }
+
+        System.out.println("Deleting " + deletedPartitions + " partitions");
+        limit += deletedPartitions;
+        for (; i < limit; ++i)
+        {
+            // no partition exists, but we will create a tombstone
+            execute("DELETE FROM " + table + " WHERE userid = ?", i);
+        }
+
+        System.out.println("Deleting " + deletedRows + " rows");
+        limit += deletedRows;
+        for (; i < limit; ++i)
+        {
+            // no row exists, but we will create a tombstone (and partition)
+            execute("DELETE FROM " + table + " WHERE userid = ? AND picid = ?", i, 0L);
+        }
+
+
+        if (!cfs.getLiveSSTables().isEmpty())
+            System.out.println("Warning: " + cfs.getLiveSSTables().size() + " sstables created.");
+
+        Memtable memtable = cfs.getTracker().getView().getCurrentMemtable();
+        long actualHeap = memtable.getAllocator().onHeap().owns();
+        System.out.printf("Memtable in %s mode: %d ops, %s serialized bytes, %s (%.0f%%) on heap, %s (%.0f%%) off-heap%n",
+                          DatabaseDescriptor.getMemtableAllocationType(),
+                          memtable.getOperations(),
+                          FBUtilities.prettyPrintMemory(memtable.getLiveDataSize()),
+                          FBUtilities.prettyPrintMemory(actualHeap),
+                          100 * memtable.getAllocator().onHeap().ownershipRatio(),
+                          FBUtilities.prettyPrintMemory(memtable.getAllocator().offHeap().owns()),
+                          100 * memtable.getAllocator().offHeap().ownershipRatio());
+
+        long deepSizeAfter = ObjectSizes.measureDeep(memtable);
+        System.out.printf("Memtable deep size %s%n",
+                          FBUtilities.prettyPrintMemory(deepSizeAfter));
+
+        long expectedHeap = deepSizeAfter - deepSizeBefore;
+        long difference = Math.abs(actualHeap - expectedHeap);
+        String message = String.format("Expected heap usage close to %s, got %s (difference %s, allowed %s).",
+                                       FBUtilities.prettyPrintMemory(expectedHeap),
+                                       FBUtilities.prettyPrintMemory(actualHeap),
+                                       FBUtilities.prettyPrintMemory(difference),
+                                       FBUtilities.prettyPrintMemory(MAX_DIFFERENCE));
+        System.out.println(message);
+        Assert.assertTrue(message, difference <= MAX_DIFFERENCE);
+    }
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org