You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/03/11 19:52:33 UTC

[cassandra] branch trunk updated: Add compaction allocation measurement test

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c9e4027  Add compaction allocation measurement test
c9e4027 is described below

commit c9e40277f0389a773b21d4310a85a8c761932d61
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Tue Oct 15 13:04:26 2019 -0700

    Add compaction allocation measurement test
    
    Patch by Blake Eggleston, reviewed by Benedict Elliott Smith and David Capwell for CASSANDRA-15388
---
 CHANGES.txt                                        |   1 +
 build.xml                                          |  13 +
 ide/idea-iml-file.xml                              |   1 +
 .../db/compaction/CompactionAllocationTest.java    | 767 +++++++++++++++++++++
 4 files changed, 782 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index d863df2..c6a2eac 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Add compaction allocation measurement test (CASSANDRA-15388)
  * Added UnleveledSSTables global and table level metric (CASSANDRA-15620)
  * Added Virtual Table exposing Cassandra relevant system properties (CASSANDRA-15616)
  * Add data modeling introduction (CASSANDRA-15481)
diff --git a/build.xml b/build.xml
index 9c4f5b0..bd1f557 100644
--- a/build.xml
+++ b/build.xml
@@ -60,6 +60,7 @@
     <property name="test.unit.src" value="${test.dir}/unit"/>
     <property name="test.long.src" value="${test.dir}/long"/>
     <property name="test.burn.src" value="${test.dir}/burn"/>
+    <property name="test.memory.src" value="${test.dir}/memory"/>
     <property name="test.microbench.src" value="${test.dir}/microbench"/>
     <property name="test.distributed.src" value="${test.dir}/distributed"/>
     <property name="test.compression_algo" value="LZ4"/>
@@ -96,6 +97,7 @@
     <property name="maven-repository-id" value="apache.snapshots.https"/>
 
     <property name="test.timeout" value="240000" />
+    <property name="test.memory.timeout" value="480000" />
     <property name="test.long.timeout" value="600000" />
     <property name="test.burn.timeout" value="60000000" />
     <property name="test.distributed.timeout" value="360000" />
@@ -118,6 +120,7 @@
     <property name="ecj.version" value="4.6.1"/>
     <property name="ohc.version" value="0.5.1"/>
     <property name="asm.version" value="7.1"/>
+    <property name="allocation-instrumenter.version" value="3.1.0"/>
 
     <!-- https://mvnrepository.com/artifact/net.openhft/chronicle-bom/1.16.23 -->
     <property name="chronicle-queue.version" value="4.16.3" />
@@ -539,6 +542,7 @@
           <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/>
           <dependency groupId="junit" artifactId="junit" version="4.12" />
           <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" />
+          <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
           <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
              <exclusion groupId="commons-lang" artifactId="commons-lang"/>
           </dependency>
@@ -672,6 +676,7 @@
                 version="${version}"/>
         <dependency groupId="junit" artifactId="junit"/>
         <dependency groupId="org.quicktheories" artifactId="quicktheories" />
+        <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
         <dependency groupId="org.psjava" artifactId="psjava" version="0.1.19" />
         <dependency groupId="org.apache.rat" artifactId="apache-rat"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
@@ -1324,6 +1329,7 @@
      <src path="${test.unit.src}"/>
      <src path="${test.long.src}"/>
      <src path="${test.burn.src}"/>
+     <src path="${test.memory.src}"/>
      <src path="${test.microbench.src}"/>
      <src path="${test.distributed.src}"/>
     </javac>
@@ -1620,6 +1626,13 @@
     </testmacro>
   </target>
 
+  <target name="test-memory" depends="build-test" description="Execute memory allocation measurement tests">
+      <testmacro inputdir="${test.memory.src}"
+                 timeout="${test.memory.timeout}">
+          <jvmarg value="-javaagent:${build.dir}/lib/jars/java-allocation-instrumenter-${allocation-instrumenter.version}.jar"/>
+      </testmacro>
+  </target>
+
   <target name="cql-test" depends="build-test" description="Execute CQL tests">
     <sequential>
       <echo message="running CQL tests"/>
diff --git a/ide/idea-iml-file.xml b/ide/idea-iml-file.xml
index 17d4c5b..86827c3 100644
--- a/ide/idea-iml-file.xml
+++ b/ide/idea-iml-file.xml
@@ -32,6 +32,7 @@
             <sourceFolder url="file://$MODULE_DIR$/tools/fqltool/test/unit" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/unit" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/long" isTestSource="true" />
+            <sourceFolder url="file://$MODULE_DIR$/test/memory" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/microbench" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/burn" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/distributed" isTestSource="true" />
diff --git a/test/memory/org/apache/cassandra/db/compaction/CompactionAllocationTest.java b/test/memory/org/apache/cassandra/db/compaction/CompactionAllocationTest.java
new file mode 100644
index 0000000..a58303d
--- /dev/null
+++ b/test/memory/org/apache/cassandra/db/compaction/CompactionAllocationTest.java
@@ -0,0 +1,767 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compaction;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.monitoring.runtime.instrumentation.AllocationRecorder;
+import com.google.monitoring.runtime.instrumentation.Sampler;
+import com.sun.management.ThreadMXBean;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.ReadQuery;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.UnbufferedDataOutputStreamPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class CompactionAllocationTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(CompactionAllocationTest.class);
+    private static final ThreadMXBean threadMX = (ThreadMXBean) ManagementFactory.getThreadMXBean();
+
+    private static final boolean AGENT_MEASUREMENT = true;
+
+    private static final boolean PROFILING_READS = false;
+    private static final boolean PROFILING_COMPACTION = false;
+    private static final boolean PROFILING = PROFILING_READS || PROFILING_COMPACTION;
+    private static final List<String> summaries = new ArrayList<>();
+
+    private static class CompactionSummary
+    {
+        final Measurement measurement;
+        final int numPartitions;
+        final int numRows;
+
+        public CompactionSummary(Measurement measurement, int numPartitions, int numRows)
+        {
+            this.measurement = measurement;
+            this.numPartitions = numPartitions;
+            this.numRows = numRows;
+        }
+
+        List<String> cells()
+        {
+            long b = measurement.bytes();
+            return Lists.newArrayList(Long.toString(b), Long.toString(b/numPartitions), Long.toString(b/numRows));
+        }
+
+        static final List<String> HEADERS = Lists.newArrayList("bytes", "/p", "/r");
+        static final List<String> EMPTY = Lists.newArrayList("n/a", "n/a", "n/a");
+    }
+
+    private static class ReadSummary
+    {
+        final Measurement measurement;
+        final int numReads;
+
+        public ReadSummary(Measurement measurement, int numReads)
+        {
+            this.measurement = measurement;
+            this.numReads = numReads;
+        }
+
+        List<String> cells()
+        {
+            long b = measurement.bytes();
+            return Lists.newArrayList(Long.toString(b), Long.toString(b/numReads));
+        }
+        static final List<String> HEADERS = Lists.newArrayList("bytes", "/rd");
+        static final List<String> EMPTY = Lists.newArrayList("n/a", "n/a");
+    }
+
+    private static final Map<String, CompactionSummary> compactionSummaries = new HashMap<>();
+    private static final Map<String, ReadSummary> readSummaries = new HashMap<>();
+
+    /*
+    add to jvm args:
+        -javaagent:${build.dir}/lib/jars/java-allocation-instrumenter-${allocation-instrumenter.version}.jar
+     */
+
+    private static final long MIN_OBJECTS_ALLOCATED;
+    private static final long MIN_BYTES_ALLOCATED;
+
+    static
+    {
+        if (AGENT_MEASUREMENT)
+        {
+            AgentMeasurement measurement = new AgentMeasurement();
+            measurement.start();
+            measurement.stop();
+            MIN_OBJECTS_ALLOCATED = measurement.objectsAllocated;
+            MIN_BYTES_ALLOCATED = measurement.bytesAllocated;
+        }
+        else
+        {
+            MIN_OBJECTS_ALLOCATED = 0;
+            MIN_BYTES_ALLOCATED = 0;
+            logger.warn("{} is using the ThreadMXBean to measure memory usage, this is less accurate than the allocation instrumenter agent", CompactionAllocationTest.class.getSimpleName());
+            logger.warn("If you're running this in your IDE, add the following jvm arg: " +
+                        "-javaagent:<build.dir>/lib/jars/java-allocation-instrumenter-<allocation-instrumenter.version>.jar " +
+                        "(and replace <> with appropriate values from build.xml)");
+        }
+    }
+
+    @BeforeClass
+    public static void setupClass() throws Throwable
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.startGossiper();
+        testTinyPartitions("warmup", 9, maybeInflate(300), true);
+    }
+
+    @AfterClass
+    public static void afterClass()
+    {
+
+        logger.info("SUMMARIES:");
+        for (String summary : summaries)
+            logger.info(summary);
+
+
+        List<List<String>> groups = new ArrayList<>();
+        groups.add(Lists.newArrayList("tinyNonOverlapping3",
+                                      "tinyNonOverlapping9",
+                                      "tinyOverlapping3",
+                                      "tinyOverlapping9"));
+        groups.add(Lists.newArrayList("mediumNonOverlappingPartitions3",
+                                      "mediumNonOverlappingPartitions9",
+                                      "mediumOverlappingPartitions3",
+                                      "mediumOverlappingPartitions9",
+                                      "mediumPartitionsOverlappingRows3",
+                                      "mediumPartitionsOverlappingRows9"));
+        groups.add(Lists.newArrayList("wideNonOverlappingPartitions3",
+                                      "wideNonOverlappingPartitions9",
+                                      "wideOverlappingPartitions3",
+                                      "wideOverlappingPartitions9",
+                                      "widePartitionsOverlappingRows9",
+                                      "widePartitionsOverlappingRows3"));
+
+        Map<String, List<String>> fullRows = new HashMap<>();
+        for (String workload : Iterables.concat(groups))
+        {
+            CompactionSummary cs = compactionSummaries.get(workload);
+            ReadSummary rs = readSummaries.get(workload);
+            fullRows.put(workload, Lists.newArrayList(Iterables.concat(cs != null ? cs.cells() : CompactionSummary.EMPTY,
+                                                                       rs != null ? rs.cells() : ReadSummary.EMPTY)));
+        }
+        logger.info("");
+        logger.info("TAB DELIMITED:");
+        String header = Joiner.on('\t').join(Iterables.concat(CompactionSummary.HEADERS, ReadSummary.HEADERS));
+        for (List<String> group: groups)
+        {
+            logger.info(Joiner.on('\t').join(group));
+            logger.info(header);
+            logger.info(Joiner.on('\t').join(Iterables.concat(Iterables.transform(group, g -> fullRows.getOrDefault(g, Collections.emptyList())))));
+        }
+    }
+
+    // Multiplies the workload size when a profiler is attached (PROFILING set)
+    // so the profiled run has enough work to produce meaningful samples;
+    // otherwise returns base unchanged.
+    private static int maybeInflate(int base, int inflate)
+    {
+        return PROFILING ? base * inflate : base;
+    }
+
+    // Convenience overload using a default 3x inflation factor.
+    private static int maybeInflate(int base)
+    {
+        return maybeInflate(base, 3);
+    }
+
+    private interface Workload
+    {
+        void setup();
+        ColumnFamilyStore getCfs();
+        String name();
+        List<Runnable> getReads();
+    }
+
+    private static Measurement createMeasurement()
+    {
+        return AGENT_MEASUREMENT ? new AgentMeasurement() : new MXMeasurement();
+    }
+
+    private interface Measurement
+    {
+        void start();
+
+        void stop();
+
+        long cpu();
+
+        long bytes();
+
+        long objects();
+
+        default String prettyBytes()
+        {
+            return FBUtilities.prettyPrintMemory(bytes());
+        }
+
+    }
+
+    // Measures allocations via the google allocation-instrumenter agent.
+    // The Sampler callback fires for every allocation in the JVM, so samples
+    // are filtered down to the thread that created this measurement.
+    public static class AgentMeasurement implements Measurement, Sampler
+    {
+        // Running totals recorded between start() and stop().
+        long objectsAllocated = 0;
+        long bytesAllocated = 0;
+
+        // Only allocations made by the constructing thread are counted.
+        private final long threadID = Thread.currentThread().getId();
+
+        // Invoked by the instrumenter agent on every allocation.
+        public void sampleAllocation(int count, String desc, Object newObj, long bytes)
+        {
+            if (Thread.currentThread().getId() != threadID)
+                return;
+            objectsAllocated++;
+            bytesAllocated += bytes;
+        }
+
+        public void start()
+        {
+            AllocationRecorder.addSampler(this);
+        }
+
+        public void stop()
+        {
+            AllocationRecorder.removeSampler(this);
+            // Zero recorded bytes almost certainly means the -javaagent was
+            // never attached, so the sampler callback never fired.
+            if (bytesAllocated == 0)
+                logger.warn("no allocations recorded, make sure junit is run with -javaagent:${build.dir}/lib/jars/java-allocation-instrumenter-${allocation-instrumenter.version}.jar");
+        }
+
+        // CPU time is not tracked by the agent; MXMeasurement reports it instead.
+        public long cpu()
+        {
+            return 0;
+        }
+
+        // Subtract the fixed overhead of an empty start/stop cycle, captured
+        // once in the class static initializer.
+        public long objects()
+        {
+            return objectsAllocated - MIN_OBJECTS_ALLOCATED;
+        }
+
+        public long bytes()
+        {
+            return bytesAllocated - MIN_BYTES_ALLOCATED;
+        }
+    }
+
+    // Fallback measurement using the JDK ThreadMXBean when the allocation
+    // instrumenter agent is not in use (AGENT_MEASUREMENT == false). Less
+    // accurate than AgentMeasurement, but needs no -javaagent argument and
+    // additionally reports per-thread CPU time.
+    public static class MXMeasurement implements Measurement
+    {
+        private final Thread thread = Thread.currentThread();
+
+        // Snapshot of the thread's allocated-bytes and CPU-time counters.
+        private class Point
+        {
+            long bytes;
+            long cpu;
+
+            void capture()
+            {
+                bytes = threadMX.getThreadAllocatedBytes(thread.getId());
+                cpu = threadMX.getThreadCpuTime(thread.getId());
+            }
+        }
+
+        private final Point start = new Point();
+        private final Point stop = new Point();
+
+        public void start()
+        {
+            start.capture();
+        }
+
+        public void stop()
+        {
+            stop.capture();
+        }
+
+        public long cpu()
+        {
+            return stop.cpu - start.cpu;
+        }
+
+        public long bytes()
+        {
+            return stop.bytes - start.bytes;
+        }
+
+        // Per-object counts are not available from ThreadMXBean.
+        public long objects()
+        {
+            return 0;
+        }
+    }
+
+    // Sanity-checks the measurement machinery: performs a known number of
+    // Integer allocations and logs measured vs expected bytes. Results are
+    // only logged for manual inspection, not asserted.
+    @Test
+    public void allocMeasuring()
+    {
+        long size = ObjectSizes.measure(5);
+        int numAlloc = 1000;
+
+        Measurement measurement = createMeasurement();
+        measurement.start();
+        // 'new Integer' (rather than autoboxing/valueOf) guarantees a fresh
+        // heap allocation on every iteration.
+        for (int i=0; i<numAlloc; i++)
+            new Integer(i);
+
+        measurement.stop();
+        logger.info(" ** {}", measurement.prettyBytes());
+        logger.info(" ** expected {}", size * numAlloc);
+    }
+
+    private static void measure(Workload workload) throws Throwable
+    {
+        workload.setup();
+
+        Measurement readSampler = createMeasurement();
+        Measurement compactionSampler = createMeasurement();
+
+        String readSummary = "SKIPPED";
+        if (!PROFILING_COMPACTION)
+        {
+            List<Runnable> reads = workload.getReads();
+            readSampler.start();
+            if (PROFILING_READS && !workload.name().equals("warmup"))
+            {
+                logger.info(">>> Start profiling");
+                Thread.sleep(10000);
+            }
+            for (int i=0; i<reads.size(); i++)
+                reads.get(i).run();
+            Thread.sleep(1000);
+            if (PROFILING_READS && !workload.name().equals("warmup"))
+            {
+                logger.info(">>> Stop profiling");
+                Thread.sleep(10000);
+            }
+            readSampler.stop();
+
+            readSummary = String.format("%s bytes, %s /read, %s cpu", readSampler.bytes(), readSampler.bytes()/reads.size(), readSampler.cpu());
+            readSummaries.put(workload.name(), new ReadSummary(readSampler, reads.size()));
+        }
+
+        ColumnFamilyStore cfs = workload.getCfs();
+        ActiveCompactions active = new ActiveCompactions();
+        Set<SSTableReader> sstables = cfs.getLiveSSTables();
+
+        CompactionTasks tasks = cfs.getCompactionStrategyManager()
+                                   .getUserDefinedTasks(sstables, FBUtilities.nowInSeconds());
+        Assert.assertFalse(tasks.isEmpty());
+
+        String compactionSummary = "SKIPPED";
+        if (!PROFILING_READS)
+        {
+            compactionSampler.start();
+            if (PROFILING_COMPACTION && !workload.name().equals("warmup"))
+            {
+                logger.info(">>> Start profiling");
+                Thread.sleep(10000);
+            }
+            for (AbstractCompactionTask task : tasks)
+                task.execute(active);
+            Thread.sleep(1000);
+            if (PROFILING_COMPACTION && !workload.name().equals("warmup"))
+            {
+                logger.info(">>> Stop profiling");
+                Thread.sleep(10000);
+            }
+            compactionSampler.stop();
+
+            Assert.assertEquals(1, cfs.getLiveSSTables().size());
+            int numPartitions = Ints.checkedCast(Iterables.getOnlyElement(cfs.getLiveSSTables()).getSSTableMetadata().estimatedPartitionSize.count());
+            int numRows = Ints.checkedCast(Iterables.getOnlyElement(cfs.getLiveSSTables()).getSSTableMetadata().totalRows);
+
+            compactionSummary = String.format("%s bytes, %s /partition, %s /row, %s cpu", compactionSampler.bytes(), compactionSampler.bytes()/numPartitions, compactionSampler.bytes()/numRows, compactionSampler.cpu());
+            compactionSummaries.put(workload.name(), new CompactionSummary(compactionSampler, numPartitions, numRows));
+        }
+
+        cfs.truncateBlocking();
+
+        logger.info("***");
+        logger.info("*** {} reads summary", workload.name());
+        logger.info(readSummary);
+        logger.info("*** {} compaction summary", workload.name());
+        logger.info(compactionSummary);
+        if (!workload.name().equals("warmup"))
+        {
+            summaries.add(workload.name() + " reads summary: " + readSummary);
+            summaries.add(workload.name() + " compaction summary: " + compactionSummary);
+        }
+        Thread.sleep(1000); // avoid losing report when running in IDE
+    }
+
+    private static final DataOutputPlus NOOP_OUT = new UnbufferedDataOutputStreamPlus()
+    {
+        public void write(byte[] buffer, int offset, int count) throws IOException {}
+
+        public void write(int oneByte) throws IOException {}
+    };
+
+    private static void runQuery(ReadQuery query, TableMetadata metadata)
+    {
+        try (ReadExecutionController executionController = query.executionController();
+             UnfilteredPartitionIterator partitions = query.executeLocally(executionController))
+        {
+            UnfilteredPartitionIterators.serializerForIntraNode().serialize(partitions, ColumnFilter.all(metadata), NOOP_OUT, MessagingService.current_version);
+        }
+        catch (IOException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    private static void testTinyPartitions(String name, int numSSTable, int sstablePartitions, boolean overlap) throws Throwable
+    {
+        String ksname = "ks_" + name.toLowerCase();
+
+        SchemaLoader.createKeyspace(ksname, KeyspaceParams.simple(1),
+                                    CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", ksname).build());
+
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ksname, "tbl").id);
+        Assert.assertNotNull(cfs);
+        cfs.disableAutoCompaction();
+        List<Runnable> reads = new ArrayList<>(numSSTable * (overlap ? 1 : sstablePartitions));
+
+        measure(new Workload()
+        {
+            // Populates numSSTable sstables of sstablePartitions tiny partitions
+            // each, flushing after every batch, and registers one read per
+            // distinct partition for the read-allocation measurement.
+            public void setup()
+            {
+                cfs.disableAutoCompaction();
+                String insert = String.format("INSERT INTO %s.%s (k, v) VALUES (?,?)", ksname, "tbl");
+                String read = String.format("SELECT * FROM %s.%s WHERE k = ?", ksname, "tbl");
+                SelectStatement select = (SelectStatement) QueryProcessor.parseStatement(read).prepare(ClientState.forInternalCalls());
+                QueryState queryState = QueryState.forInternalCalls();
+                for (int f=0; f<numSSTable; f++)
+                {
+                    for (int p = 0; p < sstablePartitions; p++)
+                    {
+                        // overlapping sstables repeat the same keys; otherwise each
+                        // sstable covers its own disjoint key range
+                        int key = overlap ? p : (f * sstablePartitions) + p;
+                        QueryProcessor.executeInternal(insert, key, key);
+                        // register each distinct partition exactly once
+                        if (!overlap || f == 0)
+                        {
+                            // bind the partition key that was just inserted (matches the
+                            // medium/wide workloads; binding the sstable index 'f' here
+                            // would read the wrong partitions)
+                            QueryOptions options = QueryProcessor.makeInternalOptions(select, new Object[]{key});
+                            ReadQuery query = select.getQuery(options, queryState.getNowInSeconds());
+                            reads.add(() -> runQuery(query, cfs.metadata.get()));
+                        }
+                    }
+                    cfs.forceBlockingFlush();
+                }
+
+                Assert.assertEquals(numSSTable, cfs.getLiveSSTables().size());
+            }
+
+            public List<Runnable> getReads()
+            {
+                return reads;
+            }
+
+            public ColumnFamilyStore getCfs()
+            {
+                return cfs;
+            }
+
+            public String name()
+            {
+                return name;
+            }
+        });
+    }
+
+    @Test
+    public void tinyNonOverlapping3() throws Throwable
+    {
+        testTinyPartitions("tinyNonOverlapping3", 3, maybeInflate(900, 6), false);
+    }
+
+    @Test
+    public void tinyNonOverlapping9() throws Throwable
+    {
+        testTinyPartitions("tinyNonOverlapping9", 9, maybeInflate(300, 6), false);
+    }
+
+    @Test
+    public void tinyOverlapping3() throws Throwable
+    {
+        testTinyPartitions("tinyOverlapping3", 3, maybeInflate(900, 6), true);
+    }
+
+    @Test
+    public void tinyOverlapping9() throws Throwable
+    {
+        testTinyPartitions("tinyOverlapping9", 9, maybeInflate(300, 6), true);
+    }
+
+    // Unseeded source: every call yields different strings.
+    private static final Random globalRandom = new Random();
+    // Reseeded per call when a non-negative seed is supplied, so the same seed
+    // always produces the same string (used to make clustering keys overlap
+    // across sstables when overlapCK is set).
+    private static final Random localRandom = new Random();
+
+    // Random lowercase ascii string of the given length, non-deterministic.
+    public static String makeRandomString(int length)
+    {
+        return makeRandomString(length, -1);
+
+    }
+
+    // Random lowercase ascii string; deterministic for seed >= 0,
+    // non-deterministic (globalRandom) for seed < 0.
+    public static String makeRandomString(int length, int seed)
+    {
+        Random r;
+        if (seed < 0)
+        {
+            r = globalRandom;
+        }
+        else
+        {
+            r = localRandom;
+            r.setSeed(seed);
+        }
+
+        char[] chars = new char[length];
+        for (int i = 0; i < length; ++i)
+            chars[i] = (char) ('a' + r.nextInt('z' - 'a' + 1));
+        return new String(chars);
+    }
+
+    private static void testMediumPartitions(String name, int numSSTable, int sstablePartitions, boolean overlap, boolean overlapCK) throws Throwable
+    {
+        String ksname = "ks_" + name.toLowerCase();
+
+        SchemaLoader.createKeyspace(ksname, KeyspaceParams.simple(1),
+                                    CreateTableStatement.parse("CREATE TABLE tbl (k text, c text, v1 text, v2 text, v3 text, v4 text, PRIMARY KEY (k, c))", ksname).build());
+
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ksname, "tbl").id);
+        Assert.assertNotNull(cfs);
+        cfs.disableAutoCompaction();
+        int rowsPerPartition = 200;
+        List<Runnable> reads = new ArrayList<>(numSSTable * (overlap ? 1 : sstablePartitions));
+        measure(new Workload()
+        {
+            public void setup()
+            {
+                cfs.disableAutoCompaction();
+                String insert = String.format("INSERT INTO %s.%s (k, c, v1, v2, v3, v4) VALUES (?, ?, ?, ?, ?, ?)", ksname, "tbl");
+                String read = String.format("SELECT * FROM %s.%s WHERE k = ?", ksname, "tbl");
+                SelectStatement select = (SelectStatement) QueryProcessor.parseStatement(read).prepare(ClientState.forInternalCalls());
+                QueryState queryState = QueryState.forInternalCalls();
+                for (int f=0; f<numSSTable; f++)
+                {
+                    for (int p = 0; p < sstablePartitions; p++)
+                    {
+                        String key = String.format("%08d", overlap ? p : (f * sstablePartitions) + p);
+                        for (int r=0; r<rowsPerPartition; r++)
+                        {
+                            QueryProcessor.executeInternal(insert, key, makeRandomString(6, overlapCK ? r : -1),
+                                                           makeRandomString(8), makeRandomString(8),
+                                                           makeRandomString(8), makeRandomString(8));
+
+                        }
+                        if (!overlap || f == 0)
+                        {
+                            QueryOptions options = QueryProcessor.makeInternalOptions(select, new Object[]{key});
+                            ReadQuery query = select.getQuery(options, queryState.getNowInSeconds());
+                            reads.add(() -> runQuery(query, cfs.metadata.get()));
+                        }
+                    }
+                    cfs.forceBlockingFlush();
+                }
+
+                Assert.assertEquals(numSSTable, cfs.getLiveSSTables().size());
+            }
+
+            public ColumnFamilyStore getCfs()
+            {
+                return cfs;
+            }
+
+            public List<Runnable> getReads()
+            {
+                return reads;
+            }
+
+            public String name()
+            {
+                return name;
+            }
+        });
+    }
+
+    @Test
+    public void mediumNonOverlappingPartitions3() throws Throwable
+    {
+        testMediumPartitions("mediumNonOverlappingPartitions3", 3, maybeInflate(60), false, false);
+    }
+
+    @Test
+    public void mediumNonOverlappingPartitions9() throws Throwable
+    {
+        testMediumPartitions("mediumNonOverlappingPartitions9", 9, maybeInflate(20), false, false);
+    }
+
+    @Test
+    public void mediumOverlappingPartitions3() throws Throwable
+    {
+        testMediumPartitions("mediumOverlappingPartitions3", 3, maybeInflate(60), true, false);
+    }
+
+    @Test
+    public void mediumOverlappingPartitions9() throws Throwable
+    {
+        testMediumPartitions("mediumOverlappingPartitions9", 9, maybeInflate(20), true, false);
+    }
+
+    @Test
+    public void mediumPartitionsOverlappingRows3() throws Throwable
+    {
+        testMediumPartitions("mediumPartitionsOverlappingRows3", 3, maybeInflate(60), true, true);
+    }
+
+    @Test
+    public void mediumPartitionsOverlappingRows9() throws Throwable
+    {
+        testMediumPartitions("mediumPartitionsOverlappingRows9", 9, maybeInflate(20), true, true);
+    }
+
+    private static void testWidePartitions(String name, int numSSTable, int sstablePartitions, boolean overlap, boolean overlapCK) throws Throwable
+    {
+        String ksname = "ks_" + name.toLowerCase();
+
+        SchemaLoader.createKeyspace(ksname, KeyspaceParams.simple(1),
+                                    CreateTableStatement.parse("CREATE TABLE tbl (k text, c text, v1 text, v2 text, v3 text, v4 text, PRIMARY KEY (k, c))", ksname).build());
+
+        ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata(ksname, "tbl").id);
+        Assert.assertNotNull(cfs);
+        cfs.disableAutoCompaction();
+        int rowWidth = 100;
+        int rowsPerPartition = 1000;
+        List<Runnable> reads = new ArrayList<>(numSSTable * (overlap ? 1 : sstablePartitions));
+
+        measure(new Workload()
+        {
+            public void setup()
+            {
+                cfs.disableAutoCompaction();
+                String insert = String.format("INSERT INTO %s.%s (k, c, v1, v2, v3, v4) VALUES (?, ?, ?, ?, ?, ?)", ksname, "tbl");
+                String read = String.format("SELECT * FROM %s.%s WHERE k = ?", ksname, "tbl");
+                SelectStatement select = (SelectStatement) QueryProcessor.parseStatement(read).prepare(ClientState.forInternalCalls());
+                QueryState queryState = QueryState.forInternalCalls();
+                for (int f=0; f<numSSTable; f++)
+                {
+                    for (int p = 0; p < sstablePartitions; p++)
+                    {
+                        String key = String.format("%08d", overlap ? p : (f * sstablePartitions) + p);
+                        for (int r=0; r<rowsPerPartition; r++)
+                        {
+                            QueryProcessor.executeInternal(insert , key, makeRandomString(6, overlapCK ? r : -1),
+                                                           makeRandomString(rowWidth>>2), makeRandomString(rowWidth>>2),
+                                                           makeRandomString(rowWidth>>2), makeRandomString(rowWidth>>2));
+                        }
+                        if (!overlap || f == 0)
+                        {
+                            QueryOptions options = QueryProcessor.makeInternalOptions(select, new Object[]{key});
+                            ReadQuery query = select.getQuery(options, queryState.getNowInSeconds());
+                            reads.add(() -> runQuery(query, cfs.metadata.get()));
+                        }
+                    }
+                    cfs.forceBlockingFlush();
+                }
+
+                Assert.assertEquals(numSSTable, cfs.getLiveSSTables().size());
+            }
+
+            public ColumnFamilyStore getCfs()
+            {
+                return cfs;
+            }
+
+            public List<Runnable> getReads()
+            {
+                return reads;
+            }
+
+            public String name()
+            {
+                return name;
+            }
+        });
+    }
+
+    @Test
+    public void wideNonOverlappingPartitions3() throws Throwable
+    {
+        testWidePartitions("wideNonOverlappingPartitions3", 3, maybeInflate(24), false, false);
+    }
+
+    @Test
+    public void wideNonOverlappingPartitions9() throws Throwable
+    {
+        testWidePartitions("wideNonOverlappingPartitions9", 9, maybeInflate(8), false, false);
+    }
+
+    @Test
+    public void wideOverlappingPartitions3() throws Throwable
+    {
+        testWidePartitions("wideOverlappingPartitions3", 3, maybeInflate(24), true, false);
+    }
+
+    @Test
+    public void wideOverlappingPartitions9() throws Throwable
+    {
+        testWidePartitions("wideOverlappingPartitions9", 9, maybeInflate(8), true, false);
+    }
+
+    @Test
+    public void widePartitionsOverlappingRows9() throws Throwable
+    {
+        testWidePartitions("widePartitionsOverlappingRows9", 9, maybeInflate(8), true, true);
+    }
+
+    @Test
+    public void widePartitionsOverlappingRows3() throws Throwable
+    {
+        testWidePartitions("widePartitionsOverlappingRows3", 3, maybeInflate(24), true, true);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org