You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2022/02/16 08:01:28 UTC
[cassandra] branch trunk updated: Intoduce Harry to the tree
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 81922c5 Intoduce Harry to the tree
81922c5 is described below
commit 81922c5a7bcbf9db7564a29922c9d8f6222c7cdc
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Tue Oct 12 13:04:11 2021 +0200
Intoduce Harry to the tree
Patch by Alex Petrov; reviewed by Caleb Rackliffe and Abe Ratnofsky for CASSANDRA-16262.
---
.build/build-rat.xml | 1 +
build.xml | 3 +-
.../cql3/statements/ModificationStatement.java | 14 +-
test/conf/harry-generic.yaml | 75 +++++
.../fuzz/FixedSchemaProviderConfiguration.java | 43 +++
.../cassandra/distributed/fuzz/FuzzTestBase.java | 131 ++++++++
.../cassandra/distributed/fuzz/HarryHelper.java | 96 ++++++
.../cassandra/distributed/fuzz/InJvmSut.java | 86 +++++
.../cassandra/distributed/fuzz/InJvmSutBase.java | 294 +++++++++++++++++
.../distributed/fuzz/QueryingNoOpChecker.java | 66 ++++
.../distributed/fuzz/SSTableGenerator.java | 360 +++++++++++++++++++++
.../distributed/fuzz/SSTableLoadingVisitor.java | 113 +++++++
.../fuzz/test/SSTableGeneratorTest.java | 131 ++++++++
13 files changed, 1410 insertions(+), 3 deletions(-)
diff --git a/.build/build-rat.xml b/.build/build-rat.xml
index 599d5ea..c988043 100644
--- a/.build/build-rat.xml
+++ b/.build/build-rat.xml
@@ -53,6 +53,7 @@
<exclude name="**/cassandra.yaml"/>
<exclude name="**/cassandra-murmur.yaml"/>
<exclude name="**/cassandra-seeds.yaml"/>
+ <exclude name="**/harry-generic.yaml"/>
<exclude NAME="**/doc/antora.yml"/>
<exclude name="**/test/conf/cassandra.yaml"/>
<exclude name="**/test/conf/cassandra-old.yaml"/>
diff --git a/build.xml b/build.xml
index 07c4057..01e2061 100644
--- a/build.xml
+++ b/build.xml
@@ -539,6 +539,7 @@
<exclusion groupId="com.google.guava" artifactId="guava"/>
</dependency>
<dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.12" scope="test"/>
+ <dependency groupId="org.apache.cassandra" artifactId="harry-core" version="0.0.1" scope="test"/>
<dependency groupId="org.reflections" artifactId="reflections" version="0.9.12" scope="test"/>
<dependency groupId="com.puppycrawl.tools" artifactId="checkstyle" version="8.40" scope="test"/>
<dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="1.0.3" scope="provided">
@@ -742,7 +743,7 @@
<dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess" scope="test"/>
<dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations" scope="test"/>
<dependency groupId="org.apache.ant" artifactId="ant-junit" scope="test"/>
-
+ <dependency groupId="org.apache.cassandra" artifactId="harry-core"/>
<!-- adding this dependency is necessary for assertj. When updating assertj, need to also update the version of
this that the new assertj's `assertj-parent-pom` depends on. -->
<dependency groupId="org.junit" artifactId="junit-bom" type="pom"/>
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 4233d23..b6d274a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -855,9 +855,19 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa
private Slices toSlices(SortedSet<ClusteringBound<?>> startBounds, SortedSet<ClusteringBound<?>> endBounds)
{
+ return toSlices(metadata, startBounds, endBounds);
+ }
+
+ public static Slices toSlices(TableMetadata metadata, SortedSet<ClusteringBound<?>> startBounds, SortedSet<ClusteringBound<?>> endBounds)
+ {
+ return toSlices(metadata.comparator, startBounds, endBounds);
+ }
+
+ public static Slices toSlices(ClusteringComparator comparator, SortedSet<ClusteringBound<?>> startBounds, SortedSet<ClusteringBound<?>> endBounds)
+ {
assert startBounds.size() == endBounds.size();
- Slices.Builder builder = new Slices.Builder(metadata().comparator);
+ Slices.Builder builder = new Slices.Builder(comparator);
Iterator<ClusteringBound<?>> starts = startBounds.iterator();
Iterator<ClusteringBound<?>> ends = endBounds.iterator();
@@ -865,7 +875,7 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa
while (starts.hasNext())
{
Slice slice = Slice.make(starts.next(), ends.next());
- if (!slice.isEmpty(metadata().comparator))
+ if (!slice.isEmpty(comparator))
{
builder.add(slice);
}
diff --git a/test/conf/harry-generic.yaml b/test/conf/harry-generic.yaml
new file mode 100644
index 0000000..e4bfbab
--- /dev/null
+++ b/test/conf/harry-generic.yaml
@@ -0,0 +1,75 @@
+seed: 1
+
+# Default schema provider generates random schema
+schema_provider:
+ default: {}
+
+drop_schema: false
+create_schema: true
+truncate_table: false
+
+clock:
+ approximate_monotonic:
+ history_size: 7300
+ epoch_length: 1
+ epoch_time_unit: "SECONDS"
+
+system_under_test:
+ println: {}
+
+partition_descriptor_selector:
+ default:
+ window_size: 100
+ slide_after_repeats: 10
+
+clustering_descriptor_selector:
+ default:
+ modifications_per_lts:
+ type: "constant"
+ constant: 2
+ rows_per_modification:
+ type: "constant"
+ constant: 2
+ operation_kind_weights:
+ DELETE_RANGE: 1
+ DELETE_SLICE: 1
+ DELETE_ROW: 1
+ DELETE_COLUMN: 1
+ DELETE_PARTITION: 1
+ INSERT: 50
+ UPDATE: 50
+ DELETE_COLUMN_WITH_STATICS: 1
+ INSERT_WITH_STATICS: 1
+ UPDATE_WITH_STATICS: 1
+ column_mask_bitsets: null
+ max_partition_size: 100
+
+data_tracker:
+ default:
+ max_seen_lts: -1
+ max_complete_lts: -1
+
+runner:
+ sequential:
+ run_time: 60
+ run_time_unit: "MINUTES"
+ visitors:
+ - logging:
+ row_visitor:
+ mutating: {}
+ - sampler:
+ trigger_after: 100000
+ sample_partitions: 10
+ - validate_recent_partitions:
+ partition_count: 5
+ trigger_after: 10000
+ model:
+ querying_no_op_checker: {}
+ - validate_all_partitions:
+ concurrency: 5
+ trigger_after: 10000
+ model:
+ querying_no_op_checker: {}
+
+metric_reporter:
+ no_op: {}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/FixedSchemaProviderConfiguration.java b/test/distributed/org/apache/cassandra/distributed/fuzz/FixedSchemaProviderConfiguration.java
new file mode 100644
index 0000000..2e03903
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/FixedSchemaProviderConfiguration.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.ddl.SchemaSpec;
+import harry.model.sut.SystemUnderTest;
+
+/**
+ * Schema provider configuration that always hands back the {@link SchemaSpec} it was constructed with.
+ * It is exposed to Jackson under the type name {@code "fixed"}.
+ */
+@JsonTypeName("fixed")
+public class FixedSchemaProviderConfiguration implements Configuration.SchemaProviderConfiguration
+{
+    /** The schema returned, unchanged, by every call to {@link #make(long, SystemUnderTest)}. */
+    private final SchemaSpec schemaSpec;
+
+    /**
+     * @param schemaSpec the schema this provider should always return
+     */
+    @JsonCreator
+    public FixedSchemaProviderConfiguration(SchemaSpec schemaSpec)
+    {
+        this.schemaSpec = schemaSpec;
+    }
+
+    /**
+     * Returns the schema given at construction time; neither argument is consulted.
+     *
+     * @param l ignored
+     * @param systemUnderTest ignored
+     * @return the fixed schema spec
+     */
+    @Override
+    public SchemaSpec make(long l, SystemUnderTest systemUnderTest)
+    {
+        return schemaSpec;
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/FuzzTestBase.java b/test/distributed/org/apache/cassandra/distributed/fuzz/FuzzTestBase.java
new file mode 100644
index 0000000..f7327da
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/FuzzTestBase.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import java.util.Collection;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.ddl.SchemaSpec;
+import harry.model.clock.OffsetClock;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+public abstract class FuzzTestBase extends TestBaseImpl
+{
+ protected static Configuration configuration;
+ public static final int RF = 2;
+ static
+ {
+ try
+ {
+ HarryHelper.init();
+ configuration = HarryHelper.defaultConfiguration().build();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected static Cluster cluster;
+
+ @BeforeClass
+ public static void beforeClassOverride() throws Throwable
+ {
+ cluster = Cluster.build(2)
+ .start();
+
+ init(cluster, RF);
+ }
+
+ @AfterClass
+ public static void afterClass()
+ {
+ if (cluster != null)
+ cluster.close();
+ }
+
+ public void reset()
+ {
+ cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+ init(cluster, RF);
+ }
+
+ /**
+ * Helped method to generate a {@code number} of sstables for the given {@code schemaSpec}.
+ */
+ @SuppressWarnings("unused")
+ public static void generateTables(SchemaSpec schemaSpec, int number)
+ {
+ Run run = configuration.unbuild()
+ .setSeed(1)
+ .setSchemaProvider(new FixedSchemaProviderConfiguration(schemaSpec))
+ .setClock(() -> new OffsetClock(10000L))
+ .setDropSchema(false)
+ .setCreateSchema(false)
+ .build()
+ .createRun();
+
+ ColumnFamilyStore store = Keyspace.open(schemaSpec.keyspace).getColumnFamilyStore(schemaSpec.table);
+ store.disableAutoCompaction();
+
+ SSTableGenerator gen = new SSTableGenerator(run, store);
+ SSTableGenerator largePartitionGen = new SSTableWithLargePartition(run, store);
+ for (int i = 0; i < number; i++)
+ {
+ if (i % 3 == 0)
+ largePartitionGen.gen(1_000);
+ else
+ gen.gen(1_000);
+ }
+ }
+
+ /**
+ * Helper class to force generation of a fixed partition size.
+ */
+ private static class SSTableWithLargePartition extends SSTableGenerator
+ {
+ public SSTableWithLargePartition(Run run, ColumnFamilyStore store)
+ {
+ super(run, store);
+ }
+
+ @Override
+ public Collection<SSTableReader> gen(int rows)
+ {
+ long lts = 0;
+ for (int i = 0; i < rows; i++)
+ {
+ long current = lts++;
+ write(current, current, current, current, true).applyUnsafe();
+ if (schema.staticColumns != null)
+ writeStatic(current, 0, current, current, true).applyUnsafe();
+ }
+ store.forceBlockingFlush();
+ return null;
+ }
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/HarryHelper.java b/test/distributed/org/apache/cassandra/distributed/fuzz/HarryHelper.java
new file mode 100644
index 0000000..2997fa6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/HarryHelper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import harry.core.Configuration;
+import harry.model.OpSelectors;
+import harry.model.clock.OffsetClock;
+import harry.model.sut.PrintlnSut;
+
+public class HarryHelper
+{
+ public static void init()
+ {
+ System.setProperty("log4j2.disableJmx", "true"); // setting both ways as changes between versions
+ System.setProperty("log4j2.disable.jmx", "true");
+ System.setProperty("log4j.shutdownHookEnabled", "false");
+ System.setProperty("cassandra.allow_simplestrategy", "true"); // makes easier to share OSS tests without RF limits
+ System.setProperty("cassandra.minimum_replication_factor", "0"); // makes easier to share OSS tests without RF limits
+
+ System.setProperty("cassandra.disable_tcactive_openssl", "true");
+ System.setProperty("relocated.shaded.io.netty.transport.noNative", "true");
+ System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
+
+ InJvmSut.init();
+ QueryingNoOpChecker.init();
+ Configuration.registerSubtypes(PrintlnSut.PrintlnSutConfiguration.class);
+ Configuration.registerSubtypes(Configuration.NoOpMetricReporterConfiguration.class);
+ Configuration.registerSubtypes(Configuration.RecentPartitionsValidatorConfiguration.class);
+ }
+
+ public static Configuration configuration(String... args) throws Exception
+ {
+ File configFile = harry.runner.HarryRunner.loadConfig(args);
+ Configuration configuration = Configuration.fromFile(configFile);
+ System.out.println("Using configuration generated from: " + configFile);
+ return configuration;
+ }
+
+ public static Configuration.ConfigurationBuilder defaultConfiguration() throws Exception
+ {
+ return new Configuration.ConfigurationBuilder()
+ .setClock(() -> new OffsetClock(100000))
+ .setCreateSchema(true)
+ .setTruncateTable(false)
+ .setDropSchema(false)
+ .setSchemaProvider(new Configuration.DefaultSchemaProviderConfiguration())
+ .setClock(new Configuration.ApproximateMonotonicClockConfiguration(7300, 1, TimeUnit.SECONDS))
+ .setClusteringDescriptorSelector(defaultClusteringDescriptorSelectorConfiguration().build())
+ .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(100, 10))
+ .setSUT(new PrintlnSut.PrintlnSutConfiguration())
+ .setDataTracker(new Configuration.DefaultDataTrackerConfiguration())
+ .setRunner((run, configuration) -> {
+ throw new IllegalArgumentException("Runner is not configured by default.");
+ })
+ .setMetricReporter(new Configuration.NoOpMetricReporterConfiguration());
+ }
+
+ public static Configuration.CDSelectorConfigurationBuilder defaultClusteringDescriptorSelectorConfiguration()
+ {
+ return new Configuration.CDSelectorConfigurationBuilder()
+ .setNumberOfModificationsDistribution(new Configuration.ConstantDistributionConfig(2))
+ .setRowsPerModificationDistribution(new Configuration.ConstantDistributionConfig(2))
+ .setMaxPartitionSize(100)
+ .setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
+ .addWeight(OpSelectors.OperationKind.DELETE_ROW, 1)
+ .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 1)
+ .addWeight(OpSelectors.OperationKind.DELETE_RANGE, 1)
+ .addWeight(OpSelectors.OperationKind.DELETE_SLICE, 1)
+ .addWeight(OpSelectors.OperationKind.DELETE_PARTITION, 1)
+ .addWeight(OpSelectors.OperationKind.DELETE_COLUMN_WITH_STATICS, 1)
+ .addWeight(OpSelectors.OperationKind.INSERT_WITH_STATICS, 20)
+ .addWeight(OpSelectors.OperationKind.INSERT, 20)
+ .addWeight(OpSelectors.OperationKind.UPDATE_WITH_STATICS, 20)
+ .addWeight(OpSelectors.OperationKind.UPDATE, 20)
+ .build());
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/InJvmSut.java b/test/distributed/org/apache/cassandra/distributed/fuzz/InJvmSut.java
new file mode 100644
index 0000000..d62b2c6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/InJvmSut.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+
+/**
+ * {@link InJvmSutBase} specialisation bound to the in-tree {@link Cluster} and
+ * {@link IInvokableInstance} types.
+ */
+public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
+{
+    /**
+     * Registers {@link InJvmSutConfiguration} with Harry's {@link Configuration}.
+     */
+    public static void init()
+    {
+        Configuration.registerSubtypes(InJvmSutConfiguration.class);
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(InJvmSut.class);
+
+    /**
+     * Wraps {@code cluster} using 10 worker threads.
+     */
+    public InJvmSut(Cluster cluster)
+    {
+        this(cluster, 10);
+    }
+
+    /**
+     * @param cluster the cluster statements are executed against
+     * @param threads thread count handed to the base class
+     */
+    public InJvmSut(Cluster cluster, int threads)
+    {
+        super(cluster, threads);
+    }
+
+    /**
+     * Jackson-visible configuration (type name {@code "in_jvm"}) that builds an unstarted
+     * {@link Cluster} and wraps it in an {@link InJvmSut}.
+     */
+    @JsonTypeName("in_jvm")
+    public static class InJvmSutConfiguration extends InJvmSutBaseConfiguration<IInvokableInstance, Cluster>
+    {
+        @JsonCreator
+        public InJvmSutConfiguration(@JsonProperty(value = "nodes", defaultValue = "3") int nodes,
+                                     @JsonProperty(value = "worker_threads", defaultValue = "10") int worker_threads,
+                                     @JsonProperty("root") String root)
+        {
+            super(nodes, worker_threads, root);
+        }
+
+        /**
+         * Builds, without starting, a cluster of {@code nodes} nodes rooted at {@code root} with
+         * {@code cfg} applied to the instance configs.
+         *
+         * @throws IllegalStateException wrapping any {@link IOException} raised while building
+         */
+        @Override
+        protected Cluster cluster(Consumer<IInstanceConfig> cfg, int nodes, File root)
+        {
+            try
+            {
+                return Cluster.build()
+                              .withConfig(cfg)
+                              .withNodes(nodes)
+                              .withRoot(root)
+                              .createWithoutStarting();
+            }
+            catch (IOException e)
+            {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        /**
+         * Wraps {@code cluster} in an {@link InJvmSut} using the single-argument constructor.
+         */
+        @Override
+        protected InJvmSutBase<IInvokableInstance, Cluster> sut(Cluster cluster)
+        {
+            return new InJvmSut(cluster);
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/InJvmSutBase.java b/test/distributed/org/apache/cassandra/distributed/fuzz/InJvmSutBase.java
new file mode 100644
index 0000000..4f6da68
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/InJvmSutBase.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+import com.google.common.collect.Iterators;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import harry.core.Configuration;
+import harry.model.sut.SystemUnderTest;
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.SelectStatement;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class InJvmSutBase<NODE extends IInstance, CLUSTER extends ICluster<NODE>> implements SystemUnderTest.FaultInjectingSut
+{
+ public static void init()
+ {
+ Configuration.registerSubtypes(InJvmSutBaseConfiguration.class);
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(InJvmSutBase.class);
+
+ private final ExecutorService executor;
+ public final CLUSTER cluster;
+ private final AtomicLong cnt = new AtomicLong();
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+ public InJvmSutBase(CLUSTER cluster)
+ {
+ this(cluster, 10);
+ }
+
+ public InJvmSutBase(CLUSTER cluster, int threads)
+ {
+ this.cluster = cluster;
+ this.executor = Executors.newFixedThreadPool(threads);
+ }
+
+ public CLUSTER cluster()
+ {
+ return cluster;
+ }
+
+ @Override
+ public boolean isShutdown()
+ {
+ return isShutdown.get();
+ }
+
+ @Override
+ public void shutdown()
+ {
+ assert isShutdown.compareAndSet(false, true);
+
+ try
+ {
+ cluster.close();
+ executor.shutdown();
+ if (!executor.awaitTermination(30, TimeUnit.SECONDS))
+ throw new TimeoutException("Could not terminate cluster within expected timeout");
+ }
+ catch (Throwable e)
+ {
+ logger.error("Could not terminate cluster.", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void schemaChange(String statement)
+ {
+ cluster.schemaChange(statement);
+ }
+
+ public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
+ {
+ return execute(statement, cl, (int) (cnt.getAndIncrement() % cluster.size() + 1), bindings);
+ }
+
+ public Object[][] execute(String statement, ConsistencyLevel cl, int coordinator, Object... bindings)
+ {
+ if (isShutdown.get())
+ throw new RuntimeException("Instance is shut down");
+
+ try
+ {
+ if (cl == ConsistencyLevel.NODE_LOCAL)
+ {
+ return cluster.get(coordinator)
+ .executeInternal(statement, bindings);
+ }
+ else if (StringUtils.startsWithIgnoreCase(statement, "SELECT"))
+ {
+ return Iterators.toArray(cluster
+ // round-robin
+ .coordinator(coordinator)
+ .executeWithPaging(statement, toApiCl(cl), 1, bindings),
+ Object[].class);
+ }
+ else
+ {
+ return cluster
+ // round-robin
+ .coordinator(coordinator)
+ .execute(statement, toApiCl(cl), bindings);
+ }
+ }
+ catch (Throwable t)
+ {
+ // TODO: find a better way to work around timeouts
+ if (t.getMessage().contains("timed out"))
+ return execute(statement, cl, coordinator, bindings);
+
+ logger.error(String.format("Caught error while trying execute statement %s (%s): %s",
+ statement, Arrays.toString(bindings), t.getMessage()),
+ t);
+ throw t;
+ }
+ }
+
+ // TODO: Ideally, we need to be able to induce a failure of a single specific message
+ public Object[][] executeWithWriteFailure(String statement, ConsistencyLevel cl, Object... bindings)
+ {
+ if (isShutdown.get())
+ throw new RuntimeException("Instance is shut down");
+
+ try
+ {
+ int coordinator = (int) (cnt.getAndIncrement() % cluster.size() + 1);
+ IMessageFilters filters = cluster.filters();
+
+ // Drop exactly one coordinated message
+ int MUTATION_REQ = 0;
+ // TODO: make dropping deterministic
+ filters.verbs(MUTATION_REQ).from(coordinator).messagesMatching(new IMessageFilters.Matcher()
+ {
+ private final AtomicBoolean issued = new AtomicBoolean();
+ public boolean matches(int from, int to, IMessage message)
+ {
+ if (from != coordinator || message.verb() != MUTATION_REQ)
+ return false;
+
+ return !issued.getAndSet(true);
+ }
+ }).drop().on();
+ Object[][] res = cluster
+ .coordinator(coordinator)
+ .execute(statement, toApiCl(cl), bindings);
+ filters.reset();
+ return res;
+ }
+ catch (Throwable t)
+ {
+ logger.error(String.format("Caught error while trying execute statement %s", statement),
+ t);
+ throw t;
+ }
+ }
+
+ public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
+ {
+ return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings), executor);
+ }
+
+ public CompletableFuture<Object[][]> executeAsyncWithWriteFailure(String statement, ConsistencyLevel cl, Object... bindings)
+ {
+ return CompletableFuture.supplyAsync(() -> executeWithWriteFailure(statement, cl, bindings), executor);
+ }
+
+ public static abstract class InJvmSutBaseConfiguration<NODE extends IInstance, CLUSTER extends ICluster<NODE>> implements Configuration.SutConfiguration
+ {
+ public final int nodes;
+ public final int worker_threads;
+ public final String root;
+
+ @JsonCreator
+ public InJvmSutBaseConfiguration(@JsonProperty(value = "nodes", defaultValue = "3") int nodes,
+ @JsonProperty(value = "worker_threads", defaultValue = "10") int worker_threads,
+ @JsonProperty("root") String root)
+ {
+ this.nodes = nodes;
+ this.worker_threads = worker_threads;
+ if (root == null)
+ {
+ try
+ {
+ this.root = Files.createTempDirectory("cluster_" + nodes + "_nodes").toString();
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ else
+ {
+ this.root = root;
+ }
+ }
+
+ protected abstract CLUSTER cluster(Consumer<IInstanceConfig> cfg, int nodes, File root);
+ protected abstract InJvmSutBase<NODE, CLUSTER> sut(CLUSTER cluster);
+
+ public SystemUnderTest make()
+ {
+ try
+ {
+ ICluster.setup();
+ }
+ catch (Throwable throwable)
+ {
+ throw new RuntimeException(throwable);
+ }
+
+ CLUSTER cluster;
+
+ cluster = cluster((cfg) -> {
+ // TODO: make this configurable
+ cfg.with(Feature.NETWORK, Feature.GOSSIP, Feature.NATIVE_PROTOCOL)
+ .set("row_cache_size_in_mb", 10L)
+ .set("index_summary_capacity_in_mb", 10L)
+ .set("counter_cache_size_in_mb", 10L)
+ .set("key_cache_size_in_mb", 10L)
+ .set("file_cache_size_in_mb", 10)
+ .set("memtable_heap_space_in_mb", 128)
+ .set("memtable_offheap_space_in_mb", 128)
+ .set("memtable_flush_writers", 1)
+ .set("concurrent_compactors", 1)
+ .set("concurrent_reads", 5)
+ .set("concurrent_writes", 5)
+ .set("compaction_throughput_mb_per_sec", 10)
+ .set("hinted_handoff_enabled", false);
+ },
+ nodes,
+ new File(root));
+
+ cluster.startup();
+ return sut(cluster);
+ }
+ }
+
+ public static org.apache.cassandra.distributed.api.ConsistencyLevel toApiCl(ConsistencyLevel cl)
+ {
+ switch (cl)
+ {
+ case ALL: return org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+ case QUORUM: return org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+ case NODE_LOCAL: return org.apache.cassandra.distributed.api.ConsistencyLevel.NODE_LOCAL;
+ case ONE: return org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
+ }
+ throw new IllegalArgumentException("Don't know a CL: " + cl);
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/QueryingNoOpChecker.java b/test/distributed/org/apache/cassandra/distributed/fuzz/QueryingNoOpChecker.java
new file mode 100644
index 0000000..77d19a2
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/QueryingNoOpChecker.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.model.Model;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+import harry.operations.Query;
+
+/**
+ * {@link Model} implementation whose validation step only executes the query's SELECT statement
+ * against the system under test at consistency level ALL; the returned rows are not inspected.
+ */
+public class QueryingNoOpChecker implements Model
+{
+    /**
+     * Registers {@link QueryingNoOpCheckerConfig} with Harry's {@link Configuration}.
+     */
+    public static void init()
+    {
+        Configuration.registerSubtypes(QueryingNoOpCheckerConfig.class);
+    }
+
+    private final Run run;
+
+    /**
+     * @param run the run whose {@code sut} is queried during validation
+     */
+    public QueryingNoOpChecker(Run run)
+    {
+        this.run = run;
+    }
+
+    /**
+     * Compiles {@code query} to a SELECT statement and executes it; the result is discarded.
+     */
+    @Override
+    public void validate(Query query)
+    {
+        CompiledStatement select = query.toSelectStatement();
+        run.sut.execute(select.cql(), SystemUnderTest.ConsistencyLevel.ALL, select.bindings());
+    }
+
+    /**
+     * Jackson-visible configuration (type name {@code "querying_no_op_checker"}) that creates
+     * {@link QueryingNoOpChecker} instances.
+     */
+    @JsonTypeName("querying_no_op_checker")
+    public static class QueryingNoOpCheckerConfig implements Configuration.ModelConfiguration
+    {
+        @JsonCreator
+        public QueryingNoOpCheckerConfig()
+        {
+        }
+
+        /**
+         * @param run the run handed to the new checker
+         * @return a new {@link QueryingNoOpChecker} bound to {@code run}
+         */
+        public Model make(Run run)
+        {
+            return new QueryingNoOpChecker(run);
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/SSTableGenerator.java b/test/distributed/org/apache/cassandra/distributed/fuzz/SSTableGenerator.java
new file mode 100644
index 0000000..6f2c920
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/SSTableGenerator.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+
+import harry.core.Run;
+import harry.ddl.SchemaSpec;
+import harry.model.OpSelectors;
+import harry.operations.Relation;
+import harry.operations.Query;
+import harry.operations.QueryGenerator;
+import harry.util.BitSet;
+import org.apache.cassandra.cql3.AbstractMarker;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.SingleColumnRelation;
+import org.apache.cassandra.cql3.VariableSpecifications;
+import org.apache.cassandra.cql3.WhereClause;
+import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
+import org.apache.cassandra.cql3.statements.Bound;
+import org.apache.cassandra.cql3.statements.DeleteStatement;
+import org.apache.cassandra.cql3.statements.StatementType;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static harry.generators.DataGenerators.UNSET_VALUE;
+
+public class SSTableGenerator
+{
+ protected final SchemaSpec schema;
+ protected final OpSelectors.DescriptorSelector descriptorSelector;
+ protected final OpSelectors.MonotonicClock clock;
+ protected final ColumnFamilyStore store;
+ protected final TableMetadata metadata;
+ protected final QueryGenerator rangeSelector;
+
+ private final Set<SSTableReader> sstables = new HashSet<>();
+
+ private long lts = 0;
+
+    /**
+     * Creates a generator that builds mutations from the selectors of {@code run} and applies
+     * them to {@code store}. Auto-compaction is disabled on the store during construction.
+     *
+     * @param run   source of the schema, descriptor selector, clock, partition selector and rng
+     * @param store the table the generated data is written to
+     */
+    public SSTableGenerator(Run run,
+                            ColumnFamilyStore store)
+    {
+        this.store = store;
+        // We assume metadata can not change over the lifetime of sstable generator
+        this.metadata = store.metadata.get();
+
+        this.schema = run.schemaSpec;
+        this.clock = run.clock;
+        this.descriptorSelector = run.descriptorSelector;
+        this.rangeSelector = new QueryGenerator(run.schemaSpec, run.pdSelector, run.descriptorSelector, run.rng);
+
+        store.disableAutoCompaction();
+    }
+
+    /**
+     * Writes {@code rows} rows and flushes them. Each row takes the next value of the internal
+     * counter and uses that single value as lts, partition descriptor, clustering descriptor
+     * and operation id. A static write follows each row when the schema has static columns.
+     *
+     * @param rows number of rows to write before flushing
+     * @return the result of {@link #flush()}
+     */
+    public Collection<SSTableReader> gen(int rows)
+    {
+        mark();
+
+        int written = 0;
+        while (written < rows)
+        {
+            long descriptor = lts;
+            lts++;
+
+            write(descriptor, descriptor, descriptor, descriptor, true).applyUnsafe();
+            if (schema.staticColumns != null)
+                writeStatic(descriptor, descriptor, descriptor, descriptor, true).applyUnsafe();
+
+            written++;
+        }
+
+        return flush();
+    }
+
+    /**
+     * Replaces the remembered sstable set with the sstables that are live in the store right now.
+     */
+    public void mark()
+    {
+        Collection<SSTableReader> live = store.getLiveSSTables();
+        sstables.clear();
+        sstables.addAll(live);
+    }
+
+    /**
+     * Flushes the store and reports the sstables that appeared since the last {@link #mark()}.
+     * The marked set is then refreshed, so a later call only reports sstables created after this one.
+     *
+     * @return sstables that are live after the flush but were not live at the last mark
+     */
+    public Collection<SSTableReader> flush()
+    {
+        store.forceBlockingFlush();
+
+        // New sstables = (live now) - (live at mark). The previous implementation subtracted in the
+        // opposite direction, which yields an empty set unless an sstable had disappeared.
+        Set<SSTableReader> flushed = new HashSet<>(store.getLiveSSTables());
+        flushed.removeAll(sstables);
+
+        mark();
+        return flushed;
+    }
+
+    /**
+     * Builds a mutation that writes the regular columns of a single row.
+     *
+     * @param lts           logical timestamp; converted to the write timestamp through the clock
+     * @param pd            partition descriptor, inflated into the partition key
+     * @param cd            clustering descriptor, inflated into the clustering key
+     * @param opId          operation id passed to the descriptor selector
+     * @param withRowMarker when false, the row marker is omitted from the update
+     * @return the built mutation; it is not applied here
+     */
+    public Mutation write(long lts, long pd, long cd, long opId, boolean withRowMarker)
+    {
+        long[] valueDescriptors = descriptorSelector.vds(pd, cd, lts, opId, OpSelectors.OperationKind.INSERT, schema);
+
+        Object[] pk = schema.inflatePartitionKey(pd);
+        Object[] ck = schema.inflateClusteringKey(cd);
+        Object[] values = schema.inflateRegularColumns(valueDescriptors);
+
+        RowUpdateBuilder update = new RowUpdateBuilder(metadata,
+                                                       FBUtilities.nowInSeconds(),
+                                                       clock.rts(lts),
+                                                       metadata.params.defaultTimeToLive,
+                                                       serializePartitionKey(store, pk))
+                                  .clustering(ck);
+
+        if (!withRowMarker)
+            update.noRowMarker();
+
+        for (int col = 0; col < values.length; col++)
+        {
+            Object value = values[col];
+            // Unset values are left out of the update entirely
+            if (value != UNSET_VALUE)
+            {
+                ColumnMetadata column = metadata.getColumn(new ColumnIdentifier(schema.regularColumns.get(col).name, false));
+                update.add(column, value);
+            }
+        }
+
+        return update.build();
+    }
+
+    /**
+     * Builds a mutation that writes the static columns of a partition. No clustering is set on
+     * the update; {@code cd} is only passed to the descriptor selector.
+     *
+     * @param lts           logical timestamp; converted to the write timestamp through the clock
+     * @param pd            partition descriptor, inflated into the partition key
+     * @param cd            clustering descriptor passed to the descriptor selector
+     * @param opId          operation id passed to the descriptor selector
+     * @param withRowMarker when false, the row marker is omitted from the update
+     * @return the built mutation; it is not applied here
+     */
+    public Mutation writeStatic(long lts, long pd, long cd, long opId, boolean withRowMarker)
+    {
+        long[] staticDescriptors = descriptorSelector.sds(pd, cd, lts, opId, OpSelectors.OperationKind.INSERT_WITH_STATICS, schema);
+
+        Object[] pk = schema.inflatePartitionKey(pd);
+        Object[] values = schema.inflateStaticColumns(staticDescriptors);
+
+        RowUpdateBuilder update = new RowUpdateBuilder(metadata,
+                                                       FBUtilities.nowInSeconds(),
+                                                       clock.rts(lts),
+                                                       metadata.params.defaultTimeToLive,
+                                                       serializePartitionKey(store, pk));
+
+        if (!withRowMarker)
+            update.noRowMarker();
+
+        for (int col = 0; col < values.length; col++)
+        {
+            Object value = values[col];
+            // Unset values are left out of the update entirely
+            if (value != UNSET_VALUE)
+            {
+                ColumnMetadata column = metadata.getColumn(new ColumnIdentifier(schema.staticColumns.get(col).name, false));
+                update.add(column, value);
+            }
+        }
+
+        return update.build();
+    }
+
+    /**
+     * Builds a mutation that deletes regular columns of a single row. The columns are chosen by
+     * the descriptor selector's column mask for {@code DELETE_COLUMN}, restricted to the schema's
+     * regular-column mask.
+     *
+     * @param lts  logical timestamp; converted to the deletion timestamp through the clock
+     * @param pd   partition descriptor, inflated into the partition key
+     * @param cd   clustering descriptor, inflated into the clustering key
+     * @param opId operation id passed to the descriptor selector
+     * @return the built mutation; it is not applied here
+     * @throws IllegalArgumentException  if the column mask is null or selects no regular column
+     * @throws IndexOutOfBoundsException if a selected index does not refer to an existing column
+     */
+    public Mutation deleteColumn(long lts, long pd, long cd, long opId)
+    {
+        Object[] partitionKey = schema.inflatePartitionKey(pd);
+        Object[] clusteringKey = schema.inflateClusteringKey(cd);
+
+        RowUpdateBuilder builder = new RowUpdateBuilder(metadata,
+                                                        FBUtilities.nowInSeconds(),
+                                                        clock.rts(lts),
+                                                        metadata.params.defaultTimeToLive,
+                                                        serializePartitionKey(store, partitionKey))
+                                   .noRowMarker()
+                                   .clustering(clusteringKey);
+
+        BitSet columns = descriptorSelector.columnMask(pd, lts, opId, OpSelectors.OperationKind.DELETE_COLUMN);
+        BitSet mask = schema.regularColumnsMask();
+
+        if (columns == null || columns.allUnset(mask))
+            throw new IllegalArgumentException("Can't have a delete column query with no columns set. Column mask: " + columns);
+
+        columns.eachSetBit((idx) -> {
+            if (idx < schema.regularColumnsOffset)
+                throw new RuntimeException("Can't delete parts of partition or clustering key");
+
+            // Valid indexes are 0..size-1, so idx == size must be rejected as well
+            if (idx >= schema.allColumns.size())
+                throw new IndexOutOfBoundsException(String.format("Index %d is out of bounds. Max index: %d", idx, schema.allColumns.size() - 1));
+
+            builder.delete(schema.allColumns.get(idx).name);
+        }, mask);
+
+        return builder.build();
+    }
+
+    /**
+     * Builds a mutation that deletes static columns of a partition. The columns are chosen by
+     * the descriptor selector's column mask for {@code DELETE_COLUMN_WITH_STATICS}, restricted to
+     * the schema's static-column mask.
+     *
+     * @param lts  logical timestamp; converted to the deletion timestamp through the clock
+     * @param pd   partition descriptor, inflated into the partition key
+     * @param opId operation id passed to the descriptor selector
+     * @return the built mutation; it is not applied here
+     * @throws IllegalArgumentException  if the column mask is null or selects no static column
+     * @throws IndexOutOfBoundsException if a selected index does not refer to an existing column
+     */
+    public Mutation deleteStatic(long lts, long pd, long opId)
+    {
+        Object[] partitionKey = schema.inflatePartitionKey(pd);
+
+        RowUpdateBuilder builder = new RowUpdateBuilder(metadata,
+                                                        FBUtilities.nowInSeconds(),
+                                                        clock.rts(lts),
+                                                        metadata.params.defaultTimeToLive,
+                                                        serializePartitionKey(store, partitionKey))
+                                   .noRowMarker();
+
+        BitSet columns = descriptorSelector.columnMask(pd, lts, opId, OpSelectors.OperationKind.DELETE_COLUMN_WITH_STATICS);
+        BitSet mask = schema.staticColumnsMask();
+
+        if (columns == null || columns.allUnset(mask))
+            throw new IllegalArgumentException("Can't have a delete column query with no columns set. Column mask: " + columns);
+
+        columns.eachSetBit((idx) -> {
+            if (idx < schema.staticColumnsOffset)
+                throw new RuntimeException(String.format("Can't delete parts of partition or clustering key %d (%s)",
+                                                         idx, schema.allColumns.get(idx)));
+
+            // Valid indexes are 0..size-1, so idx == size must be rejected as well
+            if (idx >= schema.allColumns.size())
+                throw new IndexOutOfBoundsException(String.format("Index %d is out of bounds. Max index: %d", idx, schema.allColumns.size() - 1));
+
+            builder.delete(schema.allColumns.get(idx).name);
+        }, mask);
+
+        return builder.build();
+    }
+
+    /**
+     * Builds a mutation that deletes the whole partition identified by {@code pd}.
+     *
+     * @param lts logical timestamp; converted to the deletion timestamp through the clock
+     * @param pd  partition descriptor, inflated into the partition key
+     * @return the built mutation; it is not applied here
+     */
+    public Mutation deletePartition(long lts, long pd)
+    {
+        Object[] pk = schema.inflatePartitionKey(pd);
+        DecoratedKey key = serializePartitionKey(store, pk);
+
+        return new Mutation(PartitionUpdate.fullPartitionDelete(metadata,
+                                                                key,
+                                                                clock.rts(lts),
+                                                                FBUtilities.nowInSeconds()));
+    }
+
+    /**
+     * Builds a mutation that deletes the single row identified by {@code pd} and {@code cd}.
+     *
+     * @param lts logical timestamp; converted to the deletion timestamp through the clock
+     * @param pd  partition descriptor, inflated into the partition key
+     * @param cd  clustering descriptor, inflated into the clustering key
+     * @return the built mutation; it is not applied here
+     */
+    public Mutation deleteRow(long lts, long pd, long cd)
+    {
+        Object[] pk = schema.inflatePartitionKey(pd);
+        Object[] ck = schema.inflateClusteringKey(cd);
+
+        long timestamp = clock.rts(lts);
+        DecoratedKey key = serializePartitionKey(store, pk);
+
+        return RowUpdateBuilder.deleteRow(metadata, timestamp, key, ck);
+    }
+
+    /**
+     * Builds a range-tombstone mutation for the {@code CLUSTERING_SLICE} query that the query
+     * generator inflates from {@code lts} and {@code opId}.
+     */
+    public Mutation deleteSlice(long lts, long pd, long opId)
+    {
+        Query slice = rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE);
+        return delete(lts, pd, slice);
+    }
+
+    /**
+     * Builds a range-tombstone mutation for the {@code CLUSTERING_RANGE} query that the query
+     * generator inflates from {@code lts} and {@code opId}.
+     */
+    public Mutation deleteRange(long lts, long pd, long opId)
+    {
+        Query range = rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE);
+        return delete(lts, pd, range);
+    }
+
+ /**
+ * Builds a range-tombstone mutation for partition {@code pd}, covering the clustering slice
+ * described by the relations of {@code query}.
+ *
+ * The partition key columns (as EQ relations) and the query's relations are turned into a
+ * WHERE clause with one bind marker per value. Statement restrictions built from that clause
+ * provide the start and end clustering bounds, which are converted into a single slice.
+ *
+ * @param lts logical timestamp; converted through the clock into the tombstone timestamp
+ * @param pd partition descriptor, inflated into the partition key
+ * @param query source of the clustering relations
+ * @return the built mutation; it is not applied here
+ */
+ Mutation delete(long lts, long pd, Query query)
+ {
+ Object[] partitionKey = schema.inflatePartitionKey(pd);
+ WhereClause.Builder builder = new WhereClause.Builder();
+ List<ColumnIdentifier> variableNames = new ArrayList<>();
+
+ // variableNames and values grow in lockstep: entry i names the column bound to marker i
+ List<ByteBuffer> values = new ArrayList<>();
+ for (int i = 0; i < partitionKey.length; i++)
+ {
+ String name = schema.partitionKeys.get(i).name;
+ ColumnMetadata columnDef = metadata.getColumn(ByteBufferUtil.bytes(name));
+ variableNames.add(columnDef.name);
+ values.add(ByteBufferUtil.objectToBytes(partitionKey[i]));
+ // The marker index is the position of the value appended just above
+ // NOTE(review): identifiers are interned with `true` here but `false` for the clustering relations below - confirm this is intentional
+ builder.add(new SingleColumnRelation(ColumnIdentifier.getInterned(name, true),
+ toOperator(Relation.RelationKind.EQ),
+ new AbstractMarker.Raw(values.size() - 1)));
+ }
+
+ for (Relation relation : query.relations)
+ {
+ String name = relation.column();
+ ColumnMetadata columnDef = metadata.getColumn(ByteBufferUtil.bytes(relation.column()));
+ variableNames.add(columnDef.name);
+ values.add(ByteBufferUtil.objectToBytes(relation.value()));
+ // The marker index is the position of the value appended just above
+ builder.add(new SingleColumnRelation(ColumnIdentifier.getInterned(name, false),
+ toOperator(relation.kind),
+ new AbstractMarker.Raw(values.size() - 1)));
+ }
+
+ StatementRestrictions restrictions = new StatementRestrictions(StatementType.DELETE,
+ metadata,
+ builder.build(),
+ new VariableSpecifications(variableNames),
+ false,
+ false,
+ false,
+ false);
+
+ // The options carry the serialized bind values used to resolve the markers into bounds
+ QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.QUORUM, values);
+ SortedSet<ClusteringBound<?>> startBounds = restrictions.getClusteringColumnsBounds(Bound.START, options);
+ SortedSet<ClusteringBound<?>> endBounds = restrictions.getClusteringColumnsBounds(Bound.END, options);
+
+ Slices slices = DeleteStatement.toSlices(metadata, startBounds, endBounds);
+ // Only the first slice is written below, so exactly one slice is expected
+ assert slices.size() == 1;
+ int deletionTime = FBUtilities.nowInSeconds();
+ long rts = clock.rts(lts);
+
+ return new RowUpdateBuilder(metadata,
+ deletionTime,
+ rts,
+ metadata.params.defaultTimeToLive,
+ serializePartitionKey(store, partitionKey))
+ .noRowMarker()
+ .addRangeTombstone(new RangeTombstone(slices.get(0), new DeletionTime(rts, deletionTime)))
+ .build();
+ }
+
+    /**
+     * Maps a Harry relation kind to the CQL operator of the same name.
+     *
+     * @throws IllegalArgumentException for any kind other than LT, GT, LTE, GTE and EQ
+     */
+    public static Operator toOperator(Relation.RelationKind kind)
+    {
+        final Operator operator;
+        switch (kind)
+        {
+            case LT:
+                operator = Operator.LT;
+                break;
+            case GT:
+                operator = Operator.GT;
+                break;
+            case LTE:
+                operator = Operator.LTE;
+                break;
+            case GTE:
+                operator = Operator.GTE;
+                break;
+            case EQ:
+                operator = Operator.EQ;
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported " + kind);
+        }
+        return operator;
+    }
+
+    /**
+     * Serializes partition key components and decorates them with the store's partitioner.
+     * A single component is serialized as-is; any other number of components is combined
+     * through {@link CompositeType#build}.
+     *
+     * @param store the table whose partitioner decorates the key
+     * @param pk    partition key components, in order
+     * @return the decorated partition key
+     */
+    public static DecoratedKey serializePartitionKey(ColumnFamilyStore store, Object... pk)
+    {
+        if (pk.length != 1)
+        {
+            ByteBuffer[] components = new ByteBuffer[pk.length];
+            int idx = 0;
+            for (Object part : pk)
+                components[idx++] = ByteBufferUtil.objectToBytes(part);
+
+            return store.getPartitioner().decorateKey(CompositeType.build(ByteBufferAccessor.instance, components));
+        }
+
+        return store.getPartitioner().decorateKey(ByteBufferUtil.objectToBytes(pk[0]));
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/SSTableLoadingVisitor.java b/test/distributed/org/apache/cassandra/distributed/fuzz/SSTableLoadingVisitor.java
new file mode 100644
index 0000000..ce6ef57
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/SSTableLoadingVisitor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz;
+
+import harry.core.Run;
+import harry.model.OpSelectors;
+import harry.visitors.VisitExecutor;
+import org.apache.cassandra.db.Keyspace;
+
+/**
+ * A {@link VisitExecutor} that turns every visited operation into a mutation built by
+ * {@link SSTableGenerator} and applies it directly, flushing whenever the logical timestamp
+ * is a positive multiple of {@code forceFlushAfter}.
+ */
+public class SSTableLoadingVisitor extends VisitExecutor
+{
+    private final SSTableGenerator gen;
+    private final int forceFlushAfter;
+
+    /**
+     * @param run             run whose schema names the keyspace and table to write to
+     * @param forceFlushAfter flush interval, in logical timestamps
+     * @throws IllegalArgumentException if {@code forceFlushAfter} is 0
+     */
+    public SSTableLoadingVisitor(Run run, int forceFlushAfter)
+    {
+        // A zero interval would otherwise fail later with an ArithmeticException in afterLts (lts % 0)
+        if (forceFlushAfter == 0)
+            throw new IllegalArgumentException("forceFlushAfter must not be 0");
+
+        this.forceFlushAfter = forceFlushAfter;
+        gen = new SSTableGenerator(run, Keyspace.open(run.schemaSpec.keyspace).getColumnFamilyStore(run.schemaSpec.table));
+        gen.mark();
+    }
+
+    @Override
+    protected void beforeLts(long l, long l1)
+    {
+
+    }
+
+    /**
+     * Flushes once the just-finished logical timestamp is a positive multiple of the flush interval.
+     */
+    @Override
+    protected void afterLts(long lts, long pd)
+    {
+        if (lts > 0 && lts % forceFlushAfter == 0)
+            forceFlush();
+    }
+
+    @Override
+    protected void beforeBatch(long l, long l1, long l2)
+    {
+
+    }
+
+    @Override
+    protected void afterBatch(long l, long l1, long l2)
+    {
+
+    }
+
+    /**
+     * Builds and applies the mutation(s) for one operation. The {@code *_WITH_STATICS} kinds apply
+     * the regular-column mutation first and the static one second. Kinds without a case below are ignored.
+     */
+    @Override
+    protected void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind operationKind)
+    {
+        switch (operationKind)
+        {
+            case INSERT:
+                gen.write(lts, pd, cd, opId, true).applyUnsafe();
+                break;
+            case INSERT_WITH_STATICS:
+                gen.write(lts, pd, cd, opId, true).applyUnsafe();
+                gen.writeStatic(lts, pd, cd, opId, true).applyUnsafe();
+                break;
+            case UPDATE:
+                // Updates are written without a row marker
+                gen.write(lts, pd, cd, opId, false).applyUnsafe();
+                break;
+            case UPDATE_WITH_STATICS:
+                gen.write(lts, pd, cd, opId, false).applyUnsafe();
+                gen.writeStatic(lts, pd, cd, opId, false).applyUnsafe();
+                break;
+            case DELETE_PARTITION:
+                gen.deletePartition(lts, pd).applyUnsafe();
+                break;
+            case DELETE_ROW:
+                gen.deleteRow(lts, pd, cd).applyUnsafe();
+                break;
+            case DELETE_COLUMN:
+                gen.deleteColumn(lts, pd, cd, opId).applyUnsafe();
+                break;
+            case DELETE_COLUMN_WITH_STATICS:
+                gen.deleteColumn(lts, pd, cd, opId).applyUnsafe();
+                gen.deleteStatic(lts, pd, opId).applyUnsafe();
+                break;
+            case DELETE_RANGE:
+                gen.deleteRange(lts, pd, opId).applyUnsafe();
+                break;
+            case DELETE_SLICE:
+                gen.deleteSlice(lts, pd, opId).applyUnsafe();
+                break;
+        }
+    }
+
+    /**
+     * Flushes the generator's store immediately, regardless of the flush interval.
+     */
+    public void forceFlush()
+    {
+        gen.flush();
+    }
+
+
+    @Override
+    public void shutdown() throws InterruptedException
+    {
+
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/fuzz/test/SSTableGeneratorTest.java b/test/distributed/org/apache/cassandra/distributed/fuzz/test/SSTableGeneratorTest.java
new file mode 100644
index 0000000..5050bc6
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/fuzz/test/SSTableGeneratorTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.fuzz.test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import com.google.common.collect.Iterators;
+import org.junit.Test;
+
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.ddl.ColumnSpec;
+import harry.ddl.SchemaSpec;
+import harry.model.Model;
+import harry.model.QuiescentChecker;
+import harry.model.clock.OffsetClock;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.Query;
+import harry.visitors.GeneratingVisitor;
+import harry.visitors.LtsVisitor;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.distributed.fuzz.FixedSchemaProviderConfiguration;
+import org.apache.cassandra.distributed.fuzz.HarryHelper;
+import org.apache.cassandra.distributed.fuzz.SSTableLoadingVisitor;
+import org.apache.cassandra.distributed.impl.RowUtil;
+
+/**
+ * Writes Harry-generated operations straight into the local table through
+ * {@link SSTableLoadingVisitor}, then validates every visited partition with a
+ * {@link QuiescentChecker} that reads through CQLTester.
+ */
+public class SSTableGeneratorTest extends CQLTester
+{
+    private static final Configuration configuration;
+
+    static
+    {
+        try
+        {
+            HarryHelper.init();
+            configuration = HarryHelper.defaultConfiguration()
+                                       .setClock(() -> new OffsetClock(10000L))
+                                       .build();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    // Fixed schema: two partition keys, two clustering keys, four regular and two static columns
+    private static final SchemaSpec schemaSpec = new SchemaSpec(KEYSPACE, "tbl1",
+                                                                Arrays.asList(ColumnSpec.pk("pk1", ColumnSpec.asciiType),
+                                                                              ColumnSpec.pk("pk2", ColumnSpec.int64Type)),
+                                                                Arrays.asList(ColumnSpec.ck("ck1", ColumnSpec.asciiType, false),
+                                                                              ColumnSpec.ck("ck2", ColumnSpec.int64Type, false)),
+                                                                Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.int32Type),
+                                                                              ColumnSpec.regularColumn("v2", ColumnSpec.int64Type),
+                                                                              ColumnSpec.regularColumn("v3", ColumnSpec.int32Type),
+                                                                              ColumnSpec.regularColumn("v4", ColumnSpec.asciiType)),
+                                                                Arrays.asList(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType),
+                                                                              ColumnSpec.staticColumn("s2", ColumnSpec.int64Type)));
+
+    @Test
+    public void testSSTableGenerator()
+    {
+        createTable(schemaSpec.compile().cql());
+        Run run = configuration.unbuild()
+                               .setSchemaProvider(new FixedSchemaProviderConfiguration(schemaSpec))
+                               .setSUT(CqlTesterSUT::new)
+                               .build()
+                               .createRun();
+
+
+        SSTableLoadingVisitor sstableVisitor = new SSTableLoadingVisitor(run, 1000);
+        LtsVisitor visitor = new GeneratingVisitor(run, sstableVisitor);
+        // Remember every partition touched so that each one can be validated afterwards
+        Set<Long> pds = new HashSet<>();
+        run.tracker.onLtsStarted((lts) -> pds.add(run.pdSelector.pd(lts, run.schemaSpec)));
+        for (int i = 0; i < 1000; i++)
+            visitor.visit();
+
+        sstableVisitor.forceFlush();
+
+        Model checker = new QuiescentChecker(run);
+        for (Long pd : pds)
+            checker.validate(Query.selectPartition(run.schemaSpec, pd, false));
+    }
+
+    /**
+     * System under test that runs statements through {@link CQLTester#execute}; the consistency
+     * level argument is ignored and asynchronous execution is not supported.
+     */
+    public class CqlTesterSUT implements SystemUnderTest
+    {
+        public boolean isShutdown()
+        {
+            return false;
+        }
+
+        public void shutdown()
+        {
+        }
+
+        public CompletableFuture<Object[][]> executeAsync(String s, ConsistencyLevel consistencyLevel, Object... objects)
+        {
+            throw new RuntimeException("Not implemented");
+        }
+
+        public Object[][] execute(String s, ConsistencyLevel consistencyLevel, Object... objects)
+        {
+            try
+            {
+                return Iterators.toArray(RowUtil.toIter(SSTableGeneratorTest.this.execute(s, objects)),
+                                         Object[].class);
+            }
+            catch (Throwable throwable)
+            {
+                // Keep the original failure as the cause so test failures remain diagnosable
+                throw new RuntimeException(throwable);
+            }
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org