You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/03/11 19:52:33 UTC
[cassandra] branch trunk updated: Add compaction allocation measurement test
This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c9e4027 Add compaction allocation measurement test
c9e4027 is described below
commit c9e40277f0389a773b21d4310a85a8c761932d61
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Tue Oct 15 13:04:26 2019 -0700
Add compaction allocation measurement test
Patch by Blake Eggleston, reviewed by Benedict Elliott Smith and David Capwell for CASSANDRA-15388
---
CHANGES.txt | 1 +
build.xml | 13 +
ide/idea-iml-file.xml | 1 +
.../db/compaction/CompactionAllocationTest.java | 767 +++++++++++++++++++++
4 files changed, 782 insertions(+)
diff --git a/CHANGES.txt b/CHANGES.txt
index d863df2..c6a2eac 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha4
+ * Add compaction allocation measurement test (CASSANDRA-15388)
* Added UnleveledSSTables global and table level metric (CASSANDRA-15620)
* Added Virtual Table exposing Cassandra relevant system properties (CASSANDRA-15616)
* Add data modeling introduction (CASSANDRA-15481)
diff --git a/build.xml b/build.xml
index 9c4f5b0..bd1f557 100644
--- a/build.xml
+++ b/build.xml
@@ -60,6 +60,7 @@
<property name="test.unit.src" value="${test.dir}/unit"/>
<property name="test.long.src" value="${test.dir}/long"/>
<property name="test.burn.src" value="${test.dir}/burn"/>
+ <property name="test.memory.src" value="${test.dir}/memory"/>
<property name="test.microbench.src" value="${test.dir}/microbench"/>
<property name="test.distributed.src" value="${test.dir}/distributed"/>
<property name="test.compression_algo" value="LZ4"/>
@@ -96,6 +97,7 @@
<property name="maven-repository-id" value="apache.snapshots.https"/>
<property name="test.timeout" value="240000" />
+ <property name="test.memory.timeout" value="480000" />
<property name="test.long.timeout" value="600000" />
<property name="test.burn.timeout" value="60000000" />
<property name="test.distributed.timeout" value="360000" />
@@ -118,6 +120,7 @@
<property name="ecj.version" value="4.6.1"/>
<property name="ohc.version" value="0.5.1"/>
<property name="asm.version" value="7.1"/>
+ <property name="allocation-instrumenter.version" value="3.1.0"/>
<!-- https://mvnrepository.com/artifact/net.openhft/chronicle-bom/1.16.23 -->
<property name="chronicle-queue.version" value="4.16.3" />
@@ -539,6 +542,7 @@
<dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/>
<dependency groupId="junit" artifactId="junit" version="4.12" />
<dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" />
+ <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
<dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
<exclusion groupId="commons-lang" artifactId="commons-lang"/>
</dependency>
@@ -672,6 +676,7 @@
version="${version}"/>
<dependency groupId="junit" artifactId="junit"/>
<dependency groupId="org.quicktheories" artifactId="quicktheories" />
+ <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
<dependency groupId="org.psjava" artifactId="psjava" version="0.1.19" />
<dependency groupId="org.apache.rat" artifactId="apache-rat"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
@@ -1324,6 +1329,7 @@
<src path="${test.unit.src}"/>
<src path="${test.long.src}"/>
<src path="${test.burn.src}"/>
+ <src path="${test.memory.src}"/>
<src path="${test.microbench.src}"/>
<src path="${test.distributed.src}"/>
</javac>
@@ -1620,6 +1626,13 @@
</testmacro>
</target>
+    <!-- Runs the tests under ${test.memory.src} with the allocation instrumenter attached as a java agent -->
+    <target name="test-memory" depends="build-test" description="Execute memory allocation tests">
+      <testmacro inputdir="${test.memory.src}"
+                 timeout="${test.memory.timeout}">
+        <jvmarg value="-javaagent:${build.dir}/lib/jars/java-allocation-instrumenter-${allocation-instrumenter.version}.jar"/>
+      </testmacro>
+    </target>
+
<target name="cql-test" depends="build-test" description="Execute CQL tests">
<sequential>
<echo message="running CQL tests"/>
diff --git a/ide/idea-iml-file.xml b/ide/idea-iml-file.xml
index 17d4c5b..86827c3 100644
--- a/ide/idea-iml-file.xml
+++ b/ide/idea-iml-file.xml
@@ -32,6 +32,7 @@
<sourceFolder url="file://$MODULE_DIR$/tools/fqltool/test/unit" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/unit" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/long" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/test/memory" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/microbench" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/burn" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/test/distributed" isTestSource="true" />
diff --git a/test/memory/org/apache/cassandra/db/compaction/CompactionAllocationTest.java b/test/memory/org/apache/cassandra/db/compaction/CompactionAllocationTest.java
new file mode 100644
index 0000000..a58303d
--- /dev/null
+++ b/test/memory/org/apache/cassandra/db/compaction/CompactionAllocationTest.java
@@ -0,0 +1,767 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.monitoring.runtime.instrumentation.AllocationRecorder;
+import com.google.monitoring.runtime.instrumentation.Sampler;
+import com.sun.management.ThreadMXBean;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.ReadQuery;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class CompactionAllocationTest
+{
+ private static final Logger logger = LoggerFactory.getLogger(CompactionAllocationTest.class);
+ private static final ThreadMXBean threadMX = (ThreadMXBean) ManagementFactory.getThreadMXBean();
+
+ private static final boolean AGENT_MEASUREMENT = true;
+
+ private static final boolean PROFILING_READS = false;
+ private static final boolean PROFILING_COMPACTION = false;
+ private static final boolean PROFILING = PROFILING_READS || PROFILING_COMPACTION;
+ private static final List<String> summaries = new ArrayList<>();
+
+    /**
+     * Compaction allocation result for one workload, kept for the tab delimited report.
+     * Holds the measurement plus the partition and row counts used to derive the
+     * per-partition and per-row figures.
+     */
+    private static class CompactionSummary
+    {
+        /** Column titles, in the same order as the values returned by {@link #cells()}. */
+        static final List<String> HEADERS = Lists.newArrayList("bytes", "/p", "/r");
+        /** Placeholder cells used when a workload has no compaction summary. */
+        static final List<String> EMPTY = Lists.newArrayList("n/a", "n/a", "n/a");
+
+        final Measurement measurement;
+        final int numPartitions;
+        final int numRows;
+
+        public CompactionSummary(Measurement measurement, int numPartitions, int numRows)
+        {
+            this.numRows = numRows;
+            this.numPartitions = numPartitions;
+            this.measurement = measurement;
+        }
+
+        /**
+         * @return total bytes, bytes per partition and bytes per row, in that order
+         */
+        List<String> cells()
+        {
+            long total = measurement.bytes();
+            String perPartition = String.valueOf(total / numPartitions);
+            String perRow = String.valueOf(total / numRows);
+            return Lists.newArrayList(String.valueOf(total), perPartition, perRow);
+        }
+    }
+
+    /**
+     * Read allocation result for one workload, kept for the tab delimited report.
+     * Holds the measurement and the number of reads it covered.
+     */
+    private static class ReadSummary
+    {
+        /** Column titles, in the same order as the values returned by {@link #cells()}. */
+        static final List<String> HEADERS = Lists.newArrayList("bytes", "/rd");
+        /** Placeholder cells used when a workload has no read summary. */
+        static final List<String> EMPTY = Lists.newArrayList("n/a", "n/a");
+
+        final Measurement measurement;
+        final int numReads;
+
+        public ReadSummary(Measurement measurement, int numReads)
+        {
+            this.numReads = numReads;
+            this.measurement = measurement;
+        }
+
+        /**
+         * @return total bytes and bytes per read, in that order
+         */
+        List<String> cells()
+        {
+            long total = measurement.bytes();
+            String perRead = String.valueOf(total / numReads);
+            return Lists.newArrayList(String.valueOf(total), perRead);
+        }
+    }
+
+ private static final Map<String, CompactionSummary> compactionSummaries = new HashMap<>();
+ private static final Map<String, ReadSummary> readSummaries = new HashMap<>();
+
+ /*
+ add to jvm args:
+ -javaagent:${build.dir}/lib/jars/java-allocation-instrumenter-${allocation-instrumenter.version}.jar
+ */
+
+ private static final long MIN_OBJECTS_ALLOCATED;
+ private static final long MIN_BYTES_ALLOCATED;
+
+    // Establishes the baseline that AgentMeasurement subtracts from its totals: whatever an
+    // empty start()/stop() pair records. Without the agent the baseline is zero and a warning
+    // explains how to enable the more accurate measurement.
+    static
+    {
+        long baselineObjects = 0;
+        long baselineBytes = 0;
+        if (!AGENT_MEASUREMENT)
+        {
+            logger.warn("{} is using the ThreadMXBean to measure memory usage, this is less accurate than the allocation instrumenter agent", CompactionAllocationTest.class.getSimpleName());
+            logger.warn("If you're running this in your IDE, add the following jvm arg: " +
+                        "-javaagent:<build.dir>/lib/jars/java-allocation-instrumenter-<allocation-instrumenter.version>.jar " +
+                        "(and replace <> with appropriate values from build.xml)");
+        }
+        else
+        {
+            AgentMeasurement baseline = new AgentMeasurement();
+            baseline.start();
+            baseline.stop();
+            baselineObjects = baseline.objectsAllocated;
+            baselineBytes = baseline.bytesAllocated;
+        }
+        MIN_OBJECTS_ALLOCATED = baselineObjects;
+        MIN_BYTES_ALLOCATED = baselineBytes;
+    }
+
+ /**
+ * Prepares the server and gossiper through SchemaLoader, then runs one tiny-partition
+ * workload named "warmup" before any test method executes. measure() excludes workloads
+ * with that name from the collected summaries.
+ */
+ @BeforeClass
+ public static void setupClass() throws Throwable
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.startGossiper();
+ testTinyPartitions("warmup", 9, maybeInflate(300), true);
+ }
+
+    /**
+     * Logs every collected summary line, then a tab delimited report: for each group of
+     * workloads, the workload names, the repeated column headers, and one row holding the
+     * compaction and read cells of every workload in the group ("n/a" where missing).
+     */
+    @AfterClass
+    public static void afterClass()
+    {
+        logger.info("SUMMARIES:");
+        for (int i = 0; i < summaries.size(); i++)
+            logger.info(summaries.get(i));
+
+        List<String> tiny = Lists.newArrayList("tinyNonOverlapping3",
+                                               "tinyNonOverlapping9",
+                                               "tinyOverlapping3",
+                                               "tinyOverlapping9");
+        List<String> medium = Lists.newArrayList("mediumNonOverlappingPartitions3",
+                                                 "mediumNonOverlappingPartitions9",
+                                                 "mediumOverlappingPartitions3",
+                                                 "mediumOverlappingPartitions9",
+                                                 "mediumPartitionsOverlappingRows3",
+                                                 "mediumPartitionsOverlappingRows9");
+        List<String> wide = Lists.newArrayList("wideNonOverlappingPartitions3",
+                                               "wideNonOverlappingPartitions9",
+                                               "wideOverlappingPartitions3",
+                                               "wideOverlappingPartitions9",
+                                               "widePartitionsOverlappingRows9",
+                                               "widePartitionsOverlappingRows3");
+        List<List<String>> groups = new ArrayList<>();
+        groups.add(tiny);
+        groups.add(medium);
+        groups.add(wide);
+
+        // workload name -> compaction cells followed by read cells
+        Map<String, List<String>> fullRows = new HashMap<>();
+        for (List<String> group : groups)
+        {
+            for (String workload : group)
+            {
+                CompactionSummary compaction = compactionSummaries.get(workload);
+                ReadSummary read = readSummaries.get(workload);
+                List<String> compactionCells = compaction == null ? CompactionSummary.EMPTY : compaction.cells();
+                List<String> readCells = read == null ? ReadSummary.EMPTY : read.cells();
+                fullRows.put(workload, Lists.newArrayList(Iterables.concat(compactionCells, readCells)));
+            }
+        }
+
+        logger.info("");
+        logger.info("TAB DELIMITED:");
+        Joiner tabs = Joiner.on('\t');
+        String header = tabs.join(Iterables.concat(CompactionSummary.HEADERS, ReadSummary.HEADERS));
+        for (List<String> group : groups)
+        {
+            logger.info(tabs.join(group));
+            logger.info(header);
+
+            List<String> row = new ArrayList<>();
+            for (String workload : group)
+                row.addAll(fullRows.getOrDefault(workload, Collections.emptyList()));
+            logger.info(tabs.join(row));
+        }
+    }
+
+ private static int maybeInflate(int base, int inflate)
+ {
+ return PROFILING ? base * inflate : base;
+ }
+
+ private static int maybeInflate(int base)
+ {
+ return maybeInflate(base, 3);
+ }
+
+    /**
+     * A data set plus the reads to run against it, as consumed by measure().
+     */
+    private interface Workload
+    {
+        /** Name used in log output and as the key of the recorded summaries. */
+        String name();
+
+        /** Creates the data; measure() calls this before anything else. */
+        void setup();
+
+        /** The reads whose allocations are measured; each is run once, in list order. */
+        List<Runnable> getReads();
+
+        /** The table whose live sstables are compacted. */
+        ColumnFamilyStore getCfs();
+    }
+
+    /**
+     * @return an agent backed measurement when AGENT_MEASUREMENT is set, otherwise one
+     *         backed by the ThreadMXBean
+     */
+    private static Measurement createMeasurement()
+    {
+        if (AGENT_MEASUREMENT)
+            return new AgentMeasurement();
+        return new MXMeasurement();
+    }
+
+ private interface Measurement
+ {
+ void start();
+
+ void stop();
+
+ long cpu();
+
+ long bytes();
+
+ long objects();
+
+ default String prettyBytes()
+ {
+ return FBUtilities.prettyPrintMemory(bytes());
+ }
+
+ }
+
+ public static class AgentMeasurement implements Measurement, Sampler
+ {
+ long objectsAllocated = 0;
+ long bytesAllocated = 0;
+
+ private final long threadID = Thread.currentThread().getId();
+
+ public void sampleAllocation(int count, String desc, Object newObj, long bytes)
+ {
+ if (Thread.currentThread().getId() != threadID)
+ return;
+ objectsAllocated++;
+ bytesAllocated += bytes;
+ }
+
+ public void start()
+ {
+ AllocationRecorder.addSampler(this);
+ }
+
+ public void stop()
+ {
+ AllocationRecorder.removeSampler(this);
+ if (bytesAllocated == 0)
+ logger.warn("no allocations recorded, make sure junit is run with -javaagent:${build.dir}/lib/jars/java-allocation-instrumenter-${allocation-instrumenter.version}.jar");
+ }
+
+ public long cpu()
+ {
+ return 0;
+ }
+
+ public long objects()
+ {
+ return objectsAllocated - MIN_OBJECTS_ALLOCATED;
+ }
+
+ public long bytes()
+ {
+ return bytesAllocated - MIN_BYTES_ALLOCATED;
+ }
+ }
+
+ public static class MXMeasurement implements Measurement
+ {
+ private final Thread thread = Thread.currentThread();
+
+ private class Point
+ {
+ long bytes;
+ long cpu;
+
+ void capture()
+ {
+ bytes = threadMX.getThreadAllocatedBytes(thread.getId());
+ cpu = threadMX.getThreadCpuTime(thread.getId());
+ }
+ }
+
+ private final Point start = new Point();
+ private final Point stop = new Point();
+
+ public void start()
+ {
+ start.capture();
+ }
+
+ public void stop()
+ {
+ stop.capture();
+ }
+
+ public long cpu()
+ {
+ return stop.cpu - start.cpu;
+ }
+
+ public long bytes()
+ {
+ return stop.bytes - start.bytes;
+ }
+
+ public long objects()
+ {
+ return 0;
+ }
+ }
+
+ /**
+ * Sanity check for the measurement itself: allocates numAlloc boxed integers inside a
+ * measurement and logs the measured bytes next to an expected figure. Nothing is
+ * asserted; the two log lines are meant to be compared by eye.
+ */
+ @Test
+ public void allocMeasuring()
+ {
+ // measured size of one boxed value, used to compute the expected total
+ long size = ObjectSizes.measure(5);
+ int numAlloc = 1000;
+
+ Measurement measurement = createMeasurement();
+ measurement.start();
+ // the allocation is the point of the loop; the created objects are discarded
+ for (int i=0; i<numAlloc; i++)
+ new Integer(i);
+
+ measurement.stop();
+ logger.info(" ** {}", measurement.prettyBytes());
+ logger.info(" ** expected {}", size * numAlloc);
+ }
+
+ /**
+ * Sets up the workload, measures its reads and then the compaction of all of its live
+ * sstables, logs both results and truncates the table.
+ *
+ * Reads are skipped when PROFILING_COMPACTION is set and compaction is skipped when
+ * PROFILING_READS is set; a skipped phase is reported as "SKIPPED". Results of the
+ * workload named "warmup" are logged but not added to the summaries list.
+ *
+ * @param workload the data set and reads to measure
+ */
+ private static void measure(Workload workload) throws Throwable
+ {
+ workload.setup();
+
+ // both measurements are created up front, on the calling thread
+ Measurement readSampler = createMeasurement();
+ Measurement compactionSampler = createMeasurement();
+
+ String readSummary = "SKIPPED";
+ if (!PROFILING_COMPACTION)
+ {
+ List<Runnable> reads = workload.getReads();
+ readSampler.start();
+ if (PROFILING_READS && !workload.name().equals("warmup"))
+ {
+ // presumably a pause for an external profiler to be started - TODO confirm
+ logger.info(">>> Start profiling");
+ Thread.sleep(10000);
+ }
+ for (int i=0; i<reads.size(); i++)
+ reads.get(i).run();
+ // NOTE(review): this sleep falls inside the measured window; intent not visible here
+ Thread.sleep(1000);
+ if (PROFILING_READS && !workload.name().equals("warmup"))
+ {
+ logger.info(">>> Stop profiling");
+ Thread.sleep(10000);
+ }
+ readSampler.stop();
+
+ // NOTE(review): divides by reads.size(); an empty read list would throw here
+ readSummary = String.format("%s bytes, %s /read, %s cpu", readSampler.bytes(), readSampler.bytes()/reads.size(), readSampler.cpu());
+ readSummaries.put(workload.name(), new ReadSummary(readSampler, reads.size()));
+ }
+
+ ColumnFamilyStore cfs = workload.getCfs();
+ ActiveCompactions active = new ActiveCompactions();
+ Set<SSTableReader> sstables = cfs.getLiveSSTables();
+
+ // the tasks are built even when the compaction phase is skipped below
+ CompactionTasks tasks = cfs.getCompactionStrategyManager()
+ .getUserDefinedTasks(sstables, FBUtilities.nowInSeconds());
+ Assert.assertFalse(tasks.isEmpty());
+
+ String compactionSummary = "SKIPPED";
+ if (!PROFILING_READS)
+ {
+ compactionSampler.start();
+ if (PROFILING_COMPACTION && !workload.name().equals("warmup"))
+ {
+ logger.info(">>> Start profiling");
+ Thread.sleep(10000);
+ }
+ for (AbstractCompactionTask task : tasks)
+ task.execute(active);
+ Thread.sleep(1000);
+ if (PROFILING_COMPACTION && !workload.name().equals("warmup"))
+ {
+ logger.info(">>> Stop profiling");
+ Thread.sleep(10000);
+ }
+ compactionSampler.stop();
+
+ // the compaction is expected to leave exactly one sstable; its metadata supplies the counts
+ Assert.assertEquals(1, cfs.getLiveSSTables().size());
+ int numPartitions = Ints.checkedCast(Iterables.getOnlyElement(cfs.getLiveSSTables()).getSSTableMetadata().estimatedPartitionSize.count());
+ int numRows = Ints.checkedCast(Iterables.getOnlyElement(cfs.getLiveSSTables()).getSSTableMetadata().totalRows);
+
+ compactionSummary = String.format("%s bytes, %s /partition, %s /row, %s cpu", compactionSampler.bytes(), compactionSampler.bytes()/numPartitions, compactionSampler.bytes()/numRows, compactionSampler.cpu());
+ compactionSummaries.put(workload.name(), new CompactionSummary(compactionSampler, numPartitions, numRows));
+ }
+
+ cfs.truncateBlocking();
+
+ logger.info("***");
+ logger.info("*** {} reads summary", workload.name());
+ logger.info(readSummary);
+ logger.info("*** {} compaction summary", workload.name());
+ logger.info(compactionSummary);
+ // warmup results are logged above but kept out of the final report
+ if (!workload.name().equals("warmup"))
+ {
+ summaries.add(workload.name() + " reads summary: " + readSummary);
+ summaries.add(workload.name() + " compaction summary: " + compactionSummary);
+ }
+ Thread.sleep(1000); // avoid losing report when running in IDE
+ }
+
+ private static final DataOutputPlus NOOP_OUT = new UnbufferedDataOutputStreamPlus()
+ {
+ public void write(byte[] buffer, int offset, int count) throws IOException {}
+
+ public void write(int oneByte) throws IOException {}
+ };
+
+    /**
+     * Executes the query locally and serializes every column of the result with the
+     * intra-node serializer into NOOP_OUT, which discards it.
+     *
+     * @param query    the read to execute
+     * @param metadata table metadata used to select all columns for serialization
+     * @throws AssertionError wrapping any IOException raised while serializing
+     */
+    private static void runQuery(ReadQuery query, TableMetadata metadata)
+    {
+        try (ReadExecutionController controller = query.executionController();
+             UnfilteredPartitionIterator results = query.executeLocally(controller))
+        {
+            UnfilteredPartitionIterators.serializerForIntraNode()
+                                        .serialize(results,
+                                                   ColumnFilter.all(metadata),
+                                                   NOOP_OUT,
+                                                   MessagingService.current_version);
+        }
+        catch (IOException ioFailure)
+        {
+            throw new AssertionError(ioFailure);
+        }
+    }
+
+    /**
+     * Creates a keyspace with a {@code (k INT PRIMARY KEY, v INT)} table, fills it with
+     * single-row partitions spread over several sstables, and measures the workload.
+     *
+     * One read is registered per distinct partition written: every partition when the
+     * sstables do not overlap, and only those of the first sstable when they do (the later
+     * sstables rewrite the same keys).
+     *
+     * @param name              workload name; also used, lower-cased, to name the keyspace
+     * @param numSSTable        number of sstables to flush
+     * @param sstablePartitions number of partitions written to each sstable
+     * @param overlap           whether every sstable contains the same partition keys
+     */
+    private static void testTinyPartitions(String name, int numSSTable, int sstablePartitions, boolean overlap) throws Throwable
+    {
+        String ksname = "ks_" + name.toLowerCase();
+
+        SchemaLoader.createKeyspace(ksname, KeyspaceParams.simple(1),
+                                    CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", ksname).build());
+
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ksname, "tbl").id);
+        Assert.assertNotNull(cfs);
+        cfs.disableAutoCompaction();
+        // reads are added for every partition of every sstable, or only of the first one when overlapping
+        List<Runnable> reads = new ArrayList<>(sstablePartitions * (overlap ? 1 : numSSTable));
+
+        measure(new Workload()
+        {
+            public void setup()
+            {
+                cfs.disableAutoCompaction();
+                String insert = String.format("INSERT INTO %s.%s (k, v) VALUES (?,?)", ksname, "tbl");
+                String read = String.format("SELECT * FROM %s.%s WHERE k = ?", ksname, "tbl");
+                SelectStatement select = (SelectStatement) QueryProcessor.parseStatement(read).prepare(ClientState.forInternalCalls());
+                QueryState queryState = QueryState.forInternalCalls();
+                for (int f = 0; f < numSSTable; f++)
+                {
+                    for (int p = 0; p < sstablePartitions; p++)
+                    {
+                        int key = overlap ? p : (f * sstablePartitions) + p;
+                        QueryProcessor.executeInternal(insert, key, key);
+                        if (!overlap || f == 0)
+                        {
+                            // bind the partition key just written (not the sstable index), as the
+                            // medium and wide workloads do, so each read targets its own partition
+                            QueryOptions options = QueryProcessor.makeInternalOptions(select, new Object[]{key});
+                            ReadQuery query = select.getQuery(options, queryState.getNowInSeconds());
+                            reads.add(() -> runQuery(query, cfs.metadata.get()));
+                        }
+                    }
+                    // one flush per outer iteration produces one sstable
+                    cfs.forceBlockingFlush();
+                }
+
+                Assert.assertEquals(numSSTable, cfs.getLiveSSTables().size());
+            }
+
+            public List<Runnable> getReads()
+            {
+                return reads;
+            }
+
+            public ColumnFamilyStore getCfs()
+            {
+                return cfs;
+            }
+
+            public String name()
+            {
+                return name;
+            }
+        });
+    }
+
+ // Tiny partition workloads. Each writes 3 sstables of 900 partitions or 9 sstables of
+ // 300 partitions (sizes are multiplied by 6 when profiling); the last argument selects
+ // whether the sstables share partition keys.
+
+ /** 3 sstables, distinct partition keys in each. */
+ @Test
+ public void tinyNonOverlapping3() throws Throwable
+ {
+ testTinyPartitions("tinyNonOverlapping3", 3, maybeInflate(900, 6), false);
+ }
+
+ /** 9 sstables, distinct partition keys in each. */
+ @Test
+ public void tinyNonOverlapping9() throws Throwable
+ {
+ testTinyPartitions("tinyNonOverlapping9", 9, maybeInflate(300, 6), false);
+ }
+
+ /** 3 sstables, all containing the same partition keys. */
+ @Test
+ public void tinyOverlapping3() throws Throwable
+ {
+ testTinyPartitions("tinyOverlapping3", 3, maybeInflate(900, 6), true);
+ }
+
+ /** 9 sstables, all containing the same partition keys. */
+ @Test
+ public void tinyOverlapping9() throws Throwable
+ {
+ testTinyPartitions("tinyOverlapping9", 9, maybeInflate(300, 6), true);
+ }
+
+ private static final Random globalRandom = new Random();
+ private static final Random localRandom = new Random();
+
+ public static String makeRandomString(int length)
+ {
+ return makeRandomString(length, -1);
+
+ }
+
+ public static String makeRandomString(int length, int seed)
+ {
+ Random r;
+ if (seed < 0)
+ {
+ r = globalRandom;
+ }
+ else
+ {
+ r = localRandom;
+ r.setSeed(seed);
+ }
+
+ char[] chars = new char[length];
+ for (int i = 0; i < length; ++i)
+ chars[i] = (char) ('a' + r.nextInt('z' - 'a' + 1));
+ return new String(chars);
+ }
+
+    /**
+     * Creates a keyspace with a {@code (k, c, v1..v4)} text table, fills it with partitions
+     * of 200 rows spread over several sstables, and measures the workload. Clustering keys
+     * are 6 characters long and each of the four values is 8 characters long.
+     *
+     * One read is registered per distinct partition written: every partition when the
+     * sstables do not overlap, and only those of the first sstable when they do.
+     *
+     * @param name              workload name; also used, lower-cased, to name the keyspace
+     * @param numSSTable        number of sstables to flush
+     * @param sstablePartitions number of partitions written to each sstable
+     * @param overlap           whether every sstable contains the same partition keys
+     * @param overlapCK         whether clustering keys are generated from the row index,
+     *                          which repeats them across partitions and sstables
+     */
+    private static void testMediumPartitions(String name, int numSSTable, int sstablePartitions, boolean overlap, boolean overlapCK) throws Throwable
+    {
+        String ksname = "ks_" + name.toLowerCase();
+
+        SchemaLoader.createKeyspace(ksname, KeyspaceParams.simple(1),
+                                    CreateTableStatement.parse("CREATE TABLE tbl (k text, c text, v1 text, v2 text, v3 text, v4 text, PRIMARY KEY (k, c))", ksname).build());
+
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ksname, "tbl").id);
+        Assert.assertNotNull(cfs);
+        cfs.disableAutoCompaction();
+        int rowsPerPartition = 200;
+        // reads are added for every partition of every sstable, or only of the first one when overlapping
+        List<Runnable> reads = new ArrayList<>(sstablePartitions * (overlap ? 1 : numSSTable));
+        measure(new Workload()
+        {
+            public void setup()
+            {
+                cfs.disableAutoCompaction();
+                String insert = String.format("INSERT INTO %s.%s (k, c, v1, v2, v3, v4) VALUES (?, ?, ?, ?, ?, ?)", ksname, "tbl");
+                String read = String.format("SELECT * FROM %s.%s WHERE k = ?", ksname, "tbl");
+                SelectStatement select = (SelectStatement) QueryProcessor.parseStatement(read).prepare(ClientState.forInternalCalls());
+                QueryState queryState = QueryState.forInternalCalls();
+                for (int f = 0; f < numSSTable; f++)
+                {
+                    for (int p = 0; p < sstablePartitions; p++)
+                    {
+                        String key = String.format("%08d", overlap ? p : (f * sstablePartitions) + p);
+                        for (int r = 0; r < rowsPerPartition; r++)
+                        {
+                            // seeding with the row index makes the clustering key repeatable
+                            QueryProcessor.executeInternal(insert, key, makeRandomString(6, overlapCK ? r : -1),
+                                                           makeRandomString(8), makeRandomString(8),
+                                                           makeRandomString(8), makeRandomString(8));
+                        }
+                        if (!overlap || f == 0)
+                        {
+                            QueryOptions options = QueryProcessor.makeInternalOptions(select, new Object[]{key});
+                            ReadQuery query = select.getQuery(options, queryState.getNowInSeconds());
+                            reads.add(() -> runQuery(query, cfs.metadata.get()));
+                        }
+                    }
+                    // one flush per outer iteration produces one sstable
+                    cfs.forceBlockingFlush();
+                }
+
+                Assert.assertEquals(numSSTable, cfs.getLiveSSTables().size());
+            }
+
+            public ColumnFamilyStore getCfs()
+            {
+                return cfs;
+            }
+
+            public List<Runnable> getReads()
+            {
+                return reads;
+            }
+
+            public String name()
+            {
+                return name;
+            }
+        });
+    }
+
+ // Medium partition workloads. Each writes 3 sstables of 60 partitions or 9 sstables of
+ // 20 partitions (sizes are multiplied by 3 when profiling). The two trailing flags select
+ // overlapping partition keys and overlapping clustering keys respectively.
+
+ /** 3 sstables, distinct partitions. */
+ @Test
+ public void mediumNonOverlappingPartitions3() throws Throwable
+ {
+ testMediumPartitions("mediumNonOverlappingPartitions3", 3, maybeInflate(60), false, false);
+ }
+
+ /** 9 sstables, distinct partitions. */
+ @Test
+ public void mediumNonOverlappingPartitions9() throws Throwable
+ {
+ testMediumPartitions("mediumNonOverlappingPartitions9", 9, maybeInflate(20), false, false);
+ }
+
+ /** 3 sstables sharing partition keys, unseeded clustering keys. */
+ @Test
+ public void mediumOverlappingPartitions3() throws Throwable
+ {
+ testMediumPartitions("mediumOverlappingPartitions3", 3, maybeInflate(60), true, false);
+ }
+
+ /** 9 sstables sharing partition keys, unseeded clustering keys. */
+ @Test
+ public void mediumOverlappingPartitions9() throws Throwable
+ {
+ testMediumPartitions("mediumOverlappingPartitions9", 9, maybeInflate(20), true, false);
+ }
+
+ /** 3 sstables sharing both partition keys and clustering keys. */
+ @Test
+ public void mediumPartitionsOverlappingRows3() throws Throwable
+ {
+ testMediumPartitions("mediumPartitionsOverlappingRows3", 3, maybeInflate(60), true, true);
+ }
+
+ /** 9 sstables sharing both partition keys and clustering keys. */
+ @Test
+ public void mediumPartitionsOverlappingRows9() throws Throwable
+ {
+ testMediumPartitions("mediumPartitionsOverlappingRows9", 9, maybeInflate(20), true, true);
+ }
+
+ private static void testWidePartitions(String name, int numSSTable, int sstablePartitions, boolean overlap, boolean overlapCK) throws Throwable
+ {
+ String ksname = "ks_" + name.toLowerCase();
+
+ SchemaLoader.createKeyspace(ksname, KeyspaceParams.simple(1),
+ CreateTableStatement.parse("CREATE TABLE tbl (k text, c text, v1 text, v2 text, v3 text, v4 text, PRIMARY KEY (k, c))", ksname).build());
+
+ ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ksname, "tbl").id);
+ Assert.assertNotNull(cfs);
+ cfs.disableAutoCompaction();
+ int rowWidth = 100;
+ int rowsPerPartition = 1000;
+ List<Runnable> reads = new ArrayList<>(numSSTable * (overlap ? 1 : sstablePartitions));
+
+ measure(new Workload()
+ {
+ public void setup()
+ {
+ cfs.disableAutoCompaction();
+ String insert = String.format("INSERT INTO %s.%s (k, c, v1, v2, v3, v4) VALUES (?, ?, ?, ?, ?, ?)", ksname, "tbl");
+ String read = String.format("SELECT * FROM %s.%s WHERE k = ?", ksname, "tbl");
+ SelectStatement select = (SelectStatement) QueryProcessor.parseStatement(read).prepare(ClientState.forInternalCalls());
+ QueryState queryState = QueryState.forInternalCalls();
+ for (int f=0; f<numSSTable; f++)
+ {
+ for (int p = 0; p < sstablePartitions; p++)
+ {
+ String key = String.format("%08d", overlap ? p : (f * sstablePartitions) + p);
+ for (int r=0; r<rowsPerPartition; r++)
+ {
+ QueryProcessor.executeInternal(insert , key, makeRandomString(6, overlapCK ? r : -1),
+ makeRandomString(rowWidth>>2), makeRandomString(rowWidth>>2),
+ makeRandomString(rowWidth>>2), makeRandomString(rowWidth>>2));
+ }
+ if (!overlap || f == 0)
+ {
+ QueryOptions options = QueryProcessor.makeInternalOptions(select, new Object[]{key});
+ ReadQuery query = select.getQuery(options, queryState.getNowInSeconds());
+ reads.add(() -> runQuery(query, cfs.metadata.get()));
+ }
+ }
+ cfs.forceBlockingFlush();
+ }
+
+ Assert.assertEquals(numSSTable, cfs.getLiveSSTables().size());
+ }
+
+ public ColumnFamilyStore getCfs()
+ {
+ return cfs;
+ }
+
+ public List<Runnable> getReads()
+ {
+ return reads;
+ }
+
+ public String name()
+ {
+ return name;
+ }
+ });
+ }
+
+ // Wide partition workloads. Each writes 3 sstables of 24 partitions or 9 sstables of
+ // 8 partitions (sizes are multiplied by 3 when profiling). The two trailing flags select
+ // overlapping partition keys and overlapping clustering keys respectively.
+
+ /** 3 sstables, distinct partitions. */
+ @Test
+ public void wideNonOverlappingPartitions3() throws Throwable
+ {
+ testWidePartitions("wideNonOverlappingPartitions3", 3, maybeInflate(24), false, false);
+ }
+
+ /** 9 sstables, distinct partitions. */
+ @Test
+ public void wideNonOverlappingPartitions9() throws Throwable
+ {
+ testWidePartitions("wideNonOverlappingPartitions9", 9, maybeInflate(8), false, false);
+ }
+
+ /** 3 sstables sharing partition keys, unseeded clustering keys. */
+ @Test
+ public void wideOverlappingPartitions3() throws Throwable
+ {
+ testWidePartitions("wideOverlappingPartitions3", 3, maybeInflate(24), true, false);
+ }
+
+ /** 9 sstables sharing partition keys, unseeded clustering keys. */
+ @Test
+ public void wideOverlappingPartitions9() throws Throwable
+ {
+ testWidePartitions("wideOverlappingPartitions9", 9, maybeInflate(8), true, false);
+ }
+
+ /** 9 sstables sharing both partition keys and clustering keys. */
+ @Test
+ public void widePartitionsOverlappingRows9() throws Throwable
+ {
+ testWidePartitions("widePartitionsOverlappingRows9", 9, maybeInflate(8), true, true);
+ }
+
+ /** 3 sstables sharing both partition keys and clustering keys. */
+ @Test
+ public void widePartitionsOverlappingRows3() throws Throwable
+ {
+ testWidePartitions("widePartitionsOverlappingRows3", 3, maybeInflate(24), true, true);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org