You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2022/02/16 08:01:28 UTC

[cassandra] branch trunk updated: Introduce Harry to the tree

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 81922c5  Introduce Harry to the tree
81922c5 is described below

commit 81922c5a7bcbf9db7564a29922c9d8f6222c7cdc
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Tue Oct 12 13:04:11 2021 +0200

    Introduce Harry to the tree
    
    Patch by Alex Petrov; reviewed by Caleb Rackliffe and Abe Ratnofsky for CASSANDRA-16262.
---
 .build/build-rat.xml                               |   1 +
 build.xml                                          |   3 +-
 .../cql3/statements/ModificationStatement.java     |  14 +-
 test/conf/harry-generic.yaml                       |  75 +++++
 .../fuzz/FixedSchemaProviderConfiguration.java     |  43 +++
 .../cassandra/distributed/fuzz/FuzzTestBase.java   | 131 ++++++++
 .../cassandra/distributed/fuzz/HarryHelper.java    |  96 ++++++
 .../cassandra/distributed/fuzz/InJvmSut.java       |  86 +++++
 .../cassandra/distributed/fuzz/InJvmSutBase.java   | 294 +++++++++++++++++
 .../distributed/fuzz/QueryingNoOpChecker.java      |  66 ++++
 .../distributed/fuzz/SSTableGenerator.java         | 360 +++++++++++++++++++++
 .../distributed/fuzz/SSTableLoadingVisitor.java    | 113 +++++++
 .../fuzz/test/SSTableGeneratorTest.java            | 131 ++++++++
 13 files changed, 1410 insertions(+), 3 deletions(-)

diff --git a/.build/build-rat.xml b/.build/build-rat.xml
index 599d5ea..c988043 100644
--- a/.build/build-rat.xml
+++ b/.build/build-rat.xml
@@ -53,6 +53,7 @@
                  <exclude name="**/cassandra.yaml"/>
                  <exclude name="**/cassandra-murmur.yaml"/>
                  <exclude name="**/cassandra-seeds.yaml"/>
+                 <exclude name="**/harry-generic.yaml"/>
                  <exclude name="**/doc/antora.yml"/>
                  <exclude name="**/test/conf/cassandra.yaml"/>
                  <exclude name="**/test/conf/cassandra-old.yaml"/>
diff --git a/build.xml b/build.xml
index 07c4057..01e2061 100644
--- a/build.xml
+++ b/build.xml
@@ -539,6 +539,7 @@
             <exclusion groupId="com.google.guava" artifactId="guava"/>
           </dependency>
           <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.12" scope="test"/>
+          <dependency groupId="org.apache.cassandra" artifactId="harry-core" version="0.0.1" scope="test"/>
           <dependency groupId="org.reflections" artifactId="reflections" version="0.9.12" scope="test"/>
           <dependency groupId="com.puppycrawl.tools" artifactId="checkstyle" version="8.40" scope="test"/>
           <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="1.0.3" scope="provided">
@@ -742,7 +743,7 @@
         <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess" scope="test"/>
         <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations" scope="test"/>
         <dependency groupId="org.apache.ant" artifactId="ant-junit" scope="test"/>
-
+        <dependency groupId="org.apache.cassandra" artifactId="harry-core"/>
         <!-- adding this dependency is necessary for assertj. When updating assertj, need to also update the version of
              this that the new assertj's `assertj-parent-pom` depends on. -->
         <dependency groupId="org.junit" artifactId="junit-bom" type="pom"/>
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 4233d23..b6d274a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -855,9 +855,19 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa
 
     private Slices toSlices(SortedSet<ClusteringBound<?>> startBounds, SortedSet<ClusteringBound<?>> endBounds)
     {
+        return toSlices(metadata, startBounds, endBounds);
+    }
+
+    public static Slices toSlices(TableMetadata metadata, SortedSet<ClusteringBound<?>> startBounds, SortedSet<ClusteringBound<?>> endBounds)
+    {
+        return toSlices(metadata.comparator, startBounds, endBounds);
+    }
+
+    public static Slices toSlices(ClusteringComparator comparator, SortedSet<ClusteringBound<?>> startBounds, SortedSet<ClusteringBound<?>> endBounds)
+    {
         assert startBounds.size() == endBounds.size();
 
-        Slices.Builder builder = new Slices.Builder(metadata().comparator);
+        Slices.Builder builder = new Slices.Builder(comparator);
 
         Iterator<ClusteringBound<?>> starts = startBounds.iterator();
         Iterator<ClusteringBound<?>> ends = endBounds.iterator();
@@ -865,7 +875,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa
         while (starts.hasNext())
         {
             Slice slice = Slice.make(starts.next(), ends.next());
-            if (!slice.isEmpty(metadata().comparator))
+            if (!slice.isEmpty(comparator))
             {
                 builder.add(slice);
             }
diff --git a/test/conf/harry-generic.yaml b/test/conf/harry-generic.yaml
new file mode 100644
index 0000000..e4bfbab
--- /dev/null
+++ b/test/conf/harry-generic.yaml
@@ -0,0 +1,75 @@
+seed: 1
+
+# Default schema provider generates random schema
+schema_provider:
+  default: {}
+
+drop_schema: false
+create_schema: true
+truncate_table: false
+
+clock:
+  approximate_monotonic:
+    history_size: 7300
+    epoch_length: 1
+    epoch_time_unit: "SECONDS"
+
+system_under_test:
+  println: {}
+
+partition_descriptor_selector:
+  default:
+    window_size: 100
+    slide_after_repeats: 10
+
+clustering_descriptor_selector:
+  default:
+    modifications_per_lts:
+      type: "constant"
+      constant: 2
+    rows_per_modification:
+      type: "constant"
+      constant: 2
+    operation_kind_weights:
+      DELETE_RANGE: 1
+      DELETE_SLICE: 1
+      DELETE_ROW: 1
+      DELETE_COLUMN: 1
+      DELETE_PARTITION: 1
+      INSERT: 50
+      UPDATE: 50
+      DELETE_COLUMN_WITH_STATICS: 1
+      INSERT_WITH_STATICS: 1
+      UPDATE_WITH_STATICS: 1
+    column_mask_bitsets: null
+    max_partition_size: 100
+
+data_tracker:
+  default:
+    max_seen_lts: -1
+    max_complete_lts: -1
+
+runner:
+  sequential:
+    run_time: 60
+    run_time_unit: "MINUTES"
+    visitors:
+      - logging:
+          row_visitor:
+            mutating: {}
+      - sampler:
+          trigger_after: 100000
+          sample_partitions: 10
+      - validate_recent_partitions:
+          partition_count: 5
+          trigger_after: 10000
+          model:
+            querying_no_op_checker: {}
+      - validate_all_partitions:
+          concurrency: 5
+          trigger_after: 10000
+          model:
+            querying_no_op_checker: {}
+
+metric_reporter:
+  no_op: {}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/FixedSchemaProviderConfiguration.java b/test/distributed/org/apache/cassandra/distributed/fuzz/FixedSchemaProviderConfiguration.java
new file mode 100644
index 0000000..2e03903
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/FixedSchemaProviderConfiguration.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.ddl.SchemaSpec;
+import harry.model.sut.SystemUnderTest;
+
+@JsonTypeName("fixed")
+public class FixedSchemaProviderConfiguration implements Configuration.SchemaProviderConfiguration
+{
+    private final SchemaSpec schemaSpec;
+
+    @JsonCreator
+    public FixedSchemaProviderConfiguration(SchemaSpec schemaSpec)
+    {
+        this.schemaSpec = schemaSpec;
+    }
+
+    @Override
+    public SchemaSpec make(long l, SystemUnderTest systemUnderTest)
+    {
+        return this.schemaSpec;
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/FuzzTestBase.java b/test/distributed/org/apache/cassandra/distributed/fuzz/FuzzTestBase.java
new file mode 100644
index 0000000..f7327da
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/FuzzTestBase.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import java.util.Collection;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.ddl.SchemaSpec;
+import harry.model.clock.OffsetClock;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+public abstract class FuzzTestBase extends TestBaseImpl
+{
+    protected static Configuration configuration;
+    public static final int RF = 2;
+    static
+    {
+        try
+        {
+            HarryHelper.init();
+            configuration = HarryHelper.defaultConfiguration().build();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected static Cluster cluster;
+
+    @BeforeClass
+    public static void beforeClassOverride() throws Throwable
+    {
+        cluster = Cluster.build(2)
+                         .start();
+
+        init(cluster, RF);
+    }
+
+    @AfterClass
+    public static void afterClass()
+    {
+        if (cluster != null)
+            cluster.close();
+    }
+
+    public void reset()
+    {
+        cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+        init(cluster, RF);
+    }
+
+    /**
+     * Helper method to generate a {@code number} of sstables for the given {@code schemaSpec}.
+     */
+    @SuppressWarnings("unused")
+    public static void generateTables(SchemaSpec schemaSpec, int number)
+    {
+        Run run = configuration.unbuild()
+                               .setSeed(1)
+                               .setSchemaProvider(new FixedSchemaProviderConfiguration(schemaSpec))
+                               .setClock(() -> new OffsetClock(10000L))
+                               .setDropSchema(false)
+                               .setCreateSchema(false)
+                               .build()
+                               .createRun();
+
+        ColumnFamilyStore store = Keyspace.open(schemaSpec.keyspace).getColumnFamilyStore(schemaSpec.table);
+        store.disableAutoCompaction();
+
+        SSTableGenerator gen = new SSTableGenerator(run, store);
+        SSTableGenerator largePartitionGen = new SSTableWithLargePartition(run, store);
+        for (int i = 0; i < number; i++)
+        {
+            if (i % 3 == 0)
+                largePartitionGen.gen(1_000);
+            else
+                gen.gen(1_000);
+        }
+    }
+
+    /**
+     * Helper class to force generation of a fixed partition size.
+     */
+    private static class SSTableWithLargePartition extends SSTableGenerator
+    {
+        public SSTableWithLargePartition(Run run, ColumnFamilyStore store)
+        {
+            super(run, store);
+        }
+
+        @Override
+        public Collection<SSTableReader> gen(int rows)
+        {
+            long lts = 0;
+            for (int i = 0; i < rows; i++)
+            {
+                long current = lts++;
+                write(current, current, current, current, true).applyUnsafe();
+                if (schema.staticColumns != null)
+                    writeStatic(current, 0, current, current, true).applyUnsafe();
+            }
+            store.forceBlockingFlush();
+            return null;
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/HarryHelper.java b/test/distributed/org/apache/cassandra/distributed/fuzz/HarryHelper.java
new file mode 100644
index 0000000..2997fa6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/HarryHelper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import harry.core.Configuration;
+import harry.model.OpSelectors;
+import harry.model.clock.OffsetClock;
+import harry.model.sut.PrintlnSut;
+
+public class HarryHelper
+{
+    public static void init()
+    {
+        System.setProperty("log4j2.disableJmx", "true"); // set both spellings, as the property name changes between versions
+        System.setProperty("log4j2.disable.jmx", "true");
+        System.setProperty("log4j.shutdownHookEnabled", "false");
+        System.setProperty("cassandra.allow_simplestrategy", "true"); // makes it easier to share OSS tests without RF limits
+        System.setProperty("cassandra.minimum_replication_factor", "0"); // makes it easier to share OSS tests without RF limits
+
+        System.setProperty("cassandra.disable_tcactive_openssl", "true");
+        System.setProperty("relocated.shaded.io.netty.transport.noNative", "true");
+        System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
+
+        InJvmSut.init();
+        QueryingNoOpChecker.init();
+        Configuration.registerSubtypes(PrintlnSut.PrintlnSutConfiguration.class);
+        Configuration.registerSubtypes(Configuration.NoOpMetricReporterConfiguration.class);
+        Configuration.registerSubtypes(Configuration.RecentPartitionsValidatorConfiguration.class);
+    }
+    
+    public static Configuration configuration(String... args) throws Exception
+    {
+        File configFile = harry.runner.HarryRunner.loadConfig(args);
+        Configuration configuration = Configuration.fromFile(configFile);
+        System.out.println("Using configuration generated from: " + configFile);
+        return configuration;
+    }
+
+    public static Configuration.ConfigurationBuilder defaultConfiguration() throws Exception
+    {
+        return new Configuration.ConfigurationBuilder()
+               .setClock(() -> new OffsetClock(100000))
+               .setCreateSchema(true)
+               .setTruncateTable(false)
+               .setDropSchema(false)
+               .setSchemaProvider(new Configuration.DefaultSchemaProviderConfiguration())
+               .setClock(new Configuration.ApproximateMonotonicClockConfiguration(7300, 1, TimeUnit.SECONDS))
+               .setClusteringDescriptorSelector(defaultClusteringDescriptorSelectorConfiguration().build())
+               .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(100, 10))
+               .setSUT(new PrintlnSut.PrintlnSutConfiguration())
+               .setDataTracker(new Configuration.DefaultDataTrackerConfiguration())
+               .setRunner((run, configuration) -> {
+                   throw new IllegalArgumentException("Runner is not configured by default.");
+               })
+               .setMetricReporter(new Configuration.NoOpMetricReporterConfiguration());
+    }
+
+    public static Configuration.CDSelectorConfigurationBuilder defaultClusteringDescriptorSelectorConfiguration()
+    {
+        return new Configuration.CDSelectorConfigurationBuilder()
+               .setNumberOfModificationsDistribution(new Configuration.ConstantDistributionConfig(2))
+               .setRowsPerModificationDistribution(new Configuration.ConstantDistributionConfig(2))
+               .setMaxPartitionSize(100)
+               .setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
+                                        .addWeight(OpSelectors.OperationKind.DELETE_ROW, 1)
+                                        .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 1)
+                                        .addWeight(OpSelectors.OperationKind.DELETE_RANGE, 1)
+                                        .addWeight(OpSelectors.OperationKind.DELETE_SLICE, 1)
+                                        .addWeight(OpSelectors.OperationKind.DELETE_PARTITION, 1)
+                                        .addWeight(OpSelectors.OperationKind.DELETE_COLUMN_WITH_STATICS, 1)
+                                        .addWeight(OpSelectors.OperationKind.INSERT_WITH_STATICS, 20)
+                                        .addWeight(OpSelectors.OperationKind.INSERT, 20)
+                                        .addWeight(OpSelectors.OperationKind.UPDATE_WITH_STATICS, 20)
+                                        .addWeight(OpSelectors.OperationKind.UPDATE, 20)
+                                        .build());
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/InJvmSut.java b/test/distributed/org/apache/cassandra/distributed/fuzz/InJvmSut.java
new file mode 100644
index 0000000..d62b2c6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/InJvmSut.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
+{
+    public static void init()
+    {
+        Configuration.registerSubtypes(InJvmSutConfiguration.class);
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(InJvmSut.class);
+
+    public InJvmSut(Cluster cluster)
+    {
+        super(cluster, 10);
+    }
+
+    public InJvmSut(Cluster cluster, int threads)
+    {
+        super(cluster, threads);
+    }
+
+    @JsonTypeName("in_jvm")
+    public static class InJvmSutConfiguration extends InJvmSutBaseConfiguration<IInvokableInstance, Cluster>
+    {
+        @JsonCreator
+        public InJvmSutConfiguration(@JsonProperty(value = "nodes", defaultValue = "3") int nodes,
+                                     @JsonProperty(value = "worker_threads", defaultValue = "10") int worker_threads,
+                                     @JsonProperty("root") String root)
+        {
+            super(nodes, worker_threads, root);
+        }
+
+        protected Cluster cluster(Consumer<IInstanceConfig> cfg, int nodes, File root)
+        {
+            try
+            {
+                return Cluster.build().withConfig(cfg)
+                               .withNodes(nodes)
+                               .withRoot(root)
+                              .createWithoutStarting();
+            }
+            catch (IOException e)
+            {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        protected InJvmSutBase<IInvokableInstance, Cluster> sut(Cluster cluster)
+        {
+            return new InJvmSut(cluster);
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/InJvmSutBase.java b/test/distributed/org/apache/cassandra/distributed/fuzz/InJvmSutBase.java
new file mode 100644
index 0000000..4f6da68
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/InJvmSutBase.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Iterators;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import harry.core.Configuration;
+import harry.model.sut.SystemUnderTest;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class InJvmSutBase<NODE extends IInstance, CLUSTER extends ICluster<NODE>> implements SystemUnderTest.FaultInjectingSut
+{
+    public static void init()
+    {
+        Configuration.registerSubtypes(InJvmSutBaseConfiguration.class);
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(InJvmSutBase.class);
+
+    private final ExecutorService executor;
+    public final CLUSTER cluster;
+    private final AtomicLong cnt = new AtomicLong();
+    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+    public InJvmSutBase(CLUSTER cluster)
+    {
+        this(cluster, 10);
+    }
+
+    public InJvmSutBase(CLUSTER cluster, int threads)
+    {
+        this.cluster = cluster;
+        this.executor = Executors.newFixedThreadPool(threads);
+    }
+
+    public CLUSTER cluster()
+    {
+        return cluster;
+    }
+
+    @Override
+    public boolean isShutdown()
+    {
+        return isShutdown.get();
+    }
+
+    @Override
+    public void shutdown()
+    {
+        assert isShutdown.compareAndSet(false, true);
+
+        try
+        {
+            cluster.close();
+            executor.shutdown();
+            if (!executor.awaitTermination(30, TimeUnit.SECONDS))
+                throw new TimeoutException("Could not terminate cluster within expected timeout");
+        }
+        catch (Throwable e)
+        {
+            logger.error("Could not terminate cluster.", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void schemaChange(String statement)
+    {
+        cluster.schemaChange(statement);
+    }
+
+    public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
+    {
+        return execute(statement, cl, (int) (cnt.getAndIncrement() % cluster.size() + 1), bindings);
+    }
+
+    public Object[][] execute(String statement, ConsistencyLevel cl, int coordinator, Object... bindings)
+    {
+        if (isShutdown.get())
+            throw new RuntimeException("Instance is shut down");
+
+        try
+        {
+            if (cl == ConsistencyLevel.NODE_LOCAL)
+            {
+                return cluster.get(coordinator)
+                              .executeInternal(statement, bindings);
+            }
+            else if (StringUtils.startsWithIgnoreCase(statement, "SELECT"))
+            {
+                return Iterators.toArray(cluster
+                                         // round-robin
+                                         .coordinator(coordinator)
+                                         .executeWithPaging(statement, toApiCl(cl), 1, bindings),
+                                         Object[].class);
+            }
+            else
+            {
+                return cluster
+                       // round-robin
+                       .coordinator(coordinator)
+                       .execute(statement, toApiCl(cl), bindings);
+            }
+        }
+        catch (Throwable t)
+        {
+            // TODO: find a better way to work around timeouts
+            if (t.getMessage().contains("timed out"))
+                return execute(statement, cl, coordinator, bindings);
+
+            logger.error(String.format("Caught error while trying execute statement %s (%s): %s",
+                                       statement, Arrays.toString(bindings), t.getMessage()),
+                         t);
+            throw t;
+        }
+    }
+
+    // TODO: Ideally, we need to be able to induce a failure of a single specific message
+    public Object[][] executeWithWriteFailure(String statement, ConsistencyLevel cl, Object... bindings)
+    {
+        if (isShutdown.get())
+            throw new RuntimeException("Instance is shut down");
+
+        try
+        {
+            int coordinator = (int) (cnt.getAndIncrement() % cluster.size() + 1);
+            IMessageFilters filters = cluster.filters();
+
+            // Drop exactly one coordinated message
+            int MUTATION_REQ = 0;
+            // TODO: make dropping deterministic
+            filters.verbs(MUTATION_REQ).from(coordinator).messagesMatching(new IMessageFilters.Matcher()
+            {
+                private final AtomicBoolean issued = new AtomicBoolean();
+                public boolean matches(int from, int to, IMessage message)
+                {
+                    if (from != coordinator || message.verb() != MUTATION_REQ)
+                        return false;
+
+                    return !issued.getAndSet(true);
+                }
+            }).drop().on();
+            Object[][] res = cluster
+                             .coordinator(coordinator)
+                             .execute(statement, toApiCl(cl), bindings);
+            filters.reset();
+            return res;
+        }
+        catch (Throwable t)
+        {
+            logger.error(String.format("Caught error while trying execute statement %s", statement),
+                         t);
+            throw t;
+        }
+    }
+
+    public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
+    {
+        return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings), executor);
+    }
+
+    public CompletableFuture<Object[][]> executeAsyncWithWriteFailure(String statement, ConsistencyLevel cl, Object... bindings)
+    {
+        return CompletableFuture.supplyAsync(() -> executeWithWriteFailure(statement, cl, bindings), executor);
+    }
+
+    public static abstract class InJvmSutBaseConfiguration<NODE extends IInstance, CLUSTER extends ICluster<NODE>> implements Configuration.SutConfiguration
+    {
+        public final int nodes;
+        public final int worker_threads;
+        public final String root;
+
+        @JsonCreator
+        public InJvmSutBaseConfiguration(@JsonProperty(value = "nodes", defaultValue = "3") int nodes,
+                                         @JsonProperty(value = "worker_threads", defaultValue = "10") int worker_threads,
+                                         @JsonProperty("root") String root)
+        {
+            this.nodes = nodes;
+            this.worker_threads = worker_threads;
+            if (root == null)
+            {
+                try
+                {
+                    this.root = Files.createTempDirectory("cluster_" + nodes + "_nodes").toString();
+                }
+                catch (IOException e)
+                {
+                    throw new IllegalArgumentException(e);
+                }
+            }
+            else
+            {
+                this.root = root;
+            }
+        }
+
+        protected abstract CLUSTER cluster(Consumer<IInstanceConfig> cfg, int nodes, File root);
+        protected abstract InJvmSutBase<NODE, CLUSTER> sut(CLUSTER cluster);
+
+        public SystemUnderTest make()
+        {
+            try
+            {
+                ICluster.setup();
+            }
+            catch (Throwable throwable)
+            {
+                throw new RuntimeException(throwable);
+            }
+
+            CLUSTER cluster;
+
+            cluster = cluster((cfg) -> {
+                                  // TODO: make this configurable
+                                  cfg.with(Feature.NETWORK, Feature.GOSSIP, Feature.NATIVE_PROTOCOL)
+                                     .set("row_cache_size_in_mb", 10L)
+                                     .set("index_summary_capacity_in_mb", 10L)
+                                     .set("counter_cache_size_in_mb", 10L)
+                                     .set("key_cache_size_in_mb", 10L)
+                                     .set("file_cache_size_in_mb", 10)
+                                     .set("memtable_heap_space_in_mb", 128)
+                                     .set("memtable_offheap_space_in_mb", 128)
+                                     .set("memtable_flush_writers", 1)
+                                     .set("concurrent_compactors", 1)
+                                     .set("concurrent_reads", 5)
+                                     .set("concurrent_writes", 5)
+                                     .set("compaction_throughput_mb_per_sec", 10)
+                                     .set("hinted_handoff_enabled", false);
+                              },
+                              nodes,
+                              new File(root));
+
+            cluster.startup();
+            return sut(cluster);
+        }
+    }
+
+    public static org.apache.cassandra.distributed.api.ConsistencyLevel toApiCl(ConsistencyLevel cl)
+    {
+        switch (cl)
+        {
+            case ALL:    return org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+            case QUORUM: return org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+            case NODE_LOCAL: return org.apache.cassandra.distributed.api.ConsistencyLevel.NODE_LOCAL;
+            case ONE: return org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+        }
+        throw new IllegalArgumentException("Don't know a CL: " + cl);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/QueryingNoOpChecker.java b/test/distributed/org/apache/cassandra/distributed/fuzz/QueryingNoOpChecker.java
new file mode 100644
index 0000000..77d19a2
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/QueryingNoOpChecker.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.model.Model;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+import harry.operations.Query;
+
+/**
+ * A {@link Model} that performs no verification: validating a query only executes its
+ * SELECT statement against the run's system under test and ignores whatever is returned.
+ */
+public class QueryingNoOpChecker implements Model
+{
+    private final Run run;
+
+    public QueryingNoOpChecker(Run run)
+    {
+        this.run = run;
+    }
+
+    /**
+     * Registers {@link QueryingNoOpCheckerConfig} with {@link Configuration} as a subtype.
+     */
+    public static void init()
+    {
+        Configuration.registerSubtypes(QueryingNoOpCheckerConfig.class);
+    }
+
+    /**
+     * Runs the SELECT form of {@code query} at consistency level ALL; the result is discarded.
+     */
+    @Override
+    public void validate(Query query)
+    {
+        CompiledStatement select = query.toSelectStatement();
+        run.sut.execute(select.cql(), SystemUnderTest.ConsistencyLevel.ALL, select.bindings());
+    }
+
+    /**
+     * Model configuration, registered under the type name {@code querying_no_op_checker},
+     * that produces {@link QueryingNoOpChecker} instances.
+     */
+    @JsonTypeName("querying_no_op_checker")
+    public static class QueryingNoOpCheckerConfig implements Configuration.ModelConfiguration
+    {
+        @JsonCreator
+        public QueryingNoOpCheckerConfig()
+        {
+        }
+
+        /** Creates a checker bound to {@code run}. */
+        public Model make(Run run)
+        {
+            return new QueryingNoOpChecker(run);
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/SSTableGenerator.java b/test/distributed/org/apache/cassandra/distributed/fuzz/SSTableGenerator.java
new file mode 100644
index 0000000..6f2c920
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/SSTableGenerator.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+
+import harry.core.Run;
+import harry.ddl.SchemaSpec;
+import harry.model.OpSelectors;
+import harry.operations.Relation;
+import harry.operations.Query;
+import harry.operations.QueryGenerator;
+import harry.util.BitSet;
+import org.apache.cassandra.cql3.AbstractMarker;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.SingleColumnRelation;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.cql3.WhereClause;
+import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.cql3.statements.DeleteStatement;
+import org.apache.cassandra.cql3.statements.StatementType;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static harry.generators.DataGenerators.UNSET_VALUE;
+
+public class SSTableGenerator
+{
+    protected final SchemaSpec schema;
+    protected final OpSelectors.DescriptorSelector descriptorSelector;
+    protected final OpSelectors.MonotonicClock clock;
+    protected final ColumnFamilyStore store;
+    protected final TableMetadata metadata;
+    protected final QueryGenerator rangeSelector;
+
+    private final Set<SSTableReader> sstables = new HashSet<>();
+
+    private long lts = 0;
+
+    /**
+     * Creates a generator for the table backing {@code store}.
+     * As a side effect, automatic compaction is disabled on {@code store}.
+     *
+     * @param run   supplies the schema, descriptor selector, clock, partition selector and rng
+     * @param store column family store whose metadata is used to build mutations
+     */
+    public SSTableGenerator(Run run, ColumnFamilyStore store)
+    {
+        SchemaSpec spec = run.schemaSpec;
+        OpSelectors.DescriptorSelector selector = run.descriptorSelector;
+
+        this.schema = spec;
+        this.descriptorSelector = selector;
+        this.clock = run.clock;
+        this.store = store;
+        // Metadata is captured once: it is assumed not to change while this generator is in use
+        this.metadata = store.metadata.get();
+        this.rangeSelector = new QueryGenerator(spec, run.pdSelector, selector, run.rng);
+        store.disableAutoCompaction();
+    }
+
+    /**
+     * Writes {@code rows} rows and then flushes.
+     * Every row takes the next value of the internal counter and uses it as lts, pd, cd and
+     * opId at once; when the schema has static columns, a static write with the same
+     * descriptors follows each row write.
+     *
+     * @param rows number of rows to write
+     * @return whatever {@link #flush()} returns
+     */
+    public Collection<SSTableReader> gen(int rows)
+    {
+        mark();
+
+        int written = 0;
+        while (written < rows)
+        {
+            long descriptor = lts++;
+            write(descriptor, descriptor, descriptor, descriptor, true).applyUnsafe();
+            if (schema.staticColumns != null)
+                writeStatic(descriptor, descriptor, descriptor, descriptor, true).applyUnsafe();
+            written++;
+        }
+
+        return flush();
+    }
+
+    /**
+     * Replaces the remembered set of sstables with the store's current live sstables.
+     */
+    public void mark()
+    {
+        sstables.clear();
+        Collection<? extends SSTableReader> live = store.getLiveSSTables();
+        sstables.addAll(live);
+    }
+
+    /**
+     * Flushes the store and reports the sstables that appeared since the last {@link #mark()}
+     * (or the previous flush).
+     *
+     * The original implementation removed the live sstables from the remembered set, which
+     * yields the sstables that vanished rather than the ones the flush created; the
+     * difference is now taken in the intended direction (live minus remembered).
+     *
+     * @return live sstables that were not present in the remembered set; the remembered set
+     *         is then replaced by the same live snapshot
+     */
+    public Collection<SSTableReader> flush()
+    {
+        store.forceBlockingFlush();
+
+        // Use a single snapshot both for the result and for the new baseline
+        Set<SSTableReader> live = new HashSet<>(store.getLiveSSTables());
+
+        Set<SSTableReader> flushed = new HashSet<>(live);
+        flushed.removeAll(sstables);
+
+        sstables.clear();
+        sstables.addAll(live);
+        return flushed;
+    }
+
+    /**
+     * Builds a mutation writing the regular columns of one row.
+     * Value descriptors are obtained from the descriptor selector for an INSERT operation and
+     * inflated through the schema; columns whose inflated value is {@code UNSET_VALUE} are
+     * left out of the mutation.
+     *
+     * @param lts           logical timestamp; converted to the write timestamp via the clock
+     * @param pd            partition descriptor
+     * @param cd            clustering descriptor
+     * @param opId          operation id passed to the descriptor selector
+     * @param withRowMarker when {@code false}, {@code noRowMarker()} is called on the builder
+     * @return the built mutation (not applied)
+     */
+    public Mutation write(long lts, long pd, long cd, long opId, boolean withRowMarker)
+    {
+        long[] valueDescriptors = descriptorSelector.vds(pd, cd, lts, opId, OpSelectors.OperationKind.INSERT, schema);
+
+        Object[] pk = schema.inflatePartitionKey(pd);
+        Object[] ck = schema.inflateClusteringKey(cd);
+        Object[] values = schema.inflateRegularColumns(valueDescriptors);
+
+        RowUpdateBuilder row = new RowUpdateBuilder(metadata,
+                                                    FBUtilities.nowInSeconds(),
+                                                    clock.rts(lts),
+                                                    metadata.params.defaultTimeToLive,
+                                                    serializePartitionKey(store, pk))
+                               .clustering(ck);
+
+        if (!withRowMarker)
+            row.noRowMarker();
+
+        int column = 0;
+        for (Object value : values)
+        {
+            int current = column++;
+            if (value != UNSET_VALUE)
+            {
+                ColumnIdentifier id = new ColumnIdentifier(schema.regularColumns.get(current).name, false);
+                row.add(metadata.getColumn(id), value);
+            }
+        }
+
+        return row.build();
+    }
+
+    /**
+     * Builds a mutation writing the static columns of one partition.
+     * Static descriptors are obtained from the descriptor selector for an
+     * INSERT_WITH_STATICS operation; columns whose inflated value is {@code UNSET_VALUE}
+     * are left out. No clustering is set on the builder.
+     *
+     * @param lts           logical timestamp; converted to the write timestamp via the clock
+     * @param pd            partition descriptor
+     * @param cd            clustering descriptor, only forwarded to the descriptor selector
+     * @param opId          operation id passed to the descriptor selector
+     * @param withRowMarker when {@code false}, {@code noRowMarker()} is called on the builder
+     * @return the built mutation (not applied)
+     */
+    public Mutation writeStatic(long lts, long pd, long cd, long opId, boolean withRowMarker)
+    {
+        long[] staticDescriptors = descriptorSelector.sds(pd, cd, lts, opId, OpSelectors.OperationKind.INSERT_WITH_STATICS, schema);
+
+        Object[] pk = schema.inflatePartitionKey(pd);
+        Object[] statics = schema.inflateStaticColumns(staticDescriptors);
+
+        RowUpdateBuilder row = new RowUpdateBuilder(metadata,
+                                                    FBUtilities.nowInSeconds(),
+                                                    clock.rts(lts),
+                                                    metadata.params.defaultTimeToLive,
+                                                    serializePartitionKey(store, pk));
+
+        if (!withRowMarker)
+            row.noRowMarker();
+
+        int column = 0;
+        for (Object value : statics)
+        {
+            int current = column++;
+            if (value != UNSET_VALUE)
+            {
+                ColumnIdentifier id = new ColumnIdentifier(schema.staticColumns.get(current).name, false);
+                row.add(metadata.getColumn(id), value);
+            }
+        }
+
+        return row.build();
+    }
+
+    /**
+     * Builds a mutation deleting a subset of the regular columns of one row.
+     * The columns are the bits set in the DELETE_COLUMN column mask returned by the
+     * descriptor selector, restricted to the schema's regular-column mask.
+     *
+     * @param lts  logical timestamp; converted to the write timestamp via the clock
+     * @param pd   partition descriptor
+     * @param cd   clustering descriptor
+     * @param opId operation id passed to the descriptor selector
+     * @return the built mutation (not applied)
+     * @throws IllegalArgumentException  if the mask is {@code null} or selects no regular column
+     * @throws RuntimeException          if a selected index lies before the regular columns
+     * @throws IndexOutOfBoundsException if a selected index is not a valid index into
+     *                                   {@code schema.allColumns}
+     */
+    public Mutation deleteColumn(long lts, long pd, long cd, long opId)
+    {
+        Object[] partitionKey = schema.inflatePartitionKey(pd);
+        Object[] clusteringKey = schema.inflateClusteringKey(cd);
+
+        RowUpdateBuilder builder = new RowUpdateBuilder(metadata,
+                                                        FBUtilities.nowInSeconds(),
+                                                        clock.rts(lts),
+                                                        metadata.params.defaultTimeToLive,
+                                                        serializePartitionKey(store, partitionKey))
+                                   .noRowMarker()
+                                   .clustering(clusteringKey);
+
+        BitSet columns = descriptorSelector.columnMask(pd, lts, opId, OpSelectors.OperationKind.DELETE_COLUMN);
+        BitSet mask = schema.regularColumnsMask();
+
+        if (columns == null || columns.allUnset(mask))
+            throw new IllegalArgumentException("Can't have a delete column query with no columns set. Column mask: " + columns);
+
+        columns.eachSetBit((idx) -> {
+            if (idx < schema.regularColumnsOffset)
+                throw new RuntimeException("Can't delete parts of partition or clustering key");
+
+            // Valid indexes are [0, size - 1]; idx == size must be rejected as well
+            if (idx >= schema.allColumns.size())
+                throw new IndexOutOfBoundsException(String.format("Index %d is out of bounds. Max index: %d", idx, schema.allColumns.size() - 1));
+
+            builder.delete(schema.allColumns.get(idx).name);
+        }, mask);
+
+        return builder.build();
+    }
+
+    /**
+     * Builds a mutation deleting a subset of the static columns of one partition.
+     * The columns are the bits set in the DELETE_COLUMN_WITH_STATICS column mask returned by
+     * the descriptor selector, restricted to the schema's static-column mask.
+     *
+     * @param lts  logical timestamp; converted to the write timestamp via the clock
+     * @param pd   partition descriptor
+     * @param opId operation id passed to the descriptor selector
+     * @return the built mutation (not applied)
+     * @throws IllegalArgumentException  if the mask is {@code null} or selects no static column
+     * @throws RuntimeException          if a selected index lies before the static columns
+     * @throws IndexOutOfBoundsException if a selected index is not a valid index into
+     *                                   {@code schema.allColumns}
+     */
+    public Mutation deleteStatic(long lts, long pd, long opId)
+    {
+        Object[] partitionKey = schema.inflatePartitionKey(pd);
+
+        RowUpdateBuilder builder = new RowUpdateBuilder(metadata,
+                                                        FBUtilities.nowInSeconds(),
+                                                        clock.rts(lts),
+                                                        metadata.params.defaultTimeToLive,
+                                                        serializePartitionKey(store, partitionKey))
+                                   .noRowMarker();
+
+        BitSet columns = descriptorSelector.columnMask(pd, lts, opId, OpSelectors.OperationKind.DELETE_COLUMN_WITH_STATICS);
+        BitSet mask = schema.staticColumnsMask();
+
+        if (columns == null || columns.allUnset(mask))
+            throw new IllegalArgumentException("Can't have a delete column query with no columns set. Column mask: " + columns);
+
+        columns.eachSetBit((idx) -> {
+            if (idx < schema.staticColumnsOffset)
+                throw new RuntimeException(String.format("Can't delete parts of partition or clustering key %d (%s)",
+                                                         idx, schema.allColumns.get(idx)));
+
+            // Valid indexes are [0, size - 1]; idx == size must be rejected as well
+            if (idx >= schema.allColumns.size())
+                throw new IndexOutOfBoundsException(String.format("Index %d is out of bounds. Max index: %d", idx, schema.allColumns.size() - 1));
+
+            builder.delete(schema.allColumns.get(idx).name);
+        }, mask);
+
+        return builder.build();
+    }
+
+    /**
+     * Builds a mutation that deletes the whole partition identified by {@code pd}.
+     *
+     * @param lts logical timestamp; converted to the deletion timestamp via the clock
+     * @param pd  partition descriptor
+     * @return the built mutation (not applied)
+     */
+    public Mutation deletePartition(long lts, long pd)
+    {
+        Object[] pk = schema.inflatePartitionKey(pd);
+        DecoratedKey key = serializePartitionKey(store, pk);
+        long timestamp = clock.rts(lts);
+
+        return new Mutation(PartitionUpdate.fullPartitionDelete(metadata, key, timestamp, FBUtilities.nowInSeconds()));
+    }
+
+    /**
+     * Builds a mutation that deletes a single row.
+     *
+     * @param lts logical timestamp; converted to the deletion timestamp via the clock
+     * @param pd  partition descriptor
+     * @param cd  clustering descriptor of the row
+     * @return the built mutation (not applied)
+     */
+    public Mutation deleteRow(long lts, long pd, long cd)
+    {
+        Object[] pk = schema.inflatePartitionKey(pd);
+        Object[] ck = schema.inflateClusteringKey(cd);
+        long timestamp = clock.rts(lts);
+        DecoratedKey key = serializePartitionKey(store, pk);
+
+        return RowUpdateBuilder.deleteRow(metadata, timestamp, key, ck);
+    }
+
+    /**
+     * Builds a range-tombstone mutation from a CLUSTERING_SLICE query inflated for
+     * {@code lts} and {@code opId}.
+     */
+    public Mutation deleteSlice(long lts, long pd, long opId)
+    {
+        Query slice = rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE);
+        return delete(lts, pd, slice);
+    }
+
+    /**
+     * Builds a range-tombstone mutation from a CLUSTERING_RANGE query inflated for
+     * {@code lts} and {@code opId}.
+     */
+    public Mutation deleteRange(long lts, long pd, long opId)
+    {
+        Query range = rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE);
+        return delete(lts, pd, range);
+    }
+
+    /**
+     * Builds a range-tombstone mutation for the clustering bounds described by {@code query}.
+     * A WHERE clause is assembled from an equality relation per partition key column plus the
+     * query's own relations; the resulting statement restrictions provide the start and end
+     * clustering bounds, which are converted to slices. Exactly one slice is expected.
+     *
+     * @param lts   logical timestamp; converted to the tombstone timestamp via the clock
+     * @param pd    partition descriptor
+     * @param query source of the clustering relations
+     * @return the built mutation (not applied)
+     */
+    Mutation delete(long lts, long pd, Query query)
+    {
+        Object[] partitionKey = schema.inflatePartitionKey(pd);
+
+        WhereClause.Builder where = new WhereClause.Builder();
+        List<ColumnIdentifier> markerNames = new ArrayList<>();
+        List<ByteBuffer> boundValues = new ArrayList<>();
+
+        // One equality relation per partition key column
+        int pkIndex = 0;
+        for (Object pkValue : partitionKey)
+        {
+            String name = schema.partitionKeys.get(pkIndex++).name;
+            ColumnMetadata column = metadata.getColumn(ByteBufferUtil.bytes(name));
+            // The bind marker index is the position the value is about to take in the list
+            int marker = boundValues.size();
+            markerNames.add(column.name);
+            boundValues.add(ByteBufferUtil.objectToBytes(pkValue));
+            where.add(new SingleColumnRelation(ColumnIdentifier.getInterned(name, true),
+                                               toOperator(Relation.RelationKind.EQ),
+                                               new AbstractMarker.Raw(marker)));
+        }
+
+        // Relations carried by the query itself
+        for (Relation relation : query.relations)
+        {
+            String name = relation.column();
+            ColumnMetadata column = metadata.getColumn(ByteBufferUtil.bytes(relation.column()));
+            int marker = boundValues.size();
+            markerNames.add(column.name);
+            boundValues.add(ByteBufferUtil.objectToBytes(relation.value()));
+            where.add(new SingleColumnRelation(ColumnIdentifier.getInterned(name, false),
+                                               toOperator(relation.kind),
+                                               new AbstractMarker.Raw(marker)));
+        }
+
+        StatementRestrictions restrictions = new StatementRestrictions(StatementType.DELETE,
+                                                                       metadata,
+                                                                       where.build(),
+                                                                       new VariableSpecifications(markerNames),
+                                                                       false,
+                                                                       false,
+                                                                       false,
+                                                                       false);
+
+        QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.QUORUM, boundValues);
+        SortedSet<ClusteringBound<?>> lower = restrictions.getClusteringColumnsBounds(Bound.START, options);
+        SortedSet<ClusteringBound<?>> upper = restrictions.getClusteringColumnsBounds(Bound.END, options);
+
+        Slices slices = DeleteStatement.toSlices(metadata, lower, upper);
+        assert slices.size() == 1;
+
+        int localDeletionTime = FBUtilities.nowInSeconds();
+        long timestamp = clock.rts(lts);
+        RangeTombstone tombstone = new RangeTombstone(slices.get(0), new DeletionTime(timestamp, localDeletionTime));
+
+        return new RowUpdateBuilder(metadata,
+                                    localDeletionTime,
+                                    timestamp,
+                                    metadata.params.defaultTimeToLive,
+                                    serializePartitionKey(store, partitionKey))
+               .noRowMarker()
+               .addRangeTombstone(tombstone)
+               .build();
+    }
+
+    /**
+     * Converts a Harry relation kind into the CQL operator of the same name.
+     *
+     * @param kind one of LT, GT, LTE, GTE or EQ
+     * @return the corresponding {@link Operator}
+     * @throws IllegalArgumentException for any other kind
+     */
+    public static Operator toOperator(Relation.RelationKind kind)
+    {
+        final Operator operator;
+        switch (kind)
+        {
+            case LT:
+                operator = Operator.LT;
+                break;
+            case GT:
+                operator = Operator.GT;
+                break;
+            case LTE:
+                operator = Operator.LTE;
+                break;
+            case GTE:
+                operator = Operator.GTE;
+                break;
+            case EQ:
+                operator = Operator.EQ;
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported " + kind);
+        }
+        return operator;
+    }
+
+    /**
+     * Serializes partition key components and decorates them with the store's partitioner.
+     * A single component is serialized on its own; any other number of components is
+     * serialized one by one and combined with {@code CompositeType.build}.
+     *
+     * @param store store whose partitioner decorates the key
+     * @param pk    partition key components
+     * @return the decorated key
+     */
+    public static DecoratedKey serializePartitionKey(ColumnFamilyStore store, Object... pk)
+    {
+        if (pk.length != 1)
+        {
+            ByteBuffer[] components = new ByteBuffer[pk.length];
+            int position = 0;
+            for (Object component : pk)
+                components[position++] = ByteBufferUtil.objectToBytes(component);
+            return store.getPartitioner().decorateKey(CompositeType.build(ByteBufferAccessor.instance, components));
+        }
+
+        return store.getPartitioner().decorateKey(ByteBufferUtil.objectToBytes(pk[0]));
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/SSTableLoadingVisitor.java b/test/distributed/org/apache/cassandra/distributed/fuzz/SSTableLoadingVisitor.java
new file mode 100644
index 0000000..ce6ef57
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/SSTableLoadingVisitor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import harry.core.Run;
+import harry.model.OpSelectors;
+import harry.visitors.VisitExecutor;
+import org.apache.cassandra.db.Keyspace;
+
+/**
+ * A {@link VisitExecutor} that converts every visited operation into a mutation built by
+ * {@link SSTableGenerator} and applies it with {@code applyUnsafe()}, flushing the table
+ * periodically.
+ */
+public class SSTableLoadingVisitor extends VisitExecutor
+{
+    private final SSTableGenerator gen;
+    private final int forceFlushAfter;
+
+    /**
+     * @param run             run whose schema names the keyspace and table to write into
+     * @param forceFlushAfter a flush is forced after every lts that is a positive multiple of
+     *                        this value
+     * @throws IllegalArgumentException if {@code forceFlushAfter} is not positive; zero would
+     *                                  otherwise surface later as a division by zero in
+     *                                  {@link #afterLts(long, long)}
+     */
+    public SSTableLoadingVisitor(Run run, int forceFlushAfter)
+    {
+        if (forceFlushAfter <= 0)
+            throw new IllegalArgumentException("forceFlushAfter must be positive, but was " + forceFlushAfter);
+
+        this.forceFlushAfter = forceFlushAfter;
+        gen = new SSTableGenerator(run, Keyspace.open(run.schemaSpec.keyspace).getColumnFamilyStore(run.schemaSpec.table));
+        gen.mark();
+    }
+
+    @Override
+    protected void beforeLts(long l, long l1)
+    {
+
+    }
+
+    /**
+     * Forces a flush when {@code lts} is a positive multiple of {@code forceFlushAfter}.
+     */
+    @Override
+    protected void afterLts(long lts, long pd)
+    {
+        if (lts > 0 && lts % forceFlushAfter == 0)
+            forceFlush();
+    }
+
+    @Override
+    protected void beforeBatch(long l, long l1, long l2)
+    {
+
+    }
+
+    @Override
+    protected void afterBatch(long l, long l1, long l2)
+    {
+
+    }
+
+    /**
+     * Builds the mutation(s) for one operation and applies them immediately.
+     * The {@code *_WITH_STATICS} kinds apply the regular-column mutation first and the
+     * static-column mutation second; UPDATE kinds write without a row marker, INSERT kinds
+     * with one. Kinds not listed in the switch are ignored.
+     */
+    @Override
+    protected void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind operationKind)
+    {
+        switch (operationKind)
+        {
+            case INSERT:
+                gen.write(lts, pd, cd, opId, true).applyUnsafe();
+                break;
+            case INSERT_WITH_STATICS:
+                gen.write(lts, pd, cd, opId, true).applyUnsafe();
+                gen.writeStatic(lts, pd, cd, opId, true).applyUnsafe();
+                break;
+            case UPDATE:
+                gen.write(lts, pd, cd, opId, false).applyUnsafe();
+                break;
+            case UPDATE_WITH_STATICS:
+                gen.write(lts, pd, cd, opId, false).applyUnsafe();
+                gen.writeStatic(lts, pd, cd, opId, false).applyUnsafe();
+                break;
+            case DELETE_PARTITION:
+                gen.deletePartition(lts, pd).applyUnsafe();
+                break;
+            case DELETE_ROW:
+                gen.deleteRow(lts, pd, cd).applyUnsafe();
+                break;
+            case DELETE_COLUMN:
+                gen.deleteColumn(lts, pd, cd, opId).applyUnsafe();
+                break;
+            case DELETE_COLUMN_WITH_STATICS:
+                gen.deleteColumn(lts, pd, cd, opId).applyUnsafe();
+                gen.deleteStatic(lts, pd, opId).applyUnsafe();
+                break;
+            case DELETE_RANGE:
+                gen.deleteRange(lts, pd, opId).applyUnsafe();
+                break;
+            case DELETE_SLICE:
+                gen.deleteSlice(lts, pd, opId).applyUnsafe();
+                break;
+        }
+    }
+
+    /**
+     * Flushes the underlying table through the generator; the generator's result is ignored.
+     */
+    public void forceFlush()
+    {
+        gen.flush();
+    }
+
+
+    @Override
+    public void shutdown() throws InterruptedException
+    {
+
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/test/SSTableGeneratorTest.java b/test/distributed/org/apache/cassandra/distributed/fuzz/test/SSTableGeneratorTest.java
new file mode 100644
index 0000000..5050bc6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/test/SSTableGeneratorTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz.test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import com.google.common.collect.Iterators;
+import org.junit.Test;
+
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.ddl.ColumnSpec;
+import harry.ddl.SchemaSpec;
+import harry.model.Model;
+import harry.model.QuiescentChecker;
+import harry.model.clock.OffsetClock;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.Query;
+import harry.visitors.GeneratingVisitor;
+import harry.visitors.LtsVisitor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.distributed.fuzz.FixedSchemaProviderConfiguration;
+import org.apache.cassandra.distributed.fuzz.HarryHelper;
+import org.apache.cassandra.distributed.fuzz.SSTableLoadingVisitor;
+import org.apache.cassandra.distributed.impl.RowUtil;
+
+/**
+ * Loads Harry-generated operations into a CQLTester table through
+ * {@link SSTableLoadingVisitor} and then validates every visited partition with
+ * {@link QuiescentChecker}.
+ */
+public class SSTableGeneratorTest extends CQLTester
+{
+    private static final Configuration configuration;
+
+    static
+    {
+        try
+        {
+            HarryHelper.init();
+            configuration = HarryHelper.defaultConfiguration()
+                                       .setClock(() -> new OffsetClock(10000L))
+                                       .build();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    // Fixed schema: two partition key, two clustering, four regular and two static columns
+    private static final SchemaSpec schemaSpec = new SchemaSpec(KEYSPACE, "tbl1",
+                                                                Arrays.asList(ColumnSpec.pk("pk1", ColumnSpec.asciiType),
+                                                                              ColumnSpec.pk("pk2", ColumnSpec.int64Type)),
+                                                                Arrays.asList(ColumnSpec.ck("ck1", ColumnSpec.asciiType, false),
+                                                                              ColumnSpec.ck("ck2", ColumnSpec.int64Type, false)),
+                                                                Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.int32Type),
+                                                                              ColumnSpec.regularColumn("v2", ColumnSpec.int64Type),
+                                                                              ColumnSpec.regularColumn("v3", ColumnSpec.int32Type),
+                                                                              ColumnSpec.regularColumn("v4", ColumnSpec.asciiType)),
+                                                                Arrays.asList(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType),
+                                                                              ColumnSpec.staticColumn("s2", ColumnSpec.int64Type)));
+
+    /**
+     * Runs 1000 visits with a flush every 1000 lts plus a final flush, recording the
+     * partition descriptor of every started lts, then validates a full-partition select
+     * for each recorded partition.
+     */
+    @Test
+    public void testSSTableGenerator()
+    {
+        createTable(schemaSpec.compile().cql());
+        Run run = configuration.unbuild()
+                               .setSchemaProvider(new FixedSchemaProviderConfiguration(schemaSpec))
+                               .setSUT(CqlTesterSUT::new)
+                               .build()
+                               .createRun();
+
+
+        SSTableLoadingVisitor sstableVisitor = new SSTableLoadingVisitor(run, 1000);
+        LtsVisitor visitor = new GeneratingVisitor(run, sstableVisitor);
+        Set<Long> pds = new HashSet<>();
+        run.tracker.onLtsStarted((lts) -> pds.add(run.pdSelector.pd(lts, run.schemaSpec)));
+        for (int i = 0; i < 1000; i++)
+            visitor.visit();
+
+        sstableVisitor.forceFlush();
+
+        Model checker = new QuiescentChecker(run);
+        for (Long pd : pds)
+            checker.validate(Query.selectPartition(run.schemaSpec, pd, false));
+    }
+
+    /**
+     * System under test that executes statements through the enclosing CQLTester instance;
+     * the requested consistency level is not used. Asynchronous execution is not supported.
+     */
+    public class CqlTesterSUT implements SystemUnderTest
+    {
+        public boolean isShutdown()
+        {
+            return false;
+        }
+
+        public void shutdown()
+        {
+        }
+
+        public CompletableFuture<Object[][]> executeAsync(String s, ConsistencyLevel consistencyLevel, Object... objects)
+        {
+            throw new RuntimeException("Not implemented");
+        }
+
+        public Object[][] execute(String s, ConsistencyLevel consistencyLevel, Object... objects)
+        {
+            try
+            {
+                return Iterators.toArray(RowUtil.toIter(SSTableGeneratorTest.this.execute(s, objects)),
+                                         Object[].class);
+            }
+            catch (Throwable throwable)
+            {
+                // Keep the original failure as the cause so the test report shows what went wrong
+                throw new RuntimeException(throwable);
+            }
+        }
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org