You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2021/03/25 09:51:31 UTC
[cassandra-harry] 01/01: Numerous minor improvements while
preparing for fuzz-testing 4.0 in earnest:
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch CASSANDRA-16262
in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git
commit 8238908162122dbad3eeb7857e1141f0b4205b95
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Wed Mar 24 16:11:57 2021 +0100
Numerous minor improvements while preparing for fuzz-testing 4.0 in earnest:
* Refactor Run to make it an entrypoint
* Separate Partition visitors from Row visitors
* Make it possible to effortlessly check local states
* Introduce CLs
* Clearer distinction between the components, making it possible to implement visitors (such as a repairing validator)
* Implement fault injecting partition visitor
* Extract DataTracker
* Minor bug fixes
Patch by Alex Petrov for CASSANDRA-16262.
---
harry-core/src/harry/core/Configuration.java | 384 +++++++++++++--------
.../sut/NoOpSut.java => core/MetricReporter.java} | 37 +-
harry-core/src/harry/core/Run.java | 64 ++--
.../src/harry/corruptor/AddExtraRowCorruptor.java | 4 +-
.../harry/corruptor/QueryResponseCorruptor.java | 2 +-
harry-core/src/harry/corruptor/RowCorruptor.java | 2 +-
harry-core/src/harry/ddl/SchemaGenerators.java | 66 +++-
harry-core/src/harry/ddl/SchemaSpec.java | 6 +
harry-core/src/harry/generators/PCGFastPure.java | 2 +-
harry-core/src/harry/model/DoNothingModel.java | 6 +-
harry-core/src/harry/model/ExhaustiveChecker.java | 83 ++---
harry-core/src/harry/model/Model.java | 23 +-
harry-core/src/harry/model/OpSelectors.java | 24 +-
harry-core/src/harry/model/QuiescentChecker.java | 54 +--
harry-core/src/harry/model/SelectHelper.java | 2 +-
.../harry/model/StatelessVisibleRowsChecker.java | 36 +-
harry-core/src/harry/model/VisibleRowsChecker.java | 113 +++---
harry-core/src/harry/model/sut/PrintlnSut.java | 9 +-
.../src/harry/model/sut/SystemUnderTest.java | 41 ++-
harry-core/src/harry/reconciler/Reconciler.java | 43 ++-
.../src/harry/runner/AbstractPartitionVisitor.java | 3 +-
.../src/harry/runner/AllPartitionsValidator.java | 106 ++++++
.../src/harry/runner/DataTracker.java | 33 +-
.../src/harry/runner/DefaultDataTracker.java | 140 ++++++++
.../runner/DefaultPartitionVisitorFactory.java | 198 -----------
harry-core/src/harry/runner/HarryRunner.java | 36 +-
.../src/harry/runner/LoggingPartitionVisitor.java | 100 ++++++
.../src/harry/runner/MutatingPartitionVisitor.java | 133 +++++++
...aultRowVisitor.java => MutatingRowVisitor.java} | 39 ++-
harry-core/src/harry/runner/PartitionVisitor.java | 11 +-
harry-core/src/harry/runner/QueryGenerator.java | 364 +++++++++++++++++++
.../src/harry/runner/RecentPartitionValidator.java | 84 +++++
harry-core/src/harry/runner/RowVisitor.java | 13 +-
harry-core/src/harry/runner/Runner.java | 162 ++++-----
.../src/harry/runner/SinglePartitionValidator.java | 56 +++
harry-core/src/harry/runner/Validator.java | 119 -------
harry-core/src/harry/util/Ranges.java | 25 +-
.../test/harry/generators/RandomGeneratorTest.java | 32 +-
.../test/harry/generators/SurjectionsTest.java | 4 +-
harry-core/test/harry/model/OpSelectorsTest.java | 96 +++---
harry-core/test/harry/operations/RelationTest.java | 215 ++++++------
.../model/sut/external/ExternalClusterSut.java | 24 +-
.../harry/runner/external/HarryRunnerExternal.java | 8 +-
.../src/harry/model/sut}/ExternalClusterSut.java | 78 ++---
.../src/harry/model/sut/InJvmSut.java | 111 +++++-
.../runner/FaultInjectingPartitionVisitor.java | 94 +++++
.../src/harry/runner/QueryingNoOpChecker.java | 65 ++++
.../harry/runner/RepairingLocalStateValidator.java | 137 ++++++++
harry-integration/src/harry/runner/Reproduce.java | 4 +-
.../model/ExhaustiveCheckerIntegrationTest.java | 211 +++++------
.../harry/model/ExhaustiveCheckerUnitTest.java | 100 +++---
.../test/harry/model/IntegrationTestBase.java | 28 +-
harry-integration/test/harry/model/ModelTest.java | 28 +-
.../test/harry/model/ModelTestBase.java | 102 ++++++
.../harry/model/QuerySelectorNegativeTest.java | 43 ++-
.../test/harry/model/QuerySelectorTest.java | 46 +--
.../model/QuiescentCheckerIntegrationTest.java | 111 +++---
.../test/harry/op/RowVisitorTest.java | 51 +--
pom.xml | 4 +-
59 files changed, 2812 insertions(+), 1403 deletions(-)
diff --git a/harry-core/src/harry/core/Configuration.java b/harry-core/src/harry/core/Configuration.java
index a3c287f..27ee176 100644
--- a/harry-core/src/harry/core/Configuration.java
+++ b/harry-core/src/harry/core/Configuration.java
@@ -27,7 +27,6 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.function.Supplier;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -46,14 +45,16 @@ import harry.model.OpSelectors;
import harry.model.QuiescentChecker;
import harry.model.clock.ApproximateMonotonicClock;
import harry.model.sut.SystemUnderTest;
-import harry.runner.DefaultPartitionVisitorFactory;
-import harry.runner.DefaultRowVisitor;
+import harry.runner.AllPartitionsValidator;
+import harry.runner.DataTracker;
+import harry.runner.DefaultDataTracker;
+import harry.runner.LoggingPartitionVisitor;
+import harry.runner.MutatingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
import harry.runner.PartitionVisitor;
-import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.RecentPartitionValidator;
import harry.runner.RowVisitor;
import harry.runner.Runner;
-import harry.runner.Validator;
import harry.util.BitSet;
public class Configuration
@@ -70,13 +71,22 @@ public class Configuration
mapper.registerSubtypes(Configuration.DebugApproximateMonotonicClockConfiguration.class);
mapper.registerSubtypes(Configuration.ApproximateMonotonicClockConfiguration.class);
mapper.registerSubtypes(Configuration.ConcurrentRunnerConfig.class);
+ mapper.registerSubtypes(Configuration.SequentialRunnerConfig.class);
+ mapper.registerSubtypes(Configuration.DefaultDataTrackerConfiguration.class);
mapper.registerSubtypes(Configuration.ExhaustiveCheckerConfig.class);
+ mapper.registerSubtypes(Configuration.QuiescentCheckerConfig.class);
mapper.registerSubtypes(Configuration.DefaultCDSelectorConfiguration.class);
mapper.registerSubtypes(Configuration.DefaultPDSelectorConfiguration.class);
mapper.registerSubtypes(Configuration.ConstantDistributionConfig.class);
mapper.registerSubtypes(DefaultSchemaProviderConfiguration.class);
- mapper.registerSubtypes(DefaultRowVisitorConfiguration.class);
+ mapper.registerSubtypes(MutatingRowVisitorConfiguration.class);
+
+ mapper.registerSubtypes(MutatingPartitionVisitorConfiguation.class);
+ mapper.registerSubtypes(LoggingPartitionVisitorConfiguration.class);
+ mapper.registerSubtypes(AllPartitionsValidatorConfiguration.class);
+ mapper.registerSubtypes(RecentPartitionsValidatorConfiguration.class);
+ mapper.registerSubtypes(FixedSchemaProviderConfiguration.class);
}
public final long seed;
@@ -86,12 +96,11 @@ public class Configuration
public final boolean create_schema;
public final boolean truncate_table;
+ public final MetricReporterConfiguration metric_reporter;
public final ClockConfiguration clock;
- public final RunnerConfiguration runner;
public final SutConfiguration system_under_test;
- public final ModelConfiguration model;
- public final RowVisitorConfiguration row_visitor;
-
+ public final DataTrackerConfiguration data_tracker;
+ public final RunnerConfiguration runner;
public final PDSelectorConfiguration partition_descriptor_selector;
public final CDSelectorConfiguration clustering_descriptor_selector;
@@ -104,11 +113,11 @@ public class Configuration
@JsonProperty("drop_schema") boolean drop_schema,
@JsonProperty("create_schema") boolean create_schema,
@JsonProperty("truncate_schema") boolean truncate_table,
+ @JsonProperty("metric_reporter") MetricReporterConfiguration metric_reporter,
@JsonProperty("clock") ClockConfiguration clock,
@JsonProperty("runner") RunnerConfiguration runner,
@JsonProperty("system_under_test") SutConfiguration system_under_test,
- @JsonProperty("model") ModelConfiguration model,
- @JsonProperty("row_visitor") RowVisitorConfiguration row_visitor,
+ @JsonProperty("data_tracker") DataTrackerConfiguration data_tracker,
@JsonProperty("partition_descriptor_selector") PDSelectorConfiguration partition_descriptor_selector,
@JsonProperty("clustering_descriptor_selector") CDSelectorConfiguration clustering_descriptor_selector,
@JsonProperty(value = "run_time", defaultValue = "2") long run_time,
@@ -119,15 +128,15 @@ public class Configuration
this.drop_schema = drop_schema;
this.create_schema = create_schema;
this.truncate_table = truncate_table;
+ this.metric_reporter = metric_reporter;
this.clock = clock;
- this.runner = runner;
this.system_under_test = system_under_test;
- this.model = model;
- this.row_visitor = row_visitor;
+ this.data_tracker = data_tracker;
this.partition_descriptor_selector = partition_descriptor_selector;
this.clustering_descriptor_selector = clustering_descriptor_selector;
this.run_time = run_time;
this.run_time_unit = run_time_unit;
+ this.runner = runner;
}
public static void registerSubtypes(Class<?>... classes)
@@ -178,6 +187,13 @@ public class Configuration
public static void validate(Configuration config)
{
+ Objects.requireNonNull(config.schema_provider, "Schema provider should not be null");
+ Objects.requireNonNull(config.metric_reporter, "Metric reporter should not be null");
+ Objects.requireNonNull(config.clock, "Clock should not be null");
+ Objects.requireNonNull(config.system_under_test, "System under test should not be null");
+ Objects.requireNonNull(config.partition_descriptor_selector, "Partition descriptor selector should not be null");
+ Objects.requireNonNull(config.clustering_descriptor_selector, "Clustering descriptor selector should not be null");
+
// TODO: validation
//assert historySize * clockEpochTimeUnit.toMillis(clockEpoch) > runTimePeriod.toMillis(runTime) : "History size is too small for this run";
}
@@ -194,53 +210,39 @@ public class Configuration
public static Run createRun(Configuration snapshot)
{
+ validate(snapshot);
+
long seed = snapshot.seed;
+ DataTracker tracker = snapshot.data_tracker == null ? new DefaultDataTrackerConfiguration().make() : snapshot.data_tracker.make();
OpSelectors.Rng rng = new OpSelectors.PCGFast(seed);
OpSelectors.MonotonicClock clock = snapshot.clock.make();
- // TODO: parsing schema
+ MetricReporter metricReporter = snapshot.metric_reporter.make();
+ // TODO: parse schema
SchemaSpec schemaSpec = snapshot.schema_provider.make(seed);
+ schemaSpec.validate();
OpSelectors.PdSelector pdSelector = snapshot.partition_descriptor_selector.make(rng);
OpSelectors.DescriptorSelector descriptorSelector = snapshot.clustering_descriptor_selector.make(rng, schemaSpec);
SystemUnderTest sut = snapshot.system_under_test.make();
- QuerySelector querySelector = new QuerySelector(schemaSpec,
- pdSelector,
- descriptorSelector,
- Surjections.pick(Query.QueryKind.CLUSTERING_SLICE,
- Query.QueryKind.CLUSTERING_RANGE),
- rng);
- Model model = snapshot.model.create(schemaSpec, pdSelector, descriptorSelector, clock, querySelector, sut);
- Validator validator = new Validator(model, schemaSpec, clock, pdSelector, descriptorSelector, rng);
-
- RowVisitor rowVisitor;
- if (snapshot.row_visitor != null)
- rowVisitor = snapshot.row_visitor.make(schemaSpec, clock, descriptorSelector, querySelector);
- else
- rowVisitor = new DefaultRowVisitor(schemaSpec, clock, descriptorSelector, querySelector);
-
- // TODO: make this one configurable, too?
- Supplier<PartitionVisitor> visitorFactory = new DefaultPartitionVisitorFactory(model, sut, pdSelector, descriptorSelector, schemaSpec, rowVisitor);
+
return new Run(rng,
clock,
pdSelector,
descriptorSelector,
schemaSpec,
- model,
+ tracker,
sut,
- validator,
- rowVisitor,
- visitorFactory,
- snapshot);
+ metricReporter);
}
- public static Runner createRunner(Configuration snapshot)
+ public static Runner createRunner(Configuration config)
{
- Run run = createRun(snapshot);
- return snapshot.runner.make(run);
+ Run run = createRun(config);
+ return config.runner.make(run, config);
}
public static class ConfigurationBuilder
@@ -253,11 +255,10 @@ public class Configuration
boolean truncate_table;
ClockConfiguration clock;
+ MetricReporterConfiguration metric_reporter = new NoOpMetricReporterConfiguration();
+ DataTrackerConfiguration data_tracker = new DefaultDataTrackerConfiguration();
RunnerConfiguration runner;
SutConfiguration system_under_test;
- ModelConfiguration model;
- RowVisitorConfiguration row_visitor = new DefaultRowVisitorConfiguration();
-
PDSelectorConfiguration partition_descriptor_selector = new Configuration.DefaultPDSelectorConfiguration(10, 100);
CDSelectorConfiguration clustering_descriptor_selector; // TODO: sensible default value
@@ -283,13 +284,19 @@ public class Configuration
return this;
}
+
+ public ConfigurationBuilder setDataTracker(DataTrackerConfiguration tracker)
+ {
+ this.data_tracker = tracker;
+ return this;
+ }
+
public ConfigurationBuilder setClock(ClockConfiguration clock)
{
this.clock = clock;
return this;
}
-
public ConfigurationBuilder setSUT(SutConfiguration system_under_test)
{
this.system_under_test = system_under_test;
@@ -314,12 +321,6 @@ public class Configuration
return this;
}
- public ConfigurationBuilder setModel(ModelConfiguration model)
- {
- this.model = model;
- return this;
- }
-
public ConfigurationBuilder setRunner(RunnerConfiguration runner)
{
this.runner = runner;
@@ -345,28 +346,28 @@ public class Configuration
return setClusteringDescriptorSelector(builder.build());
}
- public ConfigurationBuilder setRowVisitor(RowVisitorConfiguration row_visitor)
+ public ConfigurationBuilder setMetricReporter(MetricReporterConfiguration metric_reporter)
{
- this.row_visitor = row_visitor;
+ this.metric_reporter = metric_reporter;
return this;
}
public Configuration build()
{
return new Configuration(seed,
- Objects.requireNonNull(schema_provider, "Schema provider should not be null"),
+ schema_provider,
drop_schema,
create_schema,
truncate_table,
+ metric_reporter,
+ clock,
- Objects.requireNonNull(clock, "Clock should not be null"),
runner,
- Objects.requireNonNull(system_under_test, "System under test should not be null"),
- Objects.requireNonNull(model, "Model should not be null"),
- Objects.requireNonNull(row_visitor, "Row visitor should not be null"),
+ system_under_test,
+ data_tracker,
- Objects.requireNonNull(partition_descriptor_selector, "Partition descriptor selector should not be null"),
- Objects.requireNonNull(clustering_descriptor_selector, "Clustering descriptor selector should not be null"),
+ partition_descriptor_selector,
+ clustering_descriptor_selector,
run_time,
run_time_unit);
@@ -385,8 +386,6 @@ public class Configuration
builder.clock = clock;
builder.runner = runner;
builder.system_under_test = system_under_test;
- builder.model = model;
- builder.row_visitor = row_visitor;
builder.partition_descriptor_selector = partition_descriptor_selector;
builder.clustering_descriptor_selector = clustering_descriptor_selector;
@@ -396,6 +395,40 @@ public class Configuration
return builder;
}
+
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
+ public interface DataTrackerConfiguration extends DataTracker.DataTrackerFactory
+ {
+
+ }
+
+ @JsonTypeName("default")
+ public static class DefaultDataTrackerConfiguration implements DataTrackerConfiguration
+ {
+ public final long max_seen_lts;
+ public final long max_complete_lts;
+
+ public DefaultDataTrackerConfiguration()
+ {
+ this(-1, -1);
+ }
+
+ @JsonCreator
+ public DefaultDataTrackerConfiguration(@JsonProperty(value = "max_seen_lts", defaultValue = "-1") long max_seen_lts,
+ @JsonProperty(value = "max_complete_lts", defaultValue = "-1") long max_complete_lts)
+ {
+ this.max_seen_lts = max_seen_lts;
+ this.max_complete_lts = max_complete_lts;
+ }
+
+ public DataTracker make()
+ {
+ DefaultDataTracker defaultDataTracker = new DefaultDataTracker();
+ defaultDataTracker.forceLts(max_seen_lts, max_complete_lts);
+ return defaultDataTracker;
+ }
+ }
+
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
public interface ClockConfiguration extends OpSelectors.MonotonicClockFactory
{
@@ -475,46 +508,39 @@ public class Configuration
@JsonTypeName("concurrent")
public static class ConcurrentRunnerConfig implements RunnerConfiguration
{
- public final int writer_threads;
- public final int round_robin_validator_threads;
- public final int recent_partition_validator_threads;
+ public final int concurrency;
+ public final List<PartitionVisitorConfiguration> partition_visitor_factories;
@JsonCreator
- public ConcurrentRunnerConfig(@JsonProperty(value = "writer_threads", defaultValue = "2") int writer_threads,
- @JsonProperty(value = "round_robin_validator_threads", defaultValue = "2") int round_robin_validator_threads,
- @JsonProperty(value = "recent_partition_validator_threads", defaultValue = "2") int recent_partition_validator_threads)
+ public ConcurrentRunnerConfig(@JsonProperty(value = "concurrency", defaultValue = "2") int concurrency,
+ @JsonProperty(value = "partition_visitors") List<PartitionVisitorConfiguration> partitionVisitors)
{
- this.writer_threads = writer_threads;
- this.round_robin_validator_threads = round_robin_validator_threads;
- this.recent_partition_validator_threads = recent_partition_validator_threads;
+ this.concurrency = concurrency;
+ this.partition_visitor_factories = partitionVisitors;
}
- public Runner make(Run run)
+ @Override
+ public Runner make(Run run, Configuration config)
{
- return new Runner.ConcurrentRunner(run, writer_threads, round_robin_validator_threads, recent_partition_validator_threads);
+ return new Runner.ConcurrentRunner(run, config, concurrency, partition_visitor_factories);
}
}
@JsonTypeName("sequential")
public static class SequentialRunnerConfig implements RunnerConfiguration
{
- private final int round_robin_validator_threads;
- private final int check_recent_after;
- private final int check_all_after;
+ public final List<PartitionVisitorConfiguration> partition_visitor_factories;
@JsonCreator
- public SequentialRunnerConfig(@JsonProperty(value = "round_robin_validator_threads", defaultValue = "2") int round_robin_validator_threads,
- @JsonProperty(value = "check_recent_after", defaultValue = "100") int check_recent_after,
- @JsonProperty(value = "check_all_after", defaultValue = "5000") int check_all_after)
+ public SequentialRunnerConfig(@JsonProperty(value = "partition_visitors") List<PartitionVisitorConfiguration> partitionVisitors)
{
- this.round_robin_validator_threads = round_robin_validator_threads;
- this.check_recent_after = check_recent_after;
- this.check_all_after = check_all_after;
+ this.partition_visitor_factories = partitionVisitors;
}
- public Runner make(Run run)
+ @Override
+ public Runner make(Run run, Configuration config)
{
- return new Runner.SequentialRunner(run, round_robin_validator_threads, check_recent_after, check_all_after);
+ return new Runner.SequentialRunner(run, config, partition_visitor_factories);
}
}
@@ -531,64 +557,29 @@ public class Configuration
@JsonTypeName("exhaustive_checker")
public static class ExhaustiveCheckerConfig implements ModelConfiguration
{
- public final long max_seen_lts;
- public final long max_complete_lts;
-
+ @JsonCreator
public ExhaustiveCheckerConfig()
{
- this(-1, -1);
- }
- @JsonCreator
- public ExhaustiveCheckerConfig(@JsonProperty(value = "max_seen_lts", defaultValue = "-1") long max_seen_lts,
- @JsonProperty(value = "max_complete_lts", defaultValue = "-1") long max_complete_lts)
- {
- this.max_seen_lts = max_seen_lts;
- this.max_complete_lts = max_complete_lts;
}
- public Model create(SchemaSpec schema, OpSelectors.PdSelector pdSelector, OpSelectors.DescriptorSelector descriptorSelector, OpSelectors.MonotonicClock clock, QuerySelector querySelector, SystemUnderTest sut)
+ public Model make(Run run)
{
- ExhaustiveChecker exhaustiveChecker = new ExhaustiveChecker(schema,
- pdSelector,
- descriptorSelector,
- clock,
- querySelector,
- sut);
- exhaustiveChecker.forceLts(max_seen_lts, max_complete_lts);
- return exhaustiveChecker;
+ return new ExhaustiveChecker(run);
}
}
@JsonTypeName("quiescent_checker")
public static class QuiescentCheckerConfig implements ModelConfiguration
{
- public final long max_seen_lts;
- public final long max_complete_lts;
-
- public QuiescentCheckerConfig()
- {
- this(-1, -1);
- }
-
@JsonCreator
- public QuiescentCheckerConfig(@JsonProperty(value = "max_seen_lts", defaultValue = "-1") long max_seen_lts,
- @JsonProperty(value = "max_complete_lts", defaultValue = "-1") long max_complete_lts)
+ public QuiescentCheckerConfig()
{
- this.max_seen_lts = max_seen_lts;
- this.max_complete_lts = max_complete_lts;
}
- public Model create(SchemaSpec schema, OpSelectors.PdSelector pdSelector, OpSelectors.DescriptorSelector descriptorSelector, OpSelectors.MonotonicClock clock, QuerySelector querySelector, SystemUnderTest sut)
+ public Model make(Run run)
{
- QuiescentChecker exhaustiveChecker = new QuiescentChecker(schema,
- pdSelector,
- descriptorSelector,
- clock,
- querySelector,
- sut);
- exhaustiveChecker.forceLts(max_seen_lts, max_complete_lts);
- return exhaustiveChecker;
+ return new QuiescentChecker(run);
}
}
@@ -875,23 +866,108 @@ public class Configuration
}
}
+
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
+ public interface PartitionVisitorConfiguration extends PartitionVisitor.PartitionVisitorFactory
+ {
+ }
+
+
+ @JsonTypeName("mutating")
+ public static class MutatingPartitionVisitorConfiguation implements PartitionVisitorConfiguration
+ {
+ protected final RowVisitorConfiguration row_visitor;
+
+ @JsonCreator
+ public MutatingPartitionVisitorConfiguation(@JsonProperty("row_visitor") RowVisitorConfiguration row_visitor)
+ {
+ this.row_visitor = row_visitor;
+ }
+
+ @Override
+ public PartitionVisitor make(Run run)
+ {
+ return new MutatingPartitionVisitor(run, row_visitor);
+ }
+ }
+
+ @JsonTypeName("logging")
+ public static class LoggingPartitionVisitorConfiguration implements PartitionVisitorConfiguration
+ {
+ protected final RowVisitorConfiguration row_visitor;
+
+ @JsonCreator
+ public LoggingPartitionVisitorConfiguration(@JsonProperty("row_visitor") RowVisitorConfiguration row_visitor)
+ {
+ this.row_visitor = row_visitor;
+ }
+
+ @Override
+ public PartitionVisitor make(Run run)
+ {
+ return new LoggingPartitionVisitor(run, row_visitor);
+ }
+ }
+
+ @JsonTypeName("validate_all_partitions")
+ public static class AllPartitionsValidatorConfiguration implements Configuration.PartitionVisitorConfiguration
+ {
+ private final int concurrency;
+ private final int trigger_after;
+ private final Configuration.ModelConfiguration modelConfiguration;
+
+ @JsonCreator
+ public AllPartitionsValidatorConfiguration(@JsonProperty("concurrency") int concurrency,
+ @JsonProperty("trigger_after") int trigger_after,
+ @JsonProperty("model") Configuration.ModelConfiguration model)
+ {
+ this.concurrency = concurrency;
+ this.trigger_after = trigger_after;
+ this.modelConfiguration = model;
+ }
+
+ public PartitionVisitor make(Run run)
+ {
+ return new AllPartitionsValidator(concurrency, trigger_after, run, modelConfiguration);
+ }
+ }
+
+ @JsonTypeName("validate_recent_partitions")
+ public static class RecentPartitionsValidatorConfiguration implements Configuration.PartitionVisitorConfiguration
+ {
+ private final int partition_count;
+ private final int trigger_after;
+ private final Configuration.ModelConfiguration modelConfiguration;
+
+ @JsonCreator
+ public RecentPartitionsValidatorConfiguration(@JsonProperty("partition_count") int partition_count,
+ @JsonProperty("trigger_after") int trigger_after,
+ @JsonProperty("model") Configuration.ModelConfiguration model)
+ {
+ this.partition_count = partition_count;
+ this.trigger_after = trigger_after;
+ this.modelConfiguration = model;
+ }
+
+ @Override
+ public PartitionVisitor make(Run run)
+ {
+ return new RecentPartitionValidator(partition_count, trigger_after, run, modelConfiguration);
+ }
+ }
+
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
public interface RowVisitorConfiguration extends RowVisitor.RowVisitorFactory
{
}
- @JsonTypeName("default")
- public static class DefaultRowVisitorConfiguration implements RowVisitorConfiguration
+ @JsonTypeName("mutating")
+ public static class MutatingRowVisitorConfiguration implements RowVisitorConfiguration
{
- public RowVisitor make(SchemaSpec schema,
- OpSelectors.MonotonicClock clock,
- OpSelectors.DescriptorSelector descriptorSelector,
- QuerySelector querySelector)
+ @Override
+ public RowVisitor make(Run run)
{
- return new DefaultRowVisitor(schema,
- clock,
- descriptorSelector,
- querySelector);
+ return new MutatingRowVisitor(run);
}
}
@@ -910,5 +986,41 @@ public class Configuration
}
}
+ @JsonTypeName("fixed")
+ public static class FixedSchemaProviderConfiguration implements SchemaProviderConfiguration
+ {
+ private final SchemaSpec schemaSpec;
+
+ @JsonCreator
+ public FixedSchemaProviderConfiguration(@JsonProperty("keyspace") String keyspace,
+ @JsonProperty("table") String table,
+ @JsonProperty("partition_keys") Map<String, String> pks,
+ @JsonProperty("clustering_keys") Map<String, String> cks,
+ @JsonProperty("regular_columns") Map<String, String> regulars)
+ {
+ this.schemaSpec = SchemaGenerators.parse(keyspace, table,
+ pks, cks, regulars);
+ }
+
+ public SchemaSpec make(long seed)
+ {
+ return schemaSpec;
+ }
+ }
+
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
+ public interface MetricReporterConfiguration extends MetricReporter.MetricReporterFactory
+ {
+ }
+
+ @JsonTypeName("default")
+ public static class NoOpMetricReporterConfiguration implements MetricReporterConfiguration
+ {
+ public MetricReporter make()
+ {
+ return MetricReporter.NO_OP;
+ }
+ }
+
// TODO: schema provider by DDL
}
diff --git a/harry-core/src/harry/model/sut/NoOpSut.java b/harry-core/src/harry/core/MetricReporter.java
similarity index 58%
rename from harry-core/src/harry/model/sut/NoOpSut.java
rename to harry-core/src/harry/core/MetricReporter.java
index c09490f..b576ec3 100644
--- a/harry-core/src/harry/model/sut/NoOpSut.java
+++ b/harry-core/src/harry/core/MetricReporter.java
@@ -16,29 +16,34 @@
* limitations under the License.
*/
-package harry.model.sut;
+package harry.core;
-import java.util.concurrent.CompletableFuture;
-
-public class NoOpSut implements SystemUnderTest
+public interface MetricReporter
{
- public boolean isShutdown()
- {
- return false;
- }
+ void columnDelete();
+ void rowDelete();
+ void insert();
+ void rangeDelete();
- public void shutdown()
- {
- }
+ void validatePartition();
+ void validateRandomQuery();
- public Object[][] execute(String statement, Object... bindings)
+ interface MetricReporterFactory
{
- return new Object[0][];
+ MetricReporter make();
}
- public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+ MetricReporter NO_OP = new NoOpMetricReporter();
+
+ class NoOpMetricReporter implements MetricReporter
{
- return CompletableFuture.supplyAsync(() -> execute(statement, bindings),
- Runnable::run);
+ private NoOpMetricReporter() {}
+
+ public void columnDelete(){}
+ public void rowDelete(){}
+ public void insert(){}
+ public void rangeDelete(){}
+ public void validatePartition(){}
+ public void validateRandomQuery(){}
}
}
diff --git a/harry-core/src/harry/core/Run.java b/harry-core/src/harry/core/Run.java
index fdff08e..d31e7ad 100644
--- a/harry-core/src/harry/core/Run.java
+++ b/harry-core/src/harry/core/Run.java
@@ -18,56 +18,62 @@
package harry.core;
-import java.util.function.Supplier;
-
import harry.ddl.SchemaSpec;
-import harry.model.Model;
import harry.model.OpSelectors;
import harry.model.sut.SystemUnderTest;
-import harry.runner.PartitionVisitor;
-import harry.runner.RowVisitor;
-import harry.runner.Validator;
+import harry.runner.DataTracker;
+import harry.runner.QueryGenerator;
public class Run
{
public final OpSelectors.Rng rng;
public final OpSelectors.MonotonicClock clock;
public final OpSelectors.PdSelector pdSelector;
-
public final OpSelectors.DescriptorSelector descriptorSelector;
+ public final QueryGenerator rangeSelector;
public final SchemaSpec schemaSpec;
- public final Model model;
+ public final DataTracker tracker;
public final SystemUnderTest sut;
- public final Validator validator;
- public final RowVisitor rowVisitor;
- public final Supplier<PartitionVisitor> visitorFactory;
- public final Configuration snapshot;
+ public final MetricReporter metricReporter;
+
+
+ public Run(OpSelectors.Rng rng,
+ OpSelectors.MonotonicClock clock,
+ OpSelectors.PdSelector pdSelector,
+ OpSelectors.DescriptorSelector descriptorSelector,
- Run(OpSelectors.Rng rng,
- OpSelectors.MonotonicClock clock,
- OpSelectors.PdSelector pdSelector,
- OpSelectors.DescriptorSelector descriptorSelector,
+ SchemaSpec schemaSpec,
+ DataTracker tracker,
+ SystemUnderTest sut,
+ MetricReporter metricReporter)
+ {
+ this(rng, clock, pdSelector, descriptorSelector,
+ new QueryGenerator(schemaSpec, pdSelector, descriptorSelector, rng),
+ schemaSpec, tracker, sut, metricReporter);
+ }
- SchemaSpec schemaSpec,
- Model model,
- SystemUnderTest sut,
- Validator validator,
- RowVisitor rowVisitor,
- Supplier<PartitionVisitor> visitorFactory,
- Configuration snapshot)
+ private Run(OpSelectors.Rng rng,
+ OpSelectors.MonotonicClock clock,
+ OpSelectors.PdSelector pdSelector,
+ OpSelectors.DescriptorSelector descriptorSelector,
+ QueryGenerator rangeSelector,
+
+ SchemaSpec schemaSpec,
+ DataTracker tracker,
+ SystemUnderTest sut,
+ MetricReporter metricReporter)
{
+
this.rng = rng;
this.clock = clock;
this.pdSelector = pdSelector;
this.descriptorSelector = descriptorSelector;
+ this.rangeSelector = rangeSelector;
this.schemaSpec = schemaSpec;
- this.model = model;
+ this.tracker = tracker;
this.sut = sut;
- this.validator = validator;
- this.rowVisitor = rowVisitor;
- this.visitorFactory = visitorFactory;
- this.snapshot = snapshot;
+ this.metricReporter = metricReporter;
}
-}
+}
\ No newline at end of file
diff --git a/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java b/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java
index 9a73867..f2a8b28 100644
--- a/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java
+++ b/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java
@@ -53,7 +53,7 @@ public class AddExtraRowCorruptor implements QueryResponseCorruptor
{
Set<Long> cds = new HashSet<>();
long maxLts = 0;
- for (Object[] obj : sut.execute(query.toSelectStatement()))
+ for (Object[] obj : sut.execute(query.toSelectStatement(), SystemUnderTest.ConsistencyLevel.ALL))
{
ResultSetRow row = SelectHelper.resultSetToRow(schema, clock, obj);
// TODO: extract CD cheaper
@@ -84,7 +84,7 @@ public class AddExtraRowCorruptor implements QueryResponseCorruptor
// written value and tombstone are resolved in favour of tombstone, so we're
// just going to take the next lts.
logger.info("Corrupting the resultset by writing a row with cd {}", cd);
- sut.execute(WriteHelper.inflateInsert(schema, query.pd, cd, vds, clock.rts(maxLts) + 1));
+ sut.execute(WriteHelper.inflateInsert(schema, query.pd, cd, vds, clock.rts(maxLts) + 1), SystemUnderTest.ConsistencyLevel.ALL);
return true;
}
}
\ No newline at end of file
diff --git a/harry-core/src/harry/corruptor/QueryResponseCorruptor.java b/harry-core/src/harry/corruptor/QueryResponseCorruptor.java
index aefce0f..301c50b 100644
--- a/harry-core/src/harry/corruptor/QueryResponseCorruptor.java
+++ b/harry-core/src/harry/corruptor/QueryResponseCorruptor.java
@@ -52,7 +52,7 @@ public interface QueryResponseCorruptor
{
List<ResultSetRow> result = new ArrayList<>();
CompiledStatement statement = query.toSelectStatement();
- for (Object[] obj : sut.execute(statement.cql(), statement.bindings()))
+ for (Object[] obj : sut.execute(statement.cql(), SystemUnderTest.ConsistencyLevel.ALL, statement.bindings()))
result.add(SelectHelper.resultSetToRow(schema, clock, obj));
// TODO: technically, we can do this just depends on corruption strategy
diff --git a/harry-core/src/harry/corruptor/RowCorruptor.java b/harry-core/src/harry/corruptor/RowCorruptor.java
index 85261b2..fbd5fa9 100644
--- a/harry-core/src/harry/corruptor/RowCorruptor.java
+++ b/harry-core/src/harry/corruptor/RowCorruptor.java
@@ -36,7 +36,7 @@ public interface RowCorruptor
if (canCorrupt(row))
{
CompiledStatement statement = corrupt(row);
- sut.execute(statement.cql(), statement.bindings());
+ sut.execute(statement.cql(), SystemUnderTest.ConsistencyLevel.ALL, statement.bindings());
return true;
}
return false;
diff --git a/harry-core/src/harry/ddl/SchemaGenerators.java b/harry-core/src/harry/ddl/SchemaGenerators.java
index aecc189..53ab6f5 100644
--- a/harry-core/src/harry/ddl/SchemaGenerators.java
+++ b/harry-core/src/harry/ddl/SchemaGenerators.java
@@ -18,15 +18,19 @@
package harry.ddl;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.fasterxml.jackson.annotation.JsonProperty;
import harry.generators.Generator;
import harry.generators.Surjections;
@@ -40,10 +44,12 @@ public class SchemaGenerators
}
public static final Collection<ColumnSpec.DataType<?>> clusteringKeyTypes;
+ public static final Map<String, ColumnSpec.DataType<?>> nameToTypeMap;
public static final Collection<ColumnSpec.DataType<?>> columnTypes;
static
{
+
ImmutableList.Builder<ColumnSpec.DataType<?>> builder = ImmutableList.builder();
builder.add(ColumnSpec.int8Type,
ColumnSpec.int16Type,
@@ -51,17 +57,27 @@ public class SchemaGenerators
ColumnSpec.int64Type,
// TODO re-enable boolean type; add it to ByteBufferUtil in Cassandra for that
// ColumnSpec.booleanType,
- ColumnSpec.floatType,
- ColumnSpec.doubleType,
ColumnSpec.asciiType);
columnTypes = builder.build();
builder = ImmutableList.builder();
builder.addAll(columnTypes);
+
+ ImmutableMap.Builder<String, ColumnSpec.DataType<?>> mapBuilder = ImmutableMap.builder();
+
for (ColumnSpec.DataType<?> columnType : columnTypes)
{
- builder.add(ColumnSpec.ReversedType.getInstance(columnType));
+ ColumnSpec.DataType<?> reversedType = ColumnSpec.ReversedType.getInstance(columnType);
+ builder.add(reversedType);
+
+ mapBuilder.put(columnType.toString(), columnType);
+ mapBuilder.put(String.format("desc(%s)", columnType.toString()), columnType);
}
+
+ builder.add(ColumnSpec.floatType);
+ builder.add(ColumnSpec.doubleType);
+
clusteringKeyTypes = builder.build();
+ nameToTypeMap = mapBuilder.build();
}
@SuppressWarnings("unchecked")
@@ -281,7 +297,7 @@ public class SchemaGenerators
public static Surjections.Surjection<SchemaSpec> defaultSchemaSpecGen(String ks, String table)
{
return new SchemaGenerators.Builder(ks, () -> table)
- .partitionKeySpec(1, 4,
+ .partitionKeySpec(2, 4,
// ColumnSpec.int8Type,
// ColumnSpec.int16Type,
ColumnSpec.int32Type,
@@ -289,7 +305,7 @@ public class SchemaGenerators
// ColumnSpec.floatType,
// ColumnSpec.doubleType,
ColumnSpec.asciiType(4, 10))
- .clusteringKeySpec(1, 4,
+ .clusteringKeySpec(2, 4,
// ColumnSpec.int8Type,
// ColumnSpec.int16Type,
ColumnSpec.int32Type,
@@ -366,6 +382,7 @@ public class SchemaGenerators
longAndStringSpecWithReversedBothBuilder,
withAllFeaturesEnabled
};
+
// Create schema generators that would produce tables starting with just a few features, progressing to use more
public static Supplier<SchemaSpec> progression(int switchAfter)
{
@@ -375,10 +392,11 @@ public class SchemaGenerators
return new Supplier<SchemaSpec>()
{
- private final AtomicInteger counter = new AtomicInteger();
+ private int counter = 0;
public SchemaSpec get()
{
- int idx = (counter.getAndIncrement() / switchAfter) % generators.length;
+ int idx = (counter / switchAfter) % generators.length;
+ counter++;
SchemaSpec spec = generators[idx].get();
int tries = 100;
while ((spec.ckGenerator.byteSize() != Long.BYTES || spec.pkGenerator.byteSize() != Long.BYTES) && tries > 0)
@@ -388,6 +406,8 @@ public class SchemaGenerators
tries--;
}
+ spec.validate();
+
assert tries > 0 : String.format("Max number of tries exceeded on generator %d, can't generate a needed schema", idx);
return spec;
}
@@ -396,12 +416,34 @@ public class SchemaGenerators
};
}
- public static int DEFAULT_SWITCH_AFTER = 5;
- public static int GENERATORS_COUNT = PROGRESSIVE_GENERATORS.length;
- public static int DEFAULT_RUNS = DEFAULT_SWITCH_AFTER * PROGRESSIVE_GENERATORS.length;
+ /**
+ * Converts a mapping of column name to type name into column specs of the given kind.
+ * Type names are resolved through nameToTypeMap.
+ *
+ * NOTE(review): nameToTypeMap registers the "desc(...)" names against the non-reversed type,
+ * so the isReversed() check below presumably never fires for parsed types - confirm.
+ *
+ * @param config column name mapped to the name of its type
+ * @param kind kind assigned to every produced column
+ * @param allowReverse whether reversed types are acceptable for these columns
+ * @return one column spec per entry of config, in the iteration order of config
+ */
+ public static List<ColumnSpec<?>> toColumns(Map<String, String> config, ColumnSpec.Kind kind, boolean allowReverse)
+ {
+ List<ColumnSpec<?>> columns = new ArrayList<>(config.size());
+
+ for (Map.Entry<String, String> e : config.entrySet())
+ {
+ ColumnSpec.DataType<?> type = nameToTypeMap.get(e.getValue());
+ assert type != null : String.format("Can't parse the type %s of column %s", e.getValue(), e.getKey());
+ // The message has a %s placeholder, so it must be given an argument; without one
+ // String.format throws MissingFormatArgumentException instead of building the message.
+ assert allowReverse || !type.isReversed() : String.format("%s columns aren't allowed to be reversed", kind);
+ columns.add(new ColumnSpec<>(e.getKey(), type, kind));
+ }
- public static Supplier<SchemaSpec> progression()
+ return columns;
+ }
+
+ /**
+ * Builds a SchemaSpec for the given keyspace and table from three mappings of column name
+ * to type name: partition keys, clustering keys and regular columns. Each mapping is
+ * converted with toColumns, with reversed types disallowed.
+ *
+ * NOTE(review): clustering columns are also converted with allowReverse=false; presumably
+ * they are the columns that should be allowed to be reversed - confirm.
+ */
+ public static SchemaSpec parse(String keyspace,
+ String table,
+ Map<String, String> pks,
+ Map<String, String> cks,
+ Map<String, String> regulars)
{
- return progression(DEFAULT_SWITCH_AFTER); // would generate 30 tables before wrapping around
+ return new SchemaSpec(keyspace, table,
+ toColumns(pks, ColumnSpec.Kind.PARTITION_KEY, false),
+ toColumns(cks, ColumnSpec.Kind.CLUSTERING, false),
+ toColumns(regulars, ColumnSpec.Kind.REGULAR, false));
}
+
+ public static int DEFAULT_SWITCH_AFTER = Integer.getInteger("harry.test.progression.switch-after", 5);
+ public static int GENERATORS_COUNT = PROGRESSIVE_GENERATORS.length;
+ public static int DEFAULT_RUNS = DEFAULT_SWITCH_AFTER * GENERATORS_COUNT;
}
\ No newline at end of file
diff --git a/harry-core/src/harry/ddl/SchemaSpec.java b/harry-core/src/harry/ddl/SchemaSpec.java
index 336bd67..11b7009 100644
--- a/harry-core/src/harry/ddl/SchemaSpec.java
+++ b/harry-core/src/harry/ddl/SchemaSpec.java
@@ -96,6 +96,12 @@ public class SchemaSpec
this.ALL_COLUMNS_BITSET = BitSet.allSet(regularColumns.size());
}
+ /**
+ * Asserts that the partition key generator and the clustering key generator each have a
+ * byte size of Long.BYTES. The corresponding column list is used as the assertion message.
+ * The checks are plain Java asserts, so they only run when assertions are enabled.
+ */
+ public void validate()
+ {
+ assert pkGenerator.byteSize() == Long.BYTES : partitionKeys.toString();
+ assert ckGenerator.byteSize() == Long.BYTES : clusteringKeys.toString();
+ }
+
public static interface AddRelationCallback
{
public void accept(ColumnSpec spec, Relation.RelationKind kind, Object value);
diff --git a/harry-core/src/harry/generators/PCGFastPure.java b/harry-core/src/harry/generators/PCGFastPure.java
index 48bfae3..9e438dc 100644
--- a/harry-core/src/harry/generators/PCGFastPure.java
+++ b/harry-core/src/harry/generators/PCGFastPure.java
@@ -105,7 +105,7 @@ public class PCGFastPure
curMult *= curMult;
}
- return distance - 1;
+ return distance;
}
public static long shuffle(long state)
diff --git a/harry-core/src/harry/model/DoNothingModel.java b/harry-core/src/harry/model/DoNothingModel.java
index c21392d..35b5f86 100644
--- a/harry-core/src/harry/model/DoNothingModel.java
+++ b/harry-core/src/harry/model/DoNothingModel.java
@@ -23,11 +23,7 @@ import harry.runner.Query;
public class DoNothingModel implements Model
{
- public void recordEvent(long lts, boolean quorumAchieved)
- {
- }
-
- public void validatePartitionState(long verificationLts, Query query)
+ public void validate(Query query)
{
}
diff --git a/harry-core/src/harry/model/ExhaustiveChecker.java b/harry-core/src/harry/model/ExhaustiveChecker.java
index 935da45..b98ef06 100644
--- a/harry-core/src/harry/model/ExhaustiveChecker.java
+++ b/harry-core/src/harry/model/ExhaustiveChecker.java
@@ -30,20 +30,21 @@ import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import harry.core.Configuration;
+import harry.core.Run;
import harry.data.ResultSetRow;
import harry.ddl.SchemaSpec;
import harry.model.sut.SystemUnderTest;
import harry.runner.AbstractPartitionVisitor;
+import harry.runner.DataTracker;
import harry.runner.PartitionVisitor;
import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
import harry.util.BitSet;
import harry.util.Ranges;
@@ -61,42 +62,26 @@ public class ExhaustiveChecker implements Model
protected final OpSelectors.PdSelector pdSelector;
protected final OpSelectors.MonotonicClock clock;
protected final SystemUnderTest sut;
- protected final QuerySelector querySelector;
+ protected final QueryGenerator rangeSelector;
protected final DataTracker tracker;
private final SchemaSpec schema;
- public ExhaustiveChecker(SchemaSpec schema,
- OpSelectors.PdSelector pdSelector,
- OpSelectors.DescriptorSelector descriptorSelector,
- OpSelectors.MonotonicClock clock,
- QuerySelector querySelector,
- SystemUnderTest sut)
+ public ExhaustiveChecker(Run run)
{
- this.descriptorSelector = descriptorSelector;
- this.pdSelector = pdSelector;
- this.tracker = new DataTracker();
- this.schema = schema;
- this.sut = sut;
- this.clock = clock;
- this.querySelector = querySelector;
+ this.descriptorSelector = run.descriptorSelector;
+ this.pdSelector = run.pdSelector;
+ this.tracker = run.tracker;
+ this.schema = run.schemaSpec;
+ this.sut = run.sut;
+ this.clock = run.clock;
+ this.rangeSelector = run.rangeSelector;
}
- public void recordEvent(long lts, boolean quorumAchieved)
+ public void validate(Query query)
{
- tracker.recordEvent(lts, quorumAchieved);
- }
-
- public DataTracker tracker()
- {
- return tracker;
- }
-
- public void validatePartitionState(long validationLts, Query query)
- {
- validatePartitionState(validationLts,
- query,
+ validatePartitionState(query,
() -> {
while (!Thread.currentThread().isInterrupted())
{
@@ -114,18 +99,18 @@ public class ExhaustiveChecker implements Model
});
}
- void validatePartitionState(long validationLts, Query query, Supplier<List<ResultSetRow>> rowsSupplier)
+ void validatePartitionState(Query query, Supplier<List<ResultSetRow>> rowsSupplier)
{
// Before we execute SELECT, we know what was the lts of operation that is guaranteed to be visible
- long visibleLtsBound = tracker.maxCompleteLts();
+ long visibleLtsBound = tracker.maxConsecutiveFinished();
// TODO: when we implement a reorder buffer through a bitmap, we can just grab a bitmap _before_,
// and know that a combination of `consecutive` + `bitmap` gives us _all possible guaranteed-to-be-seen_ values
List<ResultSetRow> rows = rowsSupplier.get();
// by the time SELECT done, we grab max "possible" lts
- long inFlightLtsBound = tracker.maxSeenLts();
- PartitionState partitionState = inflatePartitionState(validationLts, inFlightLtsBound, query);
+ long maxSeenLts = tracker.maxStarted();
+ PartitionState partitionState = inflatePartitionState(maxSeenLts, query);
NavigableMap<Long, List<Operation>> operations = partitionState.operations;
LongComparator cmp = FORWARD_COMPARATOR;
if (query.reverse)
@@ -137,7 +122,7 @@ public class ExhaustiveChecker implements Model
if (!rows.isEmpty() && operations.isEmpty())
{
throw new ValidationException(String.format("Returned rows are not empty, but there were no records in the event log.\nRows: %s\nMax seen LTS: %s\nQuery: %s",
- rows, inFlightLtsBound, query));
+ rows, maxSeenLts, query));
}
PeekingIterator<ResultSetRow> rowIter = Iterators.peekingIterator(rows.iterator());
@@ -156,7 +141,7 @@ public class ExhaustiveChecker implements Model
if (rowIter.hasNext() && rowIter.peek().cd == cd)
{
ResultSetRow row = rowIter.next();
- RowValidationState validationState = new RowValidationState(row, visibleLtsBound, inFlightLtsBound, partitionState.rangeTombstones);
+ RowValidationState validationState = new RowValidationState(row, visibleLtsBound, maxSeenLts, partitionState.rangeTombstones);
// TODO: We only need to go for as long as we explain every column. In fact, we make state _less_ precise by allowing
// to continue moving back in time. So far this hasn't proven to be a source of any issues, but we should fix that.
@@ -230,14 +215,11 @@ public class ExhaustiveChecker implements Model
catch (Throwable t)
{
throw new ValidationException(String.format("Caught exception while validating the resultset %s." +
- "\nchecker.tracker().forceLts(%dL, %dL)" +
- "\nrun.validator.validatePartition(%dL)" +
"\nRow Iter Peek: %s" +
"\nValidated no rows:\n%s" +
"\nValidated rows:\n%s" +
"\nRows:\n%s",
query,
- inFlightLtsBound, visibleLtsBound, validationLts,
rowIter.hasNext() ? rowIter.peek() : null,
validatedNoRows,
validatedRows.stream().map(Object::toString).collect(Collectors.joining(",\n")),
@@ -320,14 +302,10 @@ public class ExhaustiveChecker implements Model
}
}
- public PartitionState inflatePartitionState(long validationLts, long maxLts, Query query)
+ public PartitionState inflatePartitionState(long maxLts, Query query)
{
- long currentLts = pdSelector.maxLts(validationLts);
- long pd = pdSelector.pd(currentLts, schema);
+ long currentLts = pdSelector.maxLtsFor(query.pd);
- if (pd != pdSelector.pd(validationLts, schema))
- throw new ValidationException("Partition descriptor %d doesn't match partition descriptor %d for LTS %d",
- pd, pdSelector.pd(validationLts, schema), validationLts);
NavigableMap<Long, List<Operation>> operations = new TreeMap<>();
List<Ranges.Range> ranges = new ArrayList<>();
@@ -338,7 +316,11 @@ public class ExhaustiveChecker implements Model
OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
if (opType == OpSelectors.OperationKind.DELETE_RANGE)
{
- ranges.add(maybeWrap(lts, opId, querySelector.inflate(lts, opId).toRange(lts)));
+ ranges.add(maybeWrap(lts, opId, rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE).toRange(lts)));
+ }
+ else if (opType == OpSelectors.OperationKind.DELETE_SLICE)
+ {
+ ranges.add(maybeWrap(lts, opId, rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE).toRange(lts)));
}
else if (query.match(cd)) // skip descriptors that are out of range
{
@@ -389,11 +371,6 @@ public class ExhaustiveChecker implements Model
}
}
- public Configuration.ExhaustiveCheckerConfig toConfig()
- {
- return new Configuration.ExhaustiveCheckerConfig(tracker.maxSeenLts(), tracker.maxCompleteLts());
- }
-
public String toString()
{
return "ExhaustiveChecker{" + tracker.toString() + '}';
@@ -557,12 +534,6 @@ public class ExhaustiveChecker implements Model
}
}
- @VisibleForTesting
- public void forceLts(long maxSeen, long maxComplete)
- {
- tracker.forceLts(maxSeen, maxComplete);
- }
-
public static class Operation implements Comparable<Operation>
{
public final long pd;
diff --git a/harry-core/src/harry/model/Model.java b/harry-core/src/harry/model/Model.java
index 04fe6f1..3168c8a 100644
--- a/harry-core/src/harry/model/Model.java
+++ b/harry-core/src/harry/model/Model.java
@@ -18,39 +18,22 @@
package harry.model;
-import harry.core.Configuration;
-import harry.ddl.SchemaSpec;
-import harry.model.sut.SystemUnderTest;
+import harry.core.Run;
import harry.runner.Query;
-import harry.runner.QuerySelector;
public interface Model
{
long NO_TIMESTAMP = Long.MIN_VALUE;
- void recordEvent(long lts, boolean quorumAchieved);
-
- void validatePartitionState(long verificationLts, Query query);
-
- Configuration.ModelConfiguration toConfig();
+ void validate(Query query);
interface ModelFactory
{
- Model create(SchemaSpec schema,
- OpSelectors.PdSelector pdSelector,
- OpSelectors.DescriptorSelector descriptorSelector,
- OpSelectors.MonotonicClock clock,
- QuerySelector querySelector,
- SystemUnderTest sut);
+ Model make(Run run);
}
class ValidationException extends RuntimeException
{
- public ValidationException()
- {
- super();
- }
-
public ValidationException(String message)
{
super(message);
diff --git a/harry-core/src/harry/model/OpSelectors.java b/harry-core/src/harry/model/OpSelectors.java
index ad8e046..7d7ad6d 100644
--- a/harry-core/src/harry/model/OpSelectors.java
+++ b/harry-core/src/harry/model/OpSelectors.java
@@ -117,8 +117,7 @@ public interface OpSelectors
public abstract long prevLts(long lts);
- public abstract long maxLts(long lts);
-
+ public abstract long maxLtsFor(long pd);
public abstract long minLtsAt(long position);
public abstract long minLtsFor(long pd);
@@ -281,8 +280,8 @@ public interface OpSelectors
public long minLtsFor(long pd)
{
- long sequenceNumber = rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID);
- return minLtsAt(sequenceNumber);
+ long position = rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID);
+ return minLtsAt(position);
}
public long positionFor(long lts)
@@ -291,6 +290,11 @@ public interface OpSelectors
return windowStart + lts % windowSize;
}
+ /**
+ * Returns the sequence number that rng reports for the given partition descriptor
+ * in the PARTITION_DESCRIPTOR_STREAM_ID stream.
+ */
+ public long positionForPd(long pd)
+ {
+ return rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID);
+ }
+
public long nextLts(long lts)
{
long slideCount = lts / switchAfter;
@@ -326,11 +330,9 @@ public interface OpSelectors
return slideCount * switchAfter - windowSize + positionInCycle;
}
- public long maxLts(long lts)
+ public long maxLtsFor(long pd)
{
- long windowStart = lts / switchAfter;
- long position = windowStart + lts % windowSize;
-
+ long position = rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID);
return position * switchAfter + (slideAfterRepeats - 1) * windowSize;
}
@@ -607,8 +609,7 @@ public interface OpSelectors
public long vd(long pd, long cd, long lts, long opId, int col)
{
- // change randomNumber / sequenceNumber to prev/Next
- return rng.randomNumber(opId + 1, pd ^ cd ^ lts ^ col);
+ return rng.randomNumber(opId, pd ^ cd ^ lts ^ col);
}
public long modificationId(long pd, long cd, long lts, long vd, int col)
@@ -622,6 +623,7 @@ public interface OpSelectors
WRITE,
DELETE_ROW,
DELETE_COLUMN,
- DELETE_RANGE
+ DELETE_RANGE,
+ DELETE_SLICE
}
}
diff --git a/harry-core/src/harry/model/QuiescentChecker.java b/harry-core/src/harry/model/QuiescentChecker.java
index f7b38a3..ffd50b3 100644
--- a/harry-core/src/harry/model/QuiescentChecker.java
+++ b/harry-core/src/harry/model/QuiescentChecker.java
@@ -22,54 +22,49 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
+import java.util.function.Supplier;
import harry.core.Configuration;
+import harry.core.Run;
import harry.data.ResultSetRow;
import harry.ddl.SchemaSpec;
import harry.model.sut.SystemUnderTest;
import harry.reconciler.Reconciler;
+import harry.runner.DataTracker;
import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
public class QuiescentChecker implements Model
{
- private final OpSelectors.MonotonicClock clock;
-
- private final DataTracker tracker;
- private final SystemUnderTest sut;
- private final Reconciler reconciler;
+ protected final OpSelectors.MonotonicClock clock;
- public QuiescentChecker(SchemaSpec schema,
- OpSelectors.PdSelector pdSelector,
- OpSelectors.DescriptorSelector descriptorSelector,
- OpSelectors.MonotonicClock clock,
- QuerySelector querySelector,
+ protected final DataTracker tracker;
+ protected final SystemUnderTest sut;
+ protected final Reconciler reconciler;
- SystemUnderTest sut)
+ public QuiescentChecker(Run run)
{
- this.clock = clock;
- this.sut = sut;
+ this.clock = run.clock;
+ this.sut = run.sut;
- this.reconciler = new Reconciler(schema, pdSelector, descriptorSelector, querySelector);
- this.tracker = new DataTracker();
+ this.reconciler = new Reconciler(run.schemaSpec, run.pdSelector, run.descriptorSelector, run.rangeSelector);
+ this.tracker = run.tracker;
}
- public void recordEvent(long lts, boolean quorumAchieved)
+ public void validate(Query query)
{
- tracker.recordEvent(lts, quorumAchieved);
+ validate(() -> SelectHelper.execute(sut, clock, query), query);
}
- public void validatePartitionState(long verificationLts, Query query)
+ protected void validate(Supplier<List<ResultSetRow>> rowsSupplier, Query query)
{
- long maxCompeteLts = tracker.maxCompleteLts();
- long maxSeenLts = tracker.maxSeenLts();
+ long maxCompeteLts = tracker.maxConsecutiveFinished();
+ long maxSeenLts = tracker.maxStarted();
assert maxCompeteLts == maxSeenLts : "Runner hasn't settled down yet. " +
"Quiescent model can't be reliably used in such cases.";
- List<ResultSetRow> actualRows = SelectHelper.execute(sut, clock, query);
+ List<ResultSetRow> actualRows = rowsSupplier.get();
Iterator<ResultSetRow> actual = actualRows.iterator();
Collection<Reconciler.RowState> expectedRows = reconciler.inflatePartitionState(query.pd, maxSeenLts, query).rows(query.reverse);
Iterator<Reconciler.RowState> expected = expectedRows.iterator();
@@ -104,15 +99,4 @@ public class QuiescentChecker implements Model
actualRows);
}
}
-
- @VisibleForTesting
- public void forceLts(long maxSeen, long maxComplete)
- {
- tracker.forceLts(maxSeen, maxComplete);
- }
-
- public Configuration.ModelConfiguration toConfig()
- {
- return new Configuration.QuiescentCheckerConfig(tracker.maxSeenLts(), tracker.maxCompleteLts());
- }
}
\ No newline at end of file
diff --git a/harry-core/src/harry/model/SelectHelper.java b/harry-core/src/harry/model/SelectHelper.java
index 8cf9d80..2b50339 100644
--- a/harry-core/src/harry/model/SelectHelper.java
+++ b/harry-core/src/harry/model/SelectHelper.java
@@ -117,7 +117,7 @@ public class SelectHelper
public static List<ResultSetRow> execute(SystemUnderTest sut, OpSelectors.MonotonicClock clock, Query query)
{
CompiledStatement compiled = query.toSelectStatement();
- Object[][] objects = sut.execute(compiled.cql(), compiled.bindings());
+ Object[][] objects = sut.execute(compiled.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, compiled.bindings());
List<ResultSetRow> result = new ArrayList<>();
for (Object[] obj : objects)
result.add(resultSetToRow(query.schemaSpec, clock, obj));
diff --git a/harry-core/src/harry/model/StatelessVisibleRowsChecker.java b/harry-core/src/harry/model/StatelessVisibleRowsChecker.java
index 01f0381..8573b3b 100644
--- a/harry-core/src/harry/model/StatelessVisibleRowsChecker.java
+++ b/harry-core/src/harry/model/StatelessVisibleRowsChecker.java
@@ -22,11 +22,12 @@ import java.util.List;
import java.util.function.Supplier;
import harry.core.Configuration;
+import harry.core.Run;
import harry.data.ResultSetRow;
import harry.ddl.SchemaSpec;
import harry.model.sut.SystemUnderTest;
import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
import static harry.model.VisibleRowsChecker.descendingIterator;
@@ -43,29 +44,18 @@ public class StatelessVisibleRowsChecker implements Model
protected final SchemaSpec schema;
- public StatelessVisibleRowsChecker(SchemaSpec schema,
- OpSelectors.PdSelector pdSelector,
- OpSelectors.DescriptorSelector descriptorSelector,
- OpSelectors.MonotonicClock clock,
- QuerySelector querySelector,
- SystemUnderTest sut)
+ public StatelessVisibleRowsChecker(Run run)
{
- this.pdSelector = pdSelector;
- this.descriptorSelector = descriptorSelector;
- this.schema = schema;
- this.clock = clock;
- this.sut = sut;
+ this.pdSelector = run.pdSelector;
+ this.descriptorSelector = run.descriptorSelector;
+ this.schema = run.schemaSpec;
+ this.clock = run.clock;
+ this.sut = run.sut;
}
- public void recordEvent(long lts, boolean quorumAchieved)
+ public void validate(Query query)
{
- //no-op
- }
-
- public void validatePartitionState(long validationLts, Query query)
- {
- validatePartitionState(validationLts,
- query,
+ validatePartitionState(query,
() -> SelectHelper.execute(sut, clock, query));
}
@@ -74,17 +64,17 @@ public class StatelessVisibleRowsChecker implements Model
throw new RuntimeException("not implemented");
}
- void validatePartitionState(long validationLts, Query query, Supplier<List<ResultSetRow>> rowsSupplier)
+ void validatePartitionState(Query query, Supplier<List<ResultSetRow>> rowsSupplier)
{
// we ignore Query here, since our criteria for checking in this model is presence of the row in the resultset
- long pd = pdSelector.pd(validationLts, schema);
+ long pd = query.pd;
List<ResultSetRow> rows = rowsSupplier.get();
for (ResultSetRow row : rows)
{
VisibleRowsChecker.LongIterator rowLtsIter = descendingIterator(row.lts);
- VisibleRowsChecker.LongIterator modelLtsIter = descendingIterator(pdSelector, validationLts);
+ VisibleRowsChecker.LongIterator modelLtsIter = descendingIterator(pdSelector, pd);
outer:
while (rowLtsIter.hasNext())
diff --git a/harry-core/src/harry/model/VisibleRowsChecker.java b/harry-core/src/harry/model/VisibleRowsChecker.java
index ddda8b8..902bf07 100644
--- a/harry-core/src/harry/model/VisibleRowsChecker.java
+++ b/harry-core/src/harry/model/VisibleRowsChecker.java
@@ -24,18 +24,18 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import harry.core.Configuration;
+import harry.core.Run;
import harry.data.ResultSetRow;
import harry.ddl.SchemaSpec;
import harry.model.sut.SystemUnderTest;
+import harry.runner.DefaultDataTracker;
import harry.runner.Query;
-import harry.runner.QuerySelector;
/**
* A simple model to check whether or not the rows reported as visible by the database are reflected in
@@ -43,54 +43,86 @@ import harry.runner.QuerySelector;
*/
public class VisibleRowsChecker implements Model
{
- protected final Map<Long, TreeMap<Long, Event>> eventLog;
protected final OpSelectors.DescriptorSelector descriptorSelector;
protected final OpSelectors.PdSelector pdSelector;
protected final OpSelectors.MonotonicClock clock;
+ protected final LoggingDataTracker tracker;
protected final SystemUnderTest sut;
- protected final AtomicLong maxLts;
protected final SchemaSpec schema;
- public VisibleRowsChecker(SchemaSpec schema,
- OpSelectors.PdSelector pdSelector,
- OpSelectors.DescriptorSelector descriptorSelector,
- OpSelectors.MonotonicClock clock,
- QuerySelector querySelector,
- SystemUnderTest sut)
+ public VisibleRowsChecker(Run run)
{
- this.pdSelector = pdSelector;
- this.descriptorSelector = descriptorSelector;
- this.eventLog = new HashMap<>();
- this.maxLts = new AtomicLong();
- this.schema = schema;
- this.clock = clock;
- this.sut = sut;
+ assert run.tracker instanceof LoggingDataTracker : "Visible rows checker requires a logging data tracker to run";
+ this.tracker = (LoggingDataTracker) run.tracker;
+ this.tracker.pdSelector = run.pdSelector;
+ this.pdSelector = run.pdSelector;
+ this.descriptorSelector = run.descriptorSelector;
+ this.schema = run.schemaSpec;
+ this.clock = run.clock;
+ this.sut = run.sut;
}
-
- public synchronized void recordEvent(long lts, boolean quorumAchieved)
+ public static class LoggingDataTracker extends DefaultDataTracker
{
- maxLts.updateAndGet((old) -> Math.max(old, lts));
- long pd = pdSelector.pd(lts);
+ protected final Map<Long, TreeMap<Long, Event>> eventLog = new HashMap<>();
+ private OpSelectors.PdSelector pdSelector;
- // TODO: This is definitely not optimal, but we probably use a better, potentially off-heap sorted structure for that anyways
- TreeMap<Long, Event> events = eventLog.get(pd);
- if (events == null)
+ public LoggingDataTracker()
{
- events = new TreeMap<>();
- eventLog.put(pd, events);
}
- Event event = events.get(lts);
- assert event == null || !event.quorumAchieved : "Operation should be partially visible before it is fully visible";
- events.put(lts, new Event(lts, quorumAchieved));
- }
+ public synchronized void started(long lts)
+ {
+ super.started(lts);
+ recordEvent(lts, false);
+ }
+
+ public synchronized void finished(long lts)
+ {
+ super.finished(lts);
+ recordEvent(lts, true);
+ }
+
+ public synchronized TreeMap<Long, Event> events(long pd)
+ {
+ TreeMap<Long, Event> log = eventLog.get(pd);
+ if (log == null)
+ return null;
+
+ return (TreeMap<Long, Event>) log.clone();
+ }
+
+ public long maxStarted()
+ {
+ return super.maxStarted();
+ }
+ public long maxConsecutiveFinished()
+ {
+ return super.maxConsecutiveFinished();
+ }
- public void validatePartitionState(long validationLts, Query query)
+ public void recordEvent(long lts, boolean finished)
+ {
+ long pd = pdSelector.pd(lts);
+
+ // TODO: This is definitely not optimal, but we probably use a better, potentially off-heap sorted structure for that anyways
+ TreeMap<Long, Event> events = eventLog.get(pd);
+ if (events == null)
+ {
+ events = new TreeMap<>();
+ eventLog.put(pd, events);
+ }
+
+ Event event = events.get(lts);
+ assert event == null || !event.quorumAchieved : "Operation should be partially visible before it is fully visible";
+ events.put(lts, new Event(lts, finished));
+ }
+ }
+
+ public void validate(Query query)
{
- validatePartitionState(validationLts,
- query,
+ validatePartitionState(query,
() -> SelectHelper.execute(sut, clock, query));
}
@@ -99,22 +131,23 @@ public class VisibleRowsChecker implements Model
throw new RuntimeException("not implemented");
}
- synchronized void validatePartitionState(long validationLts, Query query, Supplier<List<ResultSetRow>> rowsSupplier)
+ synchronized void validatePartitionState(Query query, Supplier<List<ResultSetRow>> rowsSupplier)
{
// TODO: Right now, we ignore Query here!
- long pd = pdSelector.pd(validationLts, schema);
+ long pd = query.pd;
List<ResultSetRow> rows = rowsSupplier.get();
- TreeMap<Long, Event> events = eventLog.get(pd);
+ TreeMap<Long, Event> events = tracker.events(pd);
+
if (!rows.isEmpty() && (events == null || events.isEmpty()))
{
throw new ValidationException(String.format("Returned rows are not empty, but there were no records in the event log.\nRows: %s\nSeen pds: %s",
- rows, eventLog.keySet()));
+ rows, tracker.eventLog.keySet()));
}
for (ResultSetRow row : rows)
{
LongIterator rowLtsIter = descendingIterator(row.lts);
- PeekingIterator<Event> modelLtsIter = Iterators.peekingIterator(events.subMap(0L, true, maxLts.get(), true)
+ PeekingIterator<Event> modelLtsIter = Iterators.peekingIterator(events.subMap(0L, true, tracker.maxStarted(), true)
.descendingMap()
.values()
.iterator());
@@ -158,11 +191,11 @@ public class VisibleRowsChecker implements Model
}
- public static LongIterator descendingIterator(OpSelectors.PdSelector pdSelector, long verificationLts)
+ public static LongIterator descendingIterator(OpSelectors.PdSelector pdSelector, long pd)
{
return new VisibleRowsChecker.LongIterator()
{
- long next = pdSelector.maxLts(verificationLts);
+ long next = pdSelector.maxLtsFor(pd);
public long nextLong()
{
diff --git a/harry-core/src/harry/model/sut/PrintlnSut.java b/harry-core/src/harry/model/sut/PrintlnSut.java
index 463d780..ad14b0b 100644
--- a/harry-core/src/harry/model/sut/PrintlnSut.java
+++ b/harry-core/src/harry/model/sut/PrintlnSut.java
@@ -32,17 +32,18 @@ public class PrintlnSut implements SystemUnderTest
{
}
- public Object[][] execute(String statement, Object... bindings)
+ public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
{
- System.out.println(String.format("%s | %s",
+ System.out.println(String.format("%s | %s | %s",
statement,
+ cl,
Arrays.toString(bindings)));
return new Object[0][];
}
- public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+ public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
{
- return CompletableFuture.supplyAsync(() -> execute(statement, bindings),
+ return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings),
Runnable::run);
}
}
diff --git a/harry-core/src/harry/model/sut/SystemUnderTest.java b/harry-core/src/harry/model/sut/SystemUnderTest.java
index 9e5e0a9..ec0a48e 100644
--- a/harry-core/src/harry/model/sut/SystemUnderTest.java
+++ b/harry-core/src/harry/model/sut/SystemUnderTest.java
@@ -35,20 +35,51 @@ public interface SystemUnderTest
default void schemaChange(String statement)
{
- execute(statement, new Object[]{});
+ execute(statement, ConsistencyLevel.ALL, new Object[]{});
}
- default Object[][] execute(CompiledStatement statement)
+ default Object[][] execute(CompiledStatement statement, ConsistencyLevel cl)
{
- return execute(statement.cql(), statement.bindings());
+ return execute(statement.cql(), cl, statement.bindings());
}
- Object[][] execute(String statement, Object... bindings);
+ Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings);
- CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings);
+ CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings);
interface SystemUnderTestFactory
{
SystemUnderTest create();
}
+
+ enum ConsistencyLevel {
+ ALL, QUORUM, NODE_LOCAL
+ }
+
+ public static final SystemUnderTest NO_OP = new NoOpSut();
+
+ public class NoOpSut implements SystemUnderTest
+ {
+ private NoOpSut() {}
+ public boolean isShutdown()
+ {
+ return false;
+ }
+
+ public void shutdown()
+ {
+ }
+
+ public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
+ {
+ return new Object[0][];
+ }
+
+ public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
+ {
+ return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings),
+ Runnable::run);
+ }
+ }
+
}
\ No newline at end of file
diff --git a/harry-core/src/harry/reconciler/Reconciler.java b/harry-core/src/harry/reconciler/Reconciler.java
index 50b03c6..cab8fb8 100644
--- a/harry-core/src/harry/reconciler/Reconciler.java
+++ b/harry-core/src/harry/reconciler/Reconciler.java
@@ -24,14 +24,16 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
+import java.util.Set;
import java.util.TreeMap;
import harry.ddl.SchemaSpec;
+import harry.model.ExhaustiveChecker;
import harry.model.OpSelectors;
import harry.runner.AbstractPartitionVisitor;
import harry.runner.PartitionVisitor;
import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
import harry.util.BitSet;
import harry.util.Ranges;
@@ -49,36 +51,42 @@ public class Reconciler
{
private final OpSelectors.DescriptorSelector descriptorSelector;
private final OpSelectors.PdSelector pdSelector;
- private final QuerySelector querySelector;
+ private final QueryGenerator rangeSelector;
private final SchemaSpec schema;
public Reconciler(SchemaSpec schema,
OpSelectors.PdSelector pdSelector,
OpSelectors.DescriptorSelector descriptorSelector,
- QuerySelector querySelector)
+ QueryGenerator rangeSelector)
{
this.descriptorSelector = descriptorSelector;
this.pdSelector = pdSelector;
this.schema = schema;
- this.querySelector = querySelector;
+ this.rangeSelector = rangeSelector;
}
public PartitionState inflatePartitionState(final long pd, long maxLts, Query query)
{
List<Ranges.Range> ranges = new ArrayList<>();
+
// TODO: we should think of a single-pass algorithm that would allow us to inflate all deletes and range deletes for a partition
PartitionVisitor partitionVisitor = new AbstractPartitionVisitor(pdSelector, descriptorSelector, schema)
{
public void operation(long lts, long pd, long cd, long m, long opId)
{
- if (!query.match(cd))
- return;
-
OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
if (opType == OpSelectors.OperationKind.DELETE_RANGE)
- ranges.add(querySelector.inflate(lts, opId).toRange(lts));
+ {
+ ranges.add(rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE).toRange(lts));
+ }
+ else if (opType == OpSelectors.OperationKind.DELETE_SLICE)
+ {
+ ranges.add(rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE).toRange(lts));
+ }
else if (opType == OpSelectors.OperationKind.DELETE_ROW)
+ {
ranges.add(new Ranges.Range(cd, cd, true, true, lts));
+ }
}
};
@@ -90,21 +98,27 @@ public class Reconciler
currentLts = pdSelector.nextLts(currentLts);
}
- // We have to do two passes to avoid inflating deleted items
- Ranges rts = new Ranges(ranges);
-
PartitionState partitionState = new PartitionState();
partitionVisitor = new AbstractPartitionVisitor(pdSelector, descriptorSelector, schema)
{
public void operation(long lts, long pd, long cd, long m, long opId)
{
+ if (!query.match(cd))
+ return;
+
OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
- if (opType == OpSelectors.OperationKind.DELETE_ROW || opType == OpSelectors.OperationKind.DELETE_RANGE)
+ if (opType == OpSelectors.OperationKind.DELETE_ROW
+ || opType == OpSelectors.OperationKind.DELETE_RANGE
+ || opType == OpSelectors.OperationKind.DELETE_SLICE)
return;
- if (!query.match(cd) || rts.isShadowed(cd, lts))
- return;
+ // TODO: avoid linear scan
+ for (Ranges.Range range : ranges)
+ {
+ if (range.timestamp >= lts && range.contains(cd))
+ return;
+ }
if (opType == OpSelectors.OperationKind.WRITE)
{
@@ -167,7 +181,6 @@ public class Reconciler
}
}
-
state = new RowState(cd, vdsCopy, ltss);
rows.put(cd, state);
}
diff --git a/harry-core/src/harry/runner/AbstractPartitionVisitor.java b/harry-core/src/harry/runner/AbstractPartitionVisitor.java
index aefa79f..a3ebd33 100644
--- a/harry-core/src/harry/runner/AbstractPartitionVisitor.java
+++ b/harry-core/src/harry/runner/AbstractPartitionVisitor.java
@@ -91,13 +91,14 @@ public abstract class AbstractPartitionVisitor implements PartitionVisitor
protected void operation(long lts, long pd, long cd, long m, long opId)
{
+
}
protected void afterBatch(long lts, long pd, long m)
{
}
- public void shutdown()
+ public void shutdown() throws InterruptedException
{
}
}
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/AllPartitionsValidator.java b/harry-core/src/harry/runner/AllPartitionsValidator.java
new file mode 100644
index 0000000..674ab5e
--- /dev/null
+++ b/harry-core/src/harry/runner/AllPartitionsValidator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.MetricReporter;
+import harry.core.Run;
+import harry.ddl.SchemaSpec;
+import harry.model.Model;
+import harry.model.OpSelectors;
+
+// This might be something that potentially grows into the validator described in the design doc;
+// right now it's just a helper/container class
+public class AllPartitionsValidator implements PartitionVisitor
+{
+ private static final Logger logger = LoggerFactory.getLogger(AllPartitionsValidator.class);
+
+ protected final Model model;
+ protected final SchemaSpec schema;
+
+ protected final OpSelectors.MonotonicClock clock;
+ protected final OpSelectors.PdSelector pdSelector;
+ protected final MetricReporter metricReporter;
+ protected final ExecutorService executor;
+ protected final int concurrency;
+ protected final int triggerAfter;
+
+ public AllPartitionsValidator(int concurrency,
+ int triggerAfter,
+ Run run,
+ Model.ModelFactory modelFactory)
+ {
+ this.triggerAfter = triggerAfter;
+ this.metricReporter = run.metricReporter;
+ this.model = modelFactory.make(run);
+ this.schema = run.schemaSpec;
+ this.clock = run.clock;
+ this.pdSelector = run.pdSelector;
+ this.concurrency = concurrency;
+ this.executor = Executors.newFixedThreadPool(concurrency);
+ }
+
+ protected CompletableFuture<Void> validateAllPartitions(ExecutorService executor, int parallelism)
+ {
+ long maxLts = clock.maxLts() - 1;
+ long maxPos = pdSelector.positionFor(maxLts);
+ AtomicLong counter = new AtomicLong();
+ CompletableFuture[] futures = new CompletableFuture[parallelism];
+ for (int i = 0; i < parallelism; i++)
+ {
+ futures[i] = CompletableFuture.supplyAsync(() -> {
+ long pos;
+ while ((pos = counter.getAndIncrement()) < maxPos && !executor.isShutdown() && !Thread.interrupted())
+ {
+ if (pos > 0 && pos % 1000 == 0)
+ logger.debug(String.format("Validated %d out of %d partitions", pos, maxPos));
+ long visitLts = pdSelector.minLtsAt(pos);
+
+ metricReporter.validatePartition();
+ for (boolean reverse : new boolean[]{ true, false })
+ {
+ model.validate(Query.selectPartition(schema, pdSelector.pd(visitLts, schema), reverse));
+ }
+ }
+ return null;
+ }, executor);
+ }
+ return CompletableFuture.allOf(futures);
+ }
+
+ public void visitPartition(long lts)
+ {
+ if (lts % triggerAfter == 0)
+ validateAllPartitions(executor, concurrency);
+ }
+
+ public void shutdown() throws InterruptedException
+ {
+ executor.shutdown();
+ executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
+}
diff --git a/harry-integration/src/harry/runner/HarryRunnerJvm.java b/harry-core/src/harry/runner/DataTracker.java
similarity index 55%
rename from harry-integration/src/harry/runner/HarryRunnerJvm.java
rename to harry-core/src/harry/runner/DataTracker.java
index 38bf264..d5825fb 100644
--- a/harry-integration/src/harry/runner/HarryRunnerJvm.java
+++ b/harry-core/src/harry/runner/DataTracker.java
@@ -19,21 +19,34 @@
package harry.runner;
import harry.core.Configuration;
-import harry.model.sut.InJvmSut;
-import java.io.File;
+public interface DataTracker
+{
+ void started(long lts);
+ void finished(long lts);
-public class HarryRunnerJvm extends org.apache.cassandra.distributed.test.TestBaseImpl implements HarryRunner {
+ long maxStarted();
+ long maxConsecutiveFinished();
- public static void main(String[] args) throws Throwable {
- InJvmSut.registerSubtypes();
+ public Configuration.DataTrackerConfiguration toConfig();
- HarryRunnerJvm runner = new HarryRunnerJvm();
- File configFile = runner.loadConfig(args);
-
- Configuration configuration = Configuration.fromFile(configFile);
- runner.run(configuration);
+ interface DataTrackerFactory {
+ DataTracker make();
}
+ public static DataTracker NO_OP = new NoOpDataTracker();
+ class NoOpDataTracker implements DataTracker
+ {
+ private NoOpDataTracker() {}
+
+ public void started(long lts) {}
+ public void finished(long lts) {}
+ public long maxStarted() { return 0; }
+ public long maxConsecutiveFinished() { return 0; }
+ public Configuration.DataTrackerConfiguration toConfig()
+ {
+ return null;
+ }
+ }
}
diff --git a/harry-core/src/harry/runner/DefaultDataTracker.java b/harry-core/src/harry/runner/DefaultDataTracker.java
new file mode 100644
index 0000000..1b55482
--- /dev/null
+++ b/harry-core/src/harry/runner/DefaultDataTracker.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.Configuration;
+
+public class DefaultDataTracker implements DataTracker
+{
+ private static final Logger logger = LoggerFactory.getLogger(DefaultDataTracker.class);
+
+ private final AtomicLong maxSeenLts;
+ // TODO: This is a trivial implementation that can be significantly improved upon
+ // for example, we could use a bitmap that records `1`s for all lts that are after
+ // the consecutive, and "collapse" the bitmap state into the long as soon as we see
+ // consecutive `1` on the left side.
+ private final AtomicLong maxCompleteLts;
+ private final PriorityBlockingQueue<Long> reorderBuffer;
+
+ public DefaultDataTracker()
+ {
+ this.maxSeenLts = new AtomicLong(-1);
+ this.maxCompleteLts = new AtomicLong(-1);
+ this.reorderBuffer = new PriorityBlockingQueue<>(100);
+ }
+
+ // TODO: there's also some room for improvement in terms of concurrency
+ // TODO: remove pd?
+ public void started(long lts)
+ {
+ recordEvent(lts, false);
+ }
+
+ public void finished(long lts)
+ {
+ recordEvent(lts, true);
+ }
+
+ private void recordEvent(long lts, boolean finished)
+ {
+ // all seen LTS are allowed to be "in-flight"
+ maxSeenLts.getAndUpdate((old) -> Math.max(lts, old));
+
+ if (!finished)
+ return;
+
+ long maxAchievedConsecutive = drainReorderQueue();
+
+ if (maxAchievedConsecutive + 1 == lts)
+ maxCompleteLts.compareAndSet(maxAchievedConsecutive, lts);
+ else
+ reorderBuffer.offer(lts);
+ }
+
+ public long drainReorderQueue()
+ {
+ final long expected = maxCompleteLts.get();
+ long maxAchievedConsecutive = expected;
+ if (reorderBuffer.isEmpty())
+ return maxAchievedConsecutive;
+
+ boolean catchingUp = false;
+
+ Long smallest = reorderBuffer.poll();
+ while (smallest != null && smallest == maxAchievedConsecutive + 1)
+ {
+ maxAchievedConsecutive++;
+ catchingUp = true;
+ smallest = reorderBuffer.poll();
+ }
+
+ // put back
+ if (smallest != null)
+ reorderBuffer.offer(smallest);
+
+ if (catchingUp)
+ maxCompleteLts.compareAndSet(expected, maxAchievedConsecutive);
+
+ int bufferSize = reorderBuffer.size();
+ if (bufferSize > 100)
+ logger.warn("Reorder buffer size has grown up to " + reorderBuffer.size());
+ return maxAchievedConsecutive;
+ }
+
+ public long maxStarted()
+ {
+ return maxSeenLts.get();
+ }
+
+ public long maxConsecutiveFinished()
+ {
+ return maxCompleteLts.get();
+ }
+
+ public Configuration.DataTrackerConfiguration toConfig()
+ {
+ return new Configuration.DefaultDataTrackerConfiguration(maxSeenLts.get(), maxCompleteLts.get());
+ }
+
+ @VisibleForTesting
+ public void forceLts(long maxSeen, long maxComplete)
+ {
+ this.maxSeenLts.set(maxSeen);
+ this.maxCompleteLts.set(maxComplete);
+ }
+
+ public String toString()
+ {
+ List<Long> buf = new ArrayList<>(reorderBuffer);
+ return "DataTracker{" +
+ "maxSeenLts=" + maxSeenLts +
+ ", maxCompleteLts=" + maxCompleteLts +
+ ", reorderBuffer=" + buf +
+ '}';
+ }
+}
diff --git a/harry-core/src/harry/runner/DefaultPartitionVisitorFactory.java b/harry-core/src/harry/runner/DefaultPartitionVisitorFactory.java
deleted file mode 100644
index 89873fd..0000000
--- a/harry-core/src/harry/runner/DefaultPartitionVisitorFactory.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package harry.runner;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import harry.ddl.SchemaSpec;
-import harry.model.Model;
-import harry.model.OpSelectors;
-import harry.model.sut.SystemUnderTest;
-import harry.operations.CompiledStatement;
-
-public class DefaultPartitionVisitorFactory implements Supplier<PartitionVisitor>
-{
- private static final Logger logger = LoggerFactory.getLogger(DefaultPartitionVisitorFactory.class);
-
- private final Model model;
- private final SystemUnderTest sut;
- private final RowVisitor rowVisitor;
- private final BufferedWriter operationLog;
-
- // TODO: shutdown properly
- private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
- private final Supplier<DefaultPartitionVisitor> factory;
-
- public DefaultPartitionVisitorFactory(Model model,
- SystemUnderTest sut,
- OpSelectors.PdSelector pdSelector,
- OpSelectors.DescriptorSelector descriptorSelector,
- SchemaSpec schema,
- RowVisitor rowVisitor)
- {
- this.model = model;
- this.sut = sut;
- this.rowVisitor = rowVisitor;
- this.factory = () -> new DefaultPartitionVisitor(pdSelector, descriptorSelector, schema);
-
- File f = new File("operation.log");
- try
- {
- operationLog = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
- }
- catch (FileNotFoundException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public PartitionVisitor get()
- {
- return factory.get();
- }
-
- private class DefaultPartitionVisitor extends AbstractPartitionVisitor
- {
- private List<String> statements = new ArrayList<>();
- private List<Object> bindings = new ArrayList<>();
- private List<CompletableFuture> futures = new ArrayList<>();
-
- public DefaultPartitionVisitor(OpSelectors.PdSelector pdSelector,
- OpSelectors.DescriptorSelector descriptorSelector,
- SchemaSpec schema)
- {
- super(pdSelector, descriptorSelector, schema);
- }
-
- public void beforeLts(long lts, long pd)
- {
- model.recordEvent(lts, false);
- }
-
- public void afterLts(long lts, long pd)
- {
- for (CompletableFuture future : futures)
- {
- try
- {
- // TODO: currently, Cassandra keeps timing out; we definitely need to investigate that, but we need to focus on other things first
- future.get();
- }
- catch (Throwable t)
- {
- throw new Model.ValidationException("Couldn't repeat operations within timeout bounds.", t);
- }
- }
-
- log("LTS: %d. Pd %d. Finished\n", lts, pd);
- model.recordEvent(lts, true);
- }
-
- public void beforeBatch(long lts, long pd, long m)
- {
- statements = new ArrayList<>();
- bindings = new ArrayList<>();
- }
-
- public void operation(long lts, long pd, long cd, long m, long opId)
- {
- OpSelectors.OperationKind op = descriptorSelector.operationType(pd, lts, opId);
- CompiledStatement statement = rowVisitor.visitRow(op, lts, pd, cd, opId);
-
- log(String.format("LTS: %d. Pd %d. Cd %d. M %d. OpId: %d Statement %s\n",
- lts, pd, cd, m, opId, statement));
-
- statements.add(statement.cql());
- bindings.addAll(Arrays.asList(statement.bindings()));
- }
-
- public void afterBatch(long lts, long pd, long m)
- {
- String query = String.join(" ", statements);
-
- if (statements.size() > 1)
- query = String.format("BEGIN UNLOGGED BATCH\n%s\nAPPLY BATCH;", query);
-
- Object[] bindingsArray = new Object[bindings.size()];
- bindings.toArray(bindingsArray);
-
- CompletableFuture<Object[][]> future = new CompletableFuture<>();
- executeAsyncWithRetries(future, new CompiledStatement(query, bindingsArray));
- futures.add(future);
-
- statements = null;
- bindings = null;
- }
- }
-
- void executeAsyncWithRetries(CompletableFuture<Object[][]> future, CompiledStatement statement)
- {
- if (sut.isShutdown())
- throw new IllegalStateException("System under test is shut down");
-
- sut.executeAsync(statement.cql(), statement.bindings())
- .whenComplete((res, t) -> {
- if (t != null)
- executor.schedule(() -> executeAsyncWithRetries(future, statement), 1, TimeUnit.SECONDS);
- else
- future.complete(res);
- });
- }
-
- private void log(String format, Object... objects)
- {
- try
- {
- operationLog.write(String.format(format, objects));
- }
- catch (IOException e)
- {
- // ignore
- }
- }
-
- public void shutdown()
- {
- executor.shutdown();
- try
- {
- executor.awaitTermination(30, TimeUnit.SECONDS);
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- }
-}
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/HarryRunner.java b/harry-core/src/harry/runner/HarryRunner.java
index 56e2350..77dcc8b 100644
--- a/harry-core/src/harry/runner/HarryRunner.java
+++ b/harry-core/src/harry/runner/HarryRunner.java
@@ -33,24 +33,40 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-public interface HarryRunner
+public abstract class HarryRunner
{
+ public static final Logger logger = LoggerFactory.getLogger(HarryRunner.class);
- Logger logger = LoggerFactory.getLogger(HarryRunner.class);
+ protected CompletableFuture progress;
+ protected ScheduledThreadPoolExecutor executor;
+ public abstract void beforeRun(Runner runner);
+ public void afterRun(Runner runner, Object result)
+ {
+ executor.shutdown();
+ try
+ {
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+ }
- default void run(Configuration configuration) throws Throwable
+ public void run(Configuration config) throws Throwable
{
System.setProperty("cassandra.disable_tcactive_openssl", "true");
System.setProperty("relocated.shaded.io.netty.transport.noNative", "true");
System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
- ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ executor = new ScheduledThreadPoolExecutor(1);
executor.setRemoveOnCancelPolicy(true);
- Runner runner = configuration.createRunner();
+ Runner runner = config.createRunner();
Run run = runner.getRun();
- CompletableFuture progress = runner.initAndStartAll();
+ progress = runner.initAndStartAll();
+ beforeRun(runner);
// Uncomment this if you want to have fun!
// scheduleCorruption(run, executor);
@@ -63,7 +79,7 @@ public interface HarryRunner
if (b != null)
return b;
return a;
- }).get(run.snapshot.run_time_unit.toSeconds(run.snapshot.run_time) + 30,
+ }).get(config.run_time_unit.toSeconds(config.run_time) + 30,
TimeUnit.SECONDS);
if (result instanceof Throwable)
logger.error("Execution failed", result);
@@ -77,6 +93,8 @@ public interface HarryRunner
}
finally
{
+ afterRun(runner, result);
+
logger.info("Shutting down executor..");
tryRun(() -> {
executor.shutdownNow();
@@ -99,7 +117,7 @@ public interface HarryRunner
}
}
- default void tryRun(ThrowingRunnable runnable)
+ public void tryRun(ThrowingRunnable runnable)
{
try
{
@@ -117,7 +135,7 @@ public interface HarryRunner
* @return Configuration YAML file.
* @throws Exception If file is not found or cannot be read.
*/
- default File loadConfig(String[] args) throws Exception {
+ public File loadConfig(String[] args) throws Exception {
if (args == null || args.length == 0) {
throw new Exception("Harry config YAML not provided.");
}
diff --git a/harry-core/src/harry/runner/LoggingPartitionVisitor.java b/harry-core/src/harry/runner/LoggingPartitionVisitor.java
new file mode 100644
index 0000000..c3a77af
--- /dev/null
+++ b/harry-core/src/harry/runner/LoggingPartitionVisitor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.runner;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import harry.core.Run;
+import harry.operations.CompiledStatement;
+
+public class LoggingPartitionVisitor extends MutatingPartitionVisitor
+{
+ private final BufferedWriter operationLog;
+
+ public LoggingPartitionVisitor(Run run, RowVisitor.RowVisitorFactory rowVisitorFactory)
+ {
+ super(run, rowVisitorFactory);
+
+ File f = new File("operation.log");
+ try
+ {
+ operationLog = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void afterLts(long lts, long pd)
+ {
+ super.afterLts(lts, pd);
+ log("LTS: %d. Pd %d. Finished\n", lts, pd);
+ }
+
+ @Override
+ protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId)
+ {
+ CompiledStatement statement = super.operationInternal(lts, pd, cd, m, opId);
+
+ log(String.format("LTS: %d. Pd %d. Cd %d. M %d. OpId: %d Statement %s\n",
+ lts, pd, cd, m, opId, statement));
+
+ return statement;
+ }
+
+ public static String bindingsToString(Object... bindings)
+ {
+ StringBuilder sb = new StringBuilder();
+ boolean isFirst = true;
+ for (Object binding : bindings)
+ {
+ if (isFirst)
+ isFirst = false;
+ else
+ sb.append(",");
+
+ if (binding instanceof String)
+ sb.append("\"").append(binding).append("\"");
+ else if (binding instanceof Long)
+ sb.append(binding).append("L");
+ else
+ sb.append(binding);
+ }
+ return sb.toString();
+ }
+
+ private void log(String format, Object... objects)
+ {
+ try
+ {
+ operationLog.write(String.format(format, objects));
+ operationLog.flush();
+ }
+ catch (IOException e)
+ {
+ // ignore
+ }
+ }
+}
diff --git a/harry-core/src/harry/runner/MutatingPartitionVisitor.java b/harry-core/src/harry/runner/MutatingPartitionVisitor.java
new file mode 100644
index 0000000..a818091
--- /dev/null
+++ b/harry-core/src/harry/runner/MutatingPartitionVisitor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import harry.core.Run;
+import harry.model.Model;
+import harry.model.OpSelectors;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+
+public class MutatingPartitionVisitor extends AbstractPartitionVisitor
+{
+ private final List<String> statements = new ArrayList<>();
+ private final List<Object> bindings = new ArrayList<>();
+
+ private final List<CompletableFuture<?>> futures = new ArrayList<>();
+
+ protected final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
+ protected final DataTracker tracker;
+ protected final SystemUnderTest sut;
+ protected final RowVisitor rowVisitor;
+
+ public MutatingPartitionVisitor(Run run, RowVisitor.RowVisitorFactory rowVisitorFactory)
+ {
+ super(run.pdSelector, run.descriptorSelector, run.schemaSpec);
+ this.tracker = run.tracker;
+ this.sut = run.sut;
+ this.rowVisitor = rowVisitorFactory.make(run);
+ }
+
+ public void beforeLts(long lts, long pd)
+ {
+ tracker.started(lts);
+ }
+
+ public void afterLts(long lts, long pd)
+ {
+ for (CompletableFuture<?> future : futures)
+ {
+ try
+ {
+ future.get();
+ }
+ catch (Throwable t)
+ {
+ throw new Model.ValidationException("Couldn't repeat operations within timeout bounds.", t);
+ }
+ }
+ futures.clear();
+ tracker.finished(lts);
+ }
+
+ public void beforeBatch(long lts, long pd, long m)
+ {
+ statements.clear();
+ bindings.clear();
+ }
+
+ protected void operation(long lts, long pd, long cd, long m, long opId)
+ {
+ CompiledStatement statement = operationInternal(lts, pd, cd, m, opId);
+ statements.add(statement.cql());
+ for (Object binding : statement.bindings())
+ bindings.add(binding);
+ }
+
+ protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId)
+ {
+ OpSelectors.OperationKind op = descriptorSelector.operationType(pd, lts, opId);
+ return rowVisitor.visitRow(op, lts, pd, cd, opId);
+ }
+
+ public void afterBatch(long lts, long pd, long m)
+ {
+ String query = String.join(" ", statements);
+
+ if (statements.size() > 1)
+ query = String.format("BEGIN UNLOGGED BATCH\n%s\nAPPLY BATCH;", query);
+
+ Object[] bindingsArray = new Object[bindings.size()];
+ bindings.toArray(bindingsArray);
+
+ CompletableFuture<Object[][]> future = new CompletableFuture<>();
+ executeAsyncWithRetries(future, new CompiledStatement(query, bindingsArray));
+ futures.add(future);
+
+ statements.clear();
+ bindings.clear();
+ }
+
+ void executeAsyncWithRetries(CompletableFuture<Object[][]> future, CompiledStatement statement)
+ {
+ if (sut.isShutdown())
+ throw new IllegalStateException("System under test is shut down");
+
+ sut.executeAsync(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings())
+ .whenComplete((res, t) -> {
+ if (t != null)
+ executor.schedule(() -> executeAsyncWithRetries(future, statement), 1, TimeUnit.SECONDS);
+ else
+ future.complete(res);
+ });
+ }
+
+ public void shutdown() throws InterruptedException
+ {
+ executor.shutdown();
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+ }
+}
diff --git a/harry-core/src/harry/runner/DefaultRowVisitor.java b/harry-core/src/harry/runner/MutatingRowVisitor.java
similarity index 61%
rename from harry-core/src/harry/runner/DefaultRowVisitor.java
rename to harry-core/src/harry/runner/MutatingRowVisitor.java
index 1968192..01b5acd 100644
--- a/harry-core/src/harry/runner/DefaultRowVisitor.java
+++ b/harry-core/src/harry/runner/MutatingRowVisitor.java
@@ -18,6 +18,8 @@
package harry.runner;
+import harry.core.MetricReporter;
+import harry.core.Run;
import harry.ddl.SchemaSpec;
import harry.model.OpSelectors;
import harry.operations.CompiledStatement;
@@ -25,32 +27,33 @@ import harry.operations.DeleteHelper;
import harry.operations.WriteHelper;
import harry.util.BitSet;
-public class DefaultRowVisitor implements RowVisitor
+public class MutatingRowVisitor implements RowVisitor
{
- private final SchemaSpec schema;
- private final OpSelectors.MonotonicClock clock;
- private final OpSelectors.DescriptorSelector descriptorSelector;
- private final QuerySelector querySelector;
+ protected final SchemaSpec schema;
+ protected final OpSelectors.MonotonicClock clock;
+ protected final OpSelectors.DescriptorSelector descriptorSelector;
+ protected final QueryGenerator rangeSelector;
+ protected final MetricReporter metricReporter;
- public DefaultRowVisitor(SchemaSpec schema,
- OpSelectors.MonotonicClock clock,
- OpSelectors.DescriptorSelector descriptorSelector,
- QuerySelector querySelector)
+ public MutatingRowVisitor(Run run)
{
- this.schema = schema;
- this.clock = clock;
- this.descriptorSelector = descriptorSelector;
- this.querySelector = querySelector;
+ this.metricReporter = run.metricReporter;
+ this.schema = run.schemaSpec;
+ this.clock = run.clock;
+ this.descriptorSelector = run.descriptorSelector;
+ this.rangeSelector = run.rangeSelector;
}
public CompiledStatement write(long lts, long pd, long cd, long opId)
{
+ metricReporter.insert();
long[] vds = descriptorSelector.vds(pd, cd, lts, opId, schema);
return WriteHelper.inflateInsert(schema, pd, cd, vds, clock.rts(lts));
}
public CompiledStatement deleteColumn(long lts, long pd, long cd, long opId)
{
+ metricReporter.columnDelete();
BitSet mask = descriptorSelector.columnMask(pd, lts, opId);
return DeleteHelper.deleteColumn(schema, pd, cd, mask, clock.rts(lts));
}
@@ -58,11 +61,19 @@ public class DefaultRowVisitor implements RowVisitor
public CompiledStatement deleteRow(long lts, long pd, long cd, long opId)
{
+ metricReporter.rowDelete();
return DeleteHelper.deleteRow(schema, pd, cd, clock.rts(lts));
}
public CompiledStatement deleteRange(long lts, long pd, long opId)
{
- return querySelector.inflate(lts, opId).toDeleteStatement(clock.rts(lts));
+ metricReporter.rangeDelete();
+ return rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE).toDeleteStatement(clock.rts(lts));
+ }
+
+ public CompiledStatement deleteSlice(long lts, long pd, long opId)
+ {
+ metricReporter.rangeDelete();
+ return rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE).toDeleteStatement(clock.rts(lts));
}
}
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/PartitionVisitor.java b/harry-core/src/harry/runner/PartitionVisitor.java
index ef861fd..85e3b26 100644
--- a/harry-core/src/harry/runner/PartitionVisitor.java
+++ b/harry-core/src/harry/runner/PartitionVisitor.java
@@ -18,9 +18,14 @@
package harry.runner;
+import harry.core.Run;
+
public interface PartitionVisitor
{
void visitPartition(long lts);
-
- void shutdown();
-}
+ public void shutdown() throws InterruptedException;
+ public interface PartitionVisitorFactory
+ {
+ public PartitionVisitor make(Run run);
+ }
+}
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/QueryGenerator.java b/harry-core/src/harry/runner/QueryGenerator.java
new file mode 100644
index 0000000..a50c450
--- /dev/null
+++ b/harry-core/src/harry/runner/QueryGenerator.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.LongSupplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.Run;
+import harry.ddl.ColumnSpec;
+import harry.ddl.SchemaSpec;
+import harry.generators.DataGenerators;
+import harry.generators.RngUtils;
+import harry.generators.Surjections;
+import harry.model.OpSelectors;
+import harry.operations.Relation;
+
+// TODO: there's a lot of potential to reduce the amount of garbage generated here.
+// TODO: refactor. Currently, this class is the base for both SELECT and DELETE statements. In retrospect,
+// a better way to do the same thing would've been to inflate only the bounds, be able to inflate
+// any type of query from those bounds, and leave things like "reverse" up to the last mile / implementation.
+public class QueryGenerator
+{
+ private static final Logger logger = LoggerFactory.getLogger(QueryGenerator.class);
+ private static final long GT_STREAM = 0b1;
+ private static final long E_STREAM = 0b10;
+
+ private final OpSelectors.Rng rng;
+ private final OpSelectors.PdSelector pdSelector;
+ private final OpSelectors.DescriptorSelector descriptorSelector;
+ private final SchemaSpec schema;
+
+ /** Creates a generator that uses the schema, selectors and RNG of the given run. */
+ public QueryGenerator(Run run)
+ {
+     this.schema = run.schemaSpec;
+     this.pdSelector = run.pdSelector;
+     this.descriptorSelector = run.descriptorSelector;
+     this.rng = run.rng;
+ }
+
+ // TODO: remove constructor
+ /** Creates a generator from explicitly supplied components instead of a {@code Run}. */
+ public QueryGenerator(SchemaSpec schema,
+                       OpSelectors.PdSelector pdSelector,
+                       OpSelectors.DescriptorSelector descriptorSelector,
+                       OpSelectors.Rng rng)
+ {
+     // assigned in parameter order
+     this.schema = schema;
+     this.pdSelector = pdSelector;
+     this.descriptorSelector = descriptorSelector;
+     this.rng = rng;
+ }
+
+ /**
+  * Chooses the kind of query from a descriptor drawn from the RNG and delegates the actual
+  * query construction to a {@link QueryGenerator}.
+  */
+ public static class TypedQueryGenerator
+ {
+     private final OpSelectors.Rng rng;
+     private final QueryGenerator queryGenerator;
+     private final Surjections.Surjection<Query.QueryKind> queryKindGen;
+
+     /** Uses the run's RNG, a fresh {@link QueryGenerator} for the run, and all query kinds. */
+     public TypedQueryGenerator(Run run)
+     {
+         this(run.rng, new QueryGenerator(run));
+     }
+
+     /** Selects among all values of {@code Query.QueryKind}. */
+     public TypedQueryGenerator(OpSelectors.Rng rng,
+                                QueryGenerator queryGenerator)
+     {
+         this(rng, Surjections.enumValues(Query.QueryKind.class), queryGenerator);
+     }
+
+     /** Selects among the query kinds produced by {@code queryKindGen}. */
+     public TypedQueryGenerator(OpSelectors.Rng rng,
+                                Surjections.Surjection<Query.QueryKind> queryKindGen,
+                                QueryGenerator queryGenerator)
+     {
+         this.queryKindGen = queryKindGen;
+         this.queryGenerator = queryGenerator;
+         this.rng = rng;
+     }
+
+     // Queries are inflated from the LTS, which identifies the partition, and a modifier, which makes
+     // it possible to generate different queries for the same LTS.
+     public Query inflate(long lts, long modifier)
+     {
+         return queryGenerator.inflate(lts, modifier, queryKindGen.inflate(rng.next(modifier, lts)));
+     }
+ }
+
+ /**
+ * Builds a query of the requested kind against the partition that {@code pdSelector} maps {@code lts} to.
+ *
+ * Randomness is drawn from {@code rng.next(modifier, lts)}: the resulting descriptor is used to pick the
+ * clustering descriptor(s) the query is built around and, by its parity, the {@code reverse} flag that is
+ * passed to the query.
+ *
+ * For CLUSTERING_SLICE and CLUSTERING_RANGE, when the computed bounds are equal (and, for slices, the
+ * relation is not inclusive), generation is retried recursively with {@code modifier + 1}. The returned
+ * query may therefore correspond to a larger modifier than the one passed in.
+ *
+ * @param lts logical timestamp used to select the partition
+ * @param modifier makes it possible to generate different queries for the same lts
+ * @param queryKind kind of query to build
+ * @return the inflated query
+ * @throws IllegalArgumentException if {@code queryKind} is not handled by the switch below
+ */
+ public Query inflate(long lts, long modifier, Query.QueryKind queryKind)
+ {
+ long pd = pdSelector.pd(lts, schema);
+ long descriptor = rng.next(modifier, lts);
+ boolean reverse = descriptor % 2 == 0;
+ switch (queryKind)
+ {
+ case SINGLE_PARTITION:
+ return new Query.SinglePartitionQuery(queryKind,
+ pd,
+ reverse,
+ Collections.emptyList(),
+ schema);
+ case SINGLE_CLUSTERING:
+ {
+ long cd = descriptorSelector.randomCd(pd, descriptor, schema);
+ return new Query.SingleClusteringQuery(queryKind,
+ pd,
+ cd,
+ reverse,
+ Relation.eqRelations(schema.ckGenerator.slice(cd), schema.clusteringKeys),
+ schema);
+ }
+ case CLUSTERING_SLICE:
+ {
+ List<Relation> relations = new ArrayList<>();
+ long cd = descriptorSelector.randomCd(pd, descriptor, schema);
+ boolean isGt = RngUtils.asBoolean(rng.next(descriptor, GT_STREAM));
+ // TODO: make generation of EQ configurable; turn it off and on
+ boolean isEquals = RngUtils.asBoolean(rng.next(descriptor, E_STREAM));
+
+ long[] sliced = schema.ckGenerator.slice(cd);
+ long min;
+ long max;
+ // index of the clustering column that gets the non-EQ relation; columns before it are restricted with EQ
+ int nonEqFrom = RngUtils.asInt(descriptor, 0, sliced.length - 1);
+
+ long[] minBound = new long[sliced.length];
+ long[] maxBound = new long[sliced.length];
+
+ // Algorithm that determines boundaries for a clustering slice.
+ //
+ // Basic principles are not hard but there are a few edge cases. I haven't figured out how to simplify
+ // those, so there might be some room for improvement. In short, what we want to achieve is:
+ //
+ // 1. Every part that is restricted with an EQ relation goes into the bound verbatim.
+ // 2. Every part that is restricted with a non-EQ relation (LT, GT, LTE, GTE) is taken into the bound
+ // if it is required to satisfy the relationship. For example, in `ck1 = 0 AND ck2 < 5`, ck2 will go
+ // to the _max_ boundary, and minimum value will go to the _min_ boundary, since we can select every
+ // descriptor that is prefixed with ck1.
+ // 3. Every other part (e.g., ones that are not explicitly mentioned in the query) has to be restricted
+ // according to equality. For example, in `ck1 = 0 AND ck2 < 5`, ck3 that is present in schema but not
+ // mentioned in query, makes sure that any value between [0, min_value, min_value] and [0, 5, min_value]
+ // is matched.
+ //
+ // One edge case is a query on the first clustering key: `ck1 < 5`. In this case, we have to fixup the lower
+ // value to the minimum possible value. We could really just do Long.MIN_VALUE, but in case we forget to
+ // adjust entropy elsewhere, it'll be caught correctly here.
+ for (int i = 0; i < sliced.length; i++)
+ {
+ long v = sliced[i];
+ DataGenerators.KeyGenerator gen = schema.ckGenerator;
+ ColumnSpec column = schema.clusteringKeys.get(i);
+ int idx = i;
+ LongSupplier maxSupplier = () -> gen.maxValue(idx);
+ LongSupplier minSupplier = () -> gen.minValue(idx);
+
+ if (i < nonEqFrom)
+ {
+ relations.add(Relation.eqRelation(schema.clusteringKeys.get(i), v));
+ minBound[i] = v;
+ maxBound[i] = v;
+ }
+ else if (i == nonEqFrom)
+ {
+ relations.add(Relation.relation(relationKind(isGt, isEquals), schema.clusteringKeys.get(i), v));
+
+ if (column.isReversed())
+ {
+ minBound[i] = isGt ? minSupplier.getAsLong() : v;
+ maxBound[i] = isGt ? v : maxSupplier.getAsLong();
+ }
+ else
+ {
+ minBound[i] = isGt ? v : minSupplier.getAsLong();
+ maxBound[i] = isGt ? maxSupplier.getAsLong() : v;
+ }
+ }
+ else
+ {
+ if (isEquals)
+ {
+ minBound[i] = minSupplier.getAsLong();
+ maxBound[i] = maxSupplier.getAsLong();
+ }
+ else if (i > 0 && schema.clusteringKeys.get(i - 1).isReversed())
+ maxBound[i] = minBound[i] = isGt ? minSupplier.getAsLong() : maxSupplier.getAsLong();
+ else
+ maxBound[i] = minBound[i] = isGt ? maxSupplier.getAsLong() : minSupplier.getAsLong();
+ }
+ }
+
+ // from here on isGt is flipped when the non-EQ column is reversed
+ if (schema.clusteringKeys.get(nonEqFrom).isReversed())
+ isGt = !isGt;
+
+ min = schema.ckGenerator.stitch(minBound);
+ max = schema.ckGenerator.stitch(maxBound);
+
+ if (nonEqFrom == 0)
+ {
+ min = isGt ? min : schema.ckGenerator.minValue();
+ max = !isGt ? max : schema.ckGenerator.maxValue();
+ }
+
+ // if we're about to create an "impossible" query, just bump the modifier and re-generate
+ // NOTE(review): nothing in this method bounds the depth of this recursion
+ if (min == max && !isEquals)
+ return inflate(lts, modifier + 1, queryKind);
+
+ return new Query.ClusteringSliceQuery(Query.QueryKind.CLUSTERING_SLICE,
+ pd,
+ min,
+ max,
+ relationKind(true, isGt ? isEquals : true),
+ relationKind(false, !isGt ? isEquals : true),
+ reverse,
+ relations,
+ schema);
+ }
+ case CLUSTERING_RANGE:
+ {
+ List<Relation> relations = new ArrayList<>();
+ long cd1 = descriptorSelector.randomCd(pd, descriptor, schema);
+ boolean isMinEq = RngUtils.asBoolean(descriptor);
+ // NOTE(review): cd2 and isMaxEq are both derived from the same rng.next(descriptor, lts) value,
+ // just as cd1 and isMinEq are both derived from descriptor - confirm that this coupling is intended
+ long cd2 = descriptorSelector.randomCd(pd, rng.next(descriptor, lts), schema);
+
+ boolean isMaxEq = RngUtils.asBoolean(rng.next(descriptor, lts));
+
+ long[] minBound = schema.ckGenerator.slice(cd1);
+ long[] maxBound = schema.ckGenerator.slice(cd2);
+
+ // index of the clustering column that gets the range relations; columns before it are restricted with EQ
+ int lock = RngUtils.asInt(descriptor, 0, schema.clusteringKeys.size() - 1);
+
+ // Logic here is similar to how clustering slices are implemented, except that both the lower and the
+ // upper bound get their values from the sliced values in cases (1) and (2):
+ //
+ // 1. Every part that is restricted with an EQ relation, takes its value from the min bound.
+ // TODO: this can actually be improved, since in case of hierarchical clustering generation we can
+ // pick out of the keys that are already locked. That said, we'll exercise more cases the way
+ // it is implemented right now.
+ // 2. Every part that is restricted with a non-EQ relation is taken into the bound, if it is used in
+ // the query. For example, in `ck1 = 0 AND ck2 > 2 AND ck2 < 5`, ck2 values 2 and 5 will be placed,
+ // correspondingly, to the min and max bound.
+ // 3. Every other part has to be restricted according to equality. Similar to clustering slice, we have
+ // to decide whether we use a min or the max value for the bound. For example `ck1 = 0 AND ck2 > 2 AND ck2 <= 5`,
+ // assuming we have ck3 that is present in schema but not mentioned in the query, we'll have bounds
+ // created as follows: [0, 2, max_value] and [0, 5, max_value]. Idea here is that since ck2 = 2 is excluded,
+ // we also disallow all ck3 values for [0, 2] prefix. Similarly, since ck2 = 5 is included, we allow every
+ // ck3 value with a prefix of [0, 5].
+ for (int i = 0; i < schema.clusteringKeys.size(); i++)
+ {
+ ColumnSpec<?> col = schema.clusteringKeys.get(i);
+ if (i < lock)
+ {
+ relations.add(Relation.eqRelation(col, minBound[i]));
+ maxBound[i] = minBound[i];
+ }
+ else if (i == lock)
+ {
+ long minLocked = Math.min(minBound[lock], maxBound[lock]);
+ long maxLocked = Math.max(minBound[lock], maxBound[lock]);
+
+ relations.add(Relation.relation(relationKind(true, isMinEq), col, minLocked));
+ minBound[i] = col.isReversed() ? maxLocked : minLocked;
+ relations.add(Relation.relation(relationKind(false, isMaxEq), col, maxLocked));
+ maxBound[i] = col.isReversed() ? minLocked : maxLocked;
+ }
+ else
+ {
+// if (i > 0 && schema.clusteringKeys.get(i - 1).isReversed())
+// {
+// minBound[i] = isMinEq ? schema.ckGenerator.maxValue(i) : schema.ckGenerator.minValue(i);
+// maxBound[i] = isMaxEq ? schema.ckGenerator.minValue(i) : schema.ckGenerator.maxValue(i);
+// }
+// else
+ {
+ minBound[i] = isMinEq ? schema.ckGenerator.minValue(i) : schema.ckGenerator.maxValue(i);
+ maxBound[i] = isMaxEq ? schema.ckGenerator.maxValue(i) : schema.ckGenerator.minValue(i);
+ }
+ }
+ }
+
+ long stitchedMin = schema.ckGenerator.stitch(minBound);
+ long stitchedMax = schema.ckGenerator.stitch(maxBound);
+
+// if (stitchedMin > stitchedMax)
+// {
+// long[] tmp = minBound;
+// minBound = maxBound;
+// maxBound = tmp;
+// stitchedMin = schema.ckGenerator.stitch(minBound);
+// stitchedMax = schema.ckGenerator.stitch(maxBound);
+// }
+//
+// for (int i = 0; i <= lock; i++)
+// {
+// ColumnSpec<?> col = schema.clusteringKeys.get(i);
+// if (i < lock)
+// {
+// relations.add(Relation.eqRelation(col, minBound[i]));
+// }
+// else
+// {
+// relations.add(Relation.relation(relationKind(true, isMinEq), col, minBound[lock]));
+// relations.add(Relation.relation(relationKind(false, isMaxEq), col, maxBound[lock]));
+// }
+// }
+
+ // if we're about to create an "impossible" query, just bump the modifier and re-generate
+ // TODO: so this isn't considered "normal" that we do it this way, but I'd rather fix it with
+ // a refactoring that's mentioned below
+ // NOTE(review): nothing in this method bounds the depth of this recursion
+ if (stitchedMin == stitchedMax)
+ {
+// if (modifier > 10)
+// {
+// logger.error(String.format("Unsuccessfully tried to generate query for %s%s;%s%s %s %d times. Schema: %s",
+// isMinEq ? "[" : "(", stitchedMin,
+// stitchedMax, isMaxEq ? "]" : ")",
+// queryKind, modifier, schema.compile().cql()),
+// new RuntimeException());
+// }
+ return inflate(lts, modifier + 1, queryKind);
+ }
+
+ // TODO: one of the ways to get rid of garbage here, and potentially even simplify the code is to
+ // simply return bounds here. After bounds are created, we slice them and generate query right
+ // from the bounds. In this case, we can even say that things like -inf/+inf are special values,
+ // and use them as placeholders. Also, it'll be easier to manipulate relations.
+ return new Query.ClusteringRangeQuery(Query.QueryKind.CLUSTERING_RANGE,
+ pd,
+ stitchedMin,
+ stitchedMax,
+ relationKind(true, isMinEq),
+ relationKind(false, isMaxEq),
+ reverse,
+ relations,
+ schema);
+ }
+ default:
+ throw new IllegalArgumentException("Shouldn't happen");
+ }
+ }
+
+ /**
+  * Maps a direction and an inclusiveness flag onto a relation kind.
+  *
+  * @param isGt     {@code true} for "greater than" kinds, {@code false} for "less than" kinds
+  * @param isEquals {@code true} for the inclusive variant (GTE / LTE), {@code false} for the strict one (GT / LT)
+  */
+ public static Relation.RelationKind relationKind(boolean isGt, boolean isEquals)
+ {
+     if (isEquals)
+         return isGt ? Relation.RelationKind.GTE : Relation.RelationKind.LTE;
+
+     return isGt ? Relation.RelationKind.GT : Relation.RelationKind.LT;
+ }
+}
diff --git a/harry-core/src/harry/runner/RecentPartitionValidator.java b/harry-core/src/harry/runner/RecentPartitionValidator.java
new file mode 100644
index 0000000..82d4bda
--- /dev/null
+++ b/harry-core/src/harry/runner/RecentPartitionValidator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.runner;
+
+import harry.core.MetricReporter;
+import harry.core.Run;
+import harry.generators.Surjections;
+import harry.model.Model;
+import harry.model.OpSelectors;
+
+/**
+ * Partition visitor that, whenever the visited LTS is a multiple of {@code triggerAfter}, validates
+ * one generated query for each of up to {@code partitionCount} partition positions, walking
+ * backwards from the position of the clock's max LTS.
+ */
+public class RecentPartitionValidator implements PartitionVisitor
+{
+    private final Model model;
+
+    private final OpSelectors.MonotonicClock clock;
+    private final OpSelectors.PdSelector pdSelector;
+    private final QueryGenerator.TypedQueryGenerator querySelector;
+    private final MetricReporter metricReporter;
+    private final int partitionCount;
+    private final int triggerAfter;
+
+    public RecentPartitionValidator(int partitionCount,
+                                    int triggerAfter,
+                                    Run run,
+                                    Model.ModelFactory modelFactory)
+    {
+        this.partitionCount = partitionCount;
+        this.triggerAfter = triggerAfter;
+
+        this.clock = run.clock;
+        this.pdSelector = run.pdSelector;
+        this.metricReporter = run.metricReporter;
+
+        // TODO: make query kind configurable
+        this.querySelector = new QueryGenerator.TypedQueryGenerator(run.rng,
+                                                                    Surjections.enumValues(Query.QueryKind.class),
+                                                                    run.rangeSelector);
+        this.model = modelFactory.make(run);
+    }
+
+    public void visitPartition(long lts)
+    {
+        // NOTE(review): a triggerAfter of 0 makes this modulo throw ArithmeticException
+        if (lts % triggerAfter != 0)
+            return;
+
+        validateRecentPartitions(partitionCount);
+    }
+
+    // TODO: expose metric, how many times validated recent partitions
+    private void validateRecentPartitions(int partitionCount)
+    {
+        long pos = pdSelector.positionFor(clock.maxLts());
+
+        // NOTE(review): the walk stops before position 0, so that position is never validated - confirm intended
+        for (int remaining = partitionCount;
+             pos > 0 && remaining > 0 && !Thread.currentThread().isInterrupted();
+             pos--, remaining--)
+        {
+            long visitLts = pdSelector.minLtsAt(pos);
+
+            metricReporter.validateRandomQuery();
+            model.validate(querySelector.inflate(visitLts, 0));
+        }
+    }
+
+    public void shutdown() throws InterruptedException
+    {
+        // nothing to release
+    }
+}
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/RowVisitor.java b/harry-core/src/harry/runner/RowVisitor.java
index 93674f9..b24babb 100644
--- a/harry-core/src/harry/runner/RowVisitor.java
+++ b/harry-core/src/harry/runner/RowVisitor.java
@@ -18,7 +18,7 @@
package harry.runner;
-import harry.ddl.SchemaSpec;
+import harry.core.Run;
import harry.model.OpSelectors;
import harry.operations.CompiledStatement;
@@ -26,10 +26,7 @@ public interface RowVisitor
{
interface RowVisitorFactory
{
- RowVisitor make(SchemaSpec schema,
- OpSelectors.MonotonicClock clock,
- OpSelectors.DescriptorSelector descriptorSelector,
- QuerySelector querySelector);
+ RowVisitor make(Run run);
}
default CompiledStatement visitRow(OpSelectors.OperationKind op, long lts, long pd, long cd, long opId)
@@ -46,6 +43,8 @@ public interface RowVisitor
return deleteColumn(lts, pd, cd, opId);
case DELETE_RANGE:
return deleteRange(lts, pd, opId);
+ case DELETE_SLICE:
+ return deleteSlice(lts, pd, opId);
default:
throw new IllegalStateException();
}
@@ -58,4 +57,8 @@ public interface RowVisitor
CompiledStatement deleteRow(long lts, long pd, long cd, long opId);
CompiledStatement deleteRange(long lts, long pd, long opId);
+
+ CompiledStatement deleteSlice(long lts, long pd, long opId);
+
+
}
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/Runner.java b/harry-core/src/harry/runner/Runner.java
index 7f1696e..a96dd69 100644
--- a/harry-core/src/harry/runner/Runner.java
+++ b/harry-core/src/harry/runner/Runner.java
@@ -23,11 +23,10 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -43,8 +42,6 @@ import harry.model.OpSelectors;
public abstract class Runner
{
- public static final Object[] EMPTY_BINDINGS = {};
-
private static final Logger logger = LoggerFactory.getLogger(Runner.class);
protected final Run run;
@@ -53,10 +50,10 @@ public abstract class Runner
// since we have multiple concurrent checkers running
protected final CopyOnWriteArrayList<Throwable> errors;
- public Runner(Run run)
+ public Runner(Run run, Configuration config)
{
this.run = run;
- this.config = run.snapshot;
+ this.config = config;
this.errors = new CopyOnWriteArrayList<>();
}
@@ -70,12 +67,11 @@ public abstract class Runner
if (config.create_schema)
{
// TODO: make RF configurable or make keyspace DDL configurable
- run.sut.execute("CREATE KEYSPACE " + run.schemaSpec.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+ run.sut.schemaChange("CREATE KEYSPACE " + run.schemaSpec.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
- run.sut.execute(String.format("DROP TABLE IF EXISTS %s.%s;",
+ run.sut.schemaChange(String.format("DROP TABLE IF EXISTS %s.%s;",
run.schemaSpec.keyspace,
- run.schemaSpec.table),
- EMPTY_BINDINGS);
+ run.schemaSpec.table));
String schema = run.schemaSpec.compile().cql();
logger.info("Creating table: " + schema);
run.sut.schemaChange(schema);
@@ -83,10 +79,9 @@ public abstract class Runner
if (config.truncate_table)
{
- run.sut.execute(String.format("truncate %s.%s;",
- run.schemaSpec.keyspace,
- run.schemaSpec.table),
- EMPTY_BINDINGS);
+ run.sut.schemaChange(String.format("truncate %s.%s;",
+ run.schemaSpec.keyspace,
+ run.schemaSpec.table));
}
}
@@ -112,7 +107,7 @@ public abstract class Runner
protected void maybeReportErrors()
{
if (!errors.isEmpty())
- dumpStateToFile(run, errors);
+ dumpStateToFile(run, config, errors);
}
public abstract CompletableFuture initAndStartAll() throws InterruptedException;
@@ -140,20 +135,21 @@ public abstract class Runner
{
private final ScheduledExecutorService executor;
private final ScheduledExecutorService shutdownExceutor;
- private final int checkRecentAfter;
- private final int checkAllAfter;
+ private final List<PartitionVisitor> partitionVisitors;
+ private final Configuration config;
public SequentialRunner(Run run,
- int roundRobinValidatorThreads,
- int checkRecentAfter,
- int checkAllAfter)
+ Configuration config,
+ List<? extends PartitionVisitor.PartitionVisitorFactory> partitionVisitorFactories)
{
- super(run);
+ super(run, config);
- this.executor = Executors.newScheduledThreadPool(roundRobinValidatorThreads + 1);
+ this.executor = Executors.newSingleThreadScheduledExecutor();
this.shutdownExceutor = Executors.newSingleThreadScheduledExecutor();
- this.checkAllAfter = checkAllAfter;
- this.checkRecentAfter = checkRecentAfter;
+ this.config = config;
+ this.partitionVisitors = new ArrayList<>();
+ for (PartitionVisitor.PartitionVisitorFactory factory : partitionVisitorFactories)
+ partitionVisitors.add(factory.make(run));
}
public CompletableFuture initAndStartAll()
@@ -166,12 +162,12 @@ public abstract class Runner
logger.info("Completed");
// TODO: wait for the last full validation?
future.complete(null);
- }, run.snapshot.run_time, run.snapshot.run_time_unit);
+ }, config.run_time, config.run_time_unit);
executor.submit(reportThrowable(() -> {
try
{
- run(run.visitorFactory.get(), run.clock,
+ run(partitionVisitors, run.clock,
() -> Thread.currentThread().isInterrupted() || future.isDone());
}
catch (Throwable t)
@@ -184,19 +180,15 @@ public abstract class Runner
return future;
}
- void run(PartitionVisitor visitor,
+ void run(List<PartitionVisitor> visitors,
OpSelectors.MonotonicClock clock,
- BooleanSupplier exitCondition) throws ExecutionException, InterruptedException
+ BooleanSupplier exitCondition)
{
while (!exitCondition.getAsBoolean())
{
long lts = clock.nextLts();
- visitor.visitPartition(lts);
- if (lts % checkRecentAfter == 0)
- run.validator.validateRecentPartitions(100);
-
- if (lts % checkAllAfter == 0)
- run.validator.validateAllPartitions(executor, exitCondition, 10).get();
+ for (PartitionVisitor partitionVisitor : visitors)
+ partitionVisitor.visitPartition(lts);
}
}
@@ -215,105 +207,89 @@ public abstract class Runner
public static interface RunnerFactory
{
- public Runner make(Run run);
+ public Runner make(Run run, Configuration config);
}
// TODO: this requires some significant improvement
public static class ConcurrentRunner extends Runner
{
private final ScheduledExecutorService executor;
- private final ScheduledExecutorService shutdownExceutor;
+ private final ScheduledExecutorService shutdownExecutor;
+ private final List<? extends PartitionVisitor.PartitionVisitorFactory> partitionVisitorFactories;
+ private final List<PartitionVisitor> allVisitors;
- private final int writerThreads;
- private final int roundRobinValidators;
- private final int recentPartitionValidators;
+ private final int concurrency;
private final long runTime;
private final TimeUnit runTimeUnit;
public ConcurrentRunner(Run run,
- int writerThreads,
- int roundRobinValidators,
- int recentPartitionValidators)
+ Configuration config,
+ int concurrency,
+ List<? extends PartitionVisitor.PartitionVisitorFactory> partitionVisitorFactories)
{
- super(run);
- this.writerThreads = writerThreads;
- this.roundRobinValidators = roundRobinValidators;
- this.recentPartitionValidators = recentPartitionValidators;
- this.runTime = run.snapshot.run_time;
- this.runTimeUnit = run.snapshot.run_time_unit;
- this.executor = Executors.newScheduledThreadPool(this.writerThreads + this.roundRobinValidators + this.recentPartitionValidators);
- this.shutdownExceutor = Executors.newSingleThreadScheduledExecutor();
+ super(run, config);
+ this.concurrency = concurrency;
+ this.runTime = config.run_time;
+ this.runTimeUnit = config.run_time_unit;
+ // TODO: configure concurrency
+ this.executor = Executors.newScheduledThreadPool(concurrency);
+ this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor();
+ this.partitionVisitorFactories = partitionVisitorFactories;
+ this.allVisitors = new CopyOnWriteArrayList<>();
}
- public CompletableFuture initAndStartAll() throws InterruptedException
+ public CompletableFuture initAndStartAll()
{
init();
CompletableFuture future = new CompletableFuture();
future.whenComplete((a, b) -> maybeReportErrors());
- shutdownExceutor.schedule(() -> {
+ shutdownExecutor.schedule(() -> {
logger.info("Completed");
// TODO: wait for the last full validation?
future.complete(null);
}, runTime, runTimeUnit);
BooleanSupplier exitCondition = () -> Thread.currentThread().isInterrupted() || future.isDone();
- for (int i = 0; i < writerThreads; i++)
+ for (int i = 0; i < concurrency; i++)
{
- executor.submit(reportThrowable(() -> run(run.visitorFactory.get(), run.clock, exitCondition),
+ List<PartitionVisitor> partitionVisitors = new ArrayList<>();
+ executor.submit(reportThrowable(() -> {
+
+ for (PartitionVisitor.PartitionVisitorFactory factory : partitionVisitorFactories)
+ {
+ partitionVisitors.add(factory.make(run));
+ }
+ allVisitors.addAll(partitionVisitors);
+ run(partitionVisitors, run.clock, exitCondition);
+ },
future));
- }
-
- scheduleValidateAllPartitions(run.validator, executor, future, roundRobinValidators);
- // N threads to validate recently written partitions
- for (int i = 0; i < recentPartitionValidators; i++)
- {
- executor.scheduleWithFixedDelay(reportThrowable(() -> {
- // TODO: make recent partitions configurable
- run.validator.validateRecentPartitions(100);
- }, future), 1000, 1, TimeUnit.MILLISECONDS);
}
return future;
}
- void run(PartitionVisitor visitor,
+ void run(List<PartitionVisitor> visitors,
OpSelectors.MonotonicClock clock,
BooleanSupplier exitCondition)
{
while (!exitCondition.getAsBoolean())
{
long lts = clock.nextLts();
- visitor.visitPartition(lts);
+ for (PartitionVisitor visitor : visitors)
+ visitor.visitPartition(lts);
}
}
- void scheduleValidateAllPartitions(Validator validator, ExecutorService executor, CompletableFuture future, int roundRobinValidators)
- {
- validator.validateAllPartitions(executor, () -> Thread.currentThread().isInterrupted() || future.isDone(),
- roundRobinValidators)
- .handle((v, err) -> {
- if (err != null)
- {
- errors.add(err);
- if (!future.isDone())
- future.completeExceptionally(err);
- }
-
- if (Thread.currentThread().isInterrupted() || future.isDone())
- return null;
-
- scheduleValidateAllPartitions(validator, executor, future, roundRobinValidators);
- return null;
- });
- }
-
public void shutdown() throws InterruptedException
{
logger.info("Shutting down...");
- shutdownExceutor.shutdownNow();
- shutdownExceutor.awaitTermination(1, TimeUnit.MINUTES);
+ for (PartitionVisitor visitor : allVisitors)
+ visitor.shutdown();
+
+ shutdownExecutor.shutdownNow();
+ shutdownExecutor.awaitTermination(1, TimeUnit.MINUTES);
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.MINUTES);
@@ -346,7 +322,7 @@ public abstract class Runner
bw.newLine();
}
- private static void dumpStateToFile(Run run, List<Throwable> t)
+ private static void dumpStateToFile(Run run, Configuration config, List<Throwable> t)
{
try
{
@@ -361,14 +337,14 @@ public abstract class Runner
bw.flush();
}
- File config = new File("run.yaml");
- Configuration.ConfigurationBuilder builder = run.snapshot.unbuild();
+ File file = new File("run.yaml");
+ Configuration.ConfigurationBuilder builder = config.unbuild();
// overrride stateful components
builder.setClock(run.clock.toConfig());
- builder.setModel(run.model.toConfig());
+ builder.setDataTracker(run.tracker.toConfig());
- try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(config))))
+ try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file))))
{
bw.write(Configuration.toYamlString(builder.build()));
bw.flush();
diff --git a/harry-core/src/harry/runner/SinglePartitionValidator.java b/harry-core/src/harry/runner/SinglePartitionValidator.java
new file mode 100644
index 0000000..e9d4f27
--- /dev/null
+++ b/harry-core/src/harry/runner/SinglePartitionValidator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.runner;
+
+import harry.core.Run;
+import harry.model.Model;
+
+public class SinglePartitionValidator implements PartitionVisitor
+{
+ protected final int iterations;
+ protected final Model model;
+ protected final QueryGenerator queryGenerator;
+
+ public SinglePartitionValidator(int iterations,
+ Run run,
+ Model.ModelFactory modelFactory)
+ {
+ this.iterations = iterations;
+ this.model = modelFactory.make(run);
+ this.queryGenerator = new QueryGenerator(run);
+ }
+
+ public void shutdown() throws InterruptedException
+ {
+
+ }
+
+ public void visitPartition(long lts)
+ {
+ model.validate(queryGenerator.inflate(lts, 0, Query.QueryKind.SINGLE_PARTITION));
+
+ for (Query.QueryKind queryKind : new Query.QueryKind[]{ Query.QueryKind.CLUSTERING_RANGE, Query.QueryKind.CLUSTERING_SLICE, Query.QueryKind.SINGLE_CLUSTERING })
+ {
+ for (int i = 0; i < iterations; i++)
+ {
+ model.validate(queryGenerator.inflate(lts, i, queryKind));
+ }
+ }
+ }
+}
diff --git a/harry-core/src/harry/runner/Validator.java b/harry-core/src/harry/runner/Validator.java
deleted file mode 100644
index 4bd1d90..0000000
--- a/harry-core/src/harry/runner/Validator.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package harry.runner;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BooleanSupplier;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import harry.ddl.SchemaSpec;
-import harry.generators.Surjections;
-import harry.model.Model;
-import harry.model.OpSelectors;
-
-// This might be something that potentially grows into the validator described in the design doc;
-// right now it's just a helper/container class
-public class Validator
-{
- private static final Logger logger = LoggerFactory.getLogger(Validator.class);
-
- private final Model model;
- private final SchemaSpec schema;
-
- private final OpSelectors.MonotonicClock clock;
- private final OpSelectors.PdSelector pdSelector;
- private final QuerySelector querySelector;
-
- public Validator(Model model,
- SchemaSpec schema,
- OpSelectors.MonotonicClock clock,
- OpSelectors.PdSelector pdSelector,
- OpSelectors.DescriptorSelector descriptorSelector,
- OpSelectors.Rng rng)
- {
- this.model = model;
- this.schema = schema;
- this.clock = clock;
- this.pdSelector = pdSelector;
- this.querySelector = new QuerySelector(schema,
- pdSelector,
- descriptorSelector,
- Surjections.enumValues(Query.QueryKind.class),
- rng);
- }
-
- // TODO: expose metric, how many times validated recent partitions
- public void validateRecentPartitions(int partitionCount)
- {
- long maxLts = clock.maxLts();
- long pos = pdSelector.positionFor(maxLts);
-
- int maxPartitions = partitionCount;
- while (pos > 0 && maxPartitions > 0 && !Thread.currentThread().isInterrupted())
- {
- long visitLts = pdSelector.minLtsAt(pos);
-
- model.validatePartitionState(visitLts,
- querySelector.inflate(visitLts, 0));
-
- pos--;
- maxPartitions--;
- }
- }
-
- public CompletableFuture<Void> validateAllPartitions(ExecutorService executor, BooleanSupplier keepRunning, int parallelism)
- {
- long maxLts = clock.maxLts() - 1;
- long maxPos = pdSelector.positionFor(maxLts);
- AtomicLong counter = new AtomicLong();
- CompletableFuture[] futures = new CompletableFuture[parallelism];
- for (int i = 0; i < parallelism; i++)
- {
- futures[i] = CompletableFuture.supplyAsync(() -> {
- long pos;
- while ((pos = counter.getAndIncrement()) < maxPos && !executor.isShutdown() && keepRunning.getAsBoolean())
- {
- if (pos > 0 && pos % 1000 == 0)
- logger.debug(String.format("Validated %d out of %d partitions", pos, maxPos));
- long visitLts = pdSelector.minLtsAt(pos);
- for (boolean reverse : new boolean[]{ true, false })
- {
- model.validatePartitionState(visitLts,
- Query.selectPartition(schema, pdSelector.pd(visitLts, schema), reverse));
- }
- }
- return null;
- }, executor);
- }
- return CompletableFuture.allOf(futures);
- }
-
- public void validatePartition(long lts)
- {
- for (boolean reverse : new boolean[]{ true, false })
- {
- model.validatePartitionState(lts,
- Query.selectPartition(schema, pdSelector.pd(lts, schema), reverse));
- }
- }
-}
diff --git a/harry-core/src/harry/util/Ranges.java b/harry-core/src/harry/util/Ranges.java
index a49b2bd..9da2af8 100644
--- a/harry-core/src/harry/util/Ranges.java
+++ b/harry-core/src/harry/util/Ranges.java
@@ -84,7 +84,8 @@ public class Ranges
public Range(long minBound, long maxBound, boolean minInclusive, boolean maxInclusive, long timestamp)
{
- assert (minBound < maxBound) || ((minBound == maxBound) && minInclusive && maxInclusive) :
+ // (minBound < maxBound) ||
+ assert ((minBound != maxBound) || (minInclusive && maxInclusive)) :
String.format("Min bound should be less than max bound, or both bounds have to be inclusive, but was: %s%d,%d%s",
minInclusive ? "[" : "(",
minBound, maxBound,
@@ -99,12 +100,24 @@ public class Ranges
public boolean contains(long descriptor)
{
- if (minInclusive && descriptor == minBound)
- return true;
- if (maxInclusive && descriptor == maxBound)
- return true;
+ if (minInclusive && maxInclusive)
+ return descriptor >= minBound && descriptor <= maxBound;
- return (descriptor > minBound) && (descriptor < maxBound);
+ if (!minInclusive && !maxInclusive)
+ return descriptor > minBound && descriptor < maxBound;
+
+ if (!minInclusive && maxInclusive)
+ return descriptor > minBound && descriptor <= maxBound;
+
+ assert (minInclusive && !maxInclusive);
+ return descriptor >= minBound && descriptor < maxBound;
+
+// if ((minInclusive && descriptor == minBound) && descriptor < maxBound)
+// return true;
+// if ((maxInclusive && descriptor == maxBound) && descriptor > minBound)
+// return true;
+//
+// return (descriptor > minBound) && (descriptor < maxBound);
}
public boolean contains(long descriptor, long ts)
diff --git a/harry-core/test/harry/generators/RandomGeneratorTest.java b/harry-core/test/harry/generators/RandomGeneratorTest.java
index c84fd9b..b34553d 100644
--- a/harry-core/test/harry/generators/RandomGeneratorTest.java
+++ b/harry-core/test/harry/generators/RandomGeneratorTest.java
@@ -31,13 +31,14 @@ import static junit.framework.TestCase.fail;
public class RandomGeneratorTest
{
+ private static int RUNS = 100000;
+
@Test
public void testShuffleUnshuffle()
{
- int iterations = 100000;
Random rnd = new Random();
- for (int i = 1; i < iterations; i++)
+ for (int i = 1; i < RUNS; i++)
{
long l = rnd.nextLong();
Assert.assertEquals(l, PCGFastPure.unshuffle(PCGFastPure.shuffle(l)));
@@ -48,23 +49,38 @@ public class RandomGeneratorTest
public void testImmutableRng()
{
int size = 5;
- for (int stream = 1; stream < 1000000; stream++)
+ OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
+ for (int stream = 1; stream < RUNS; stream++)
{
long[] generated = new long[size];
- OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
for (int i = 0; i < size; i++)
generated[i] = rng.randomNumber(i, stream);
+ Assert.assertEquals(0, rng.sequenceNumber(generated[0], stream));
+ Assert.assertEquals(generated[1], rng.next(generated[0], stream));
+
for (int i = 1; i < size; i++)
{
Assert.assertEquals(generated[i], rng.next(generated[i - 1], stream));
Assert.assertEquals(generated[i - 1], rng.prev(generated[i], stream));
- Assert.assertEquals(i - 1, rng.sequenceNumber(generated[i], stream));
+ Assert.assertEquals(i, rng.sequenceNumber(generated[i], stream));
}
}
}
@Test
+ public void testSequenceNumber()
+ {
+ int size = 5;
+ OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
+ for (int stream = 1; stream < RUNS; stream++)
+ {
+ for (int i = 0; i < size; i++)
+ Assert.assertEquals(i, rng.sequenceNumber(rng.randomNumber(i, stream), stream));
+ }
+ }
+
+ @Test
public void seekTest()
{
// TODO: more examples; randomize
@@ -81,14 +97,14 @@ public class RandomGeneratorTest
Assert.assertEquals(last, rand.next());
Assert.assertEquals(first, rand.nextAt(0));
Assert.assertEquals(last, rand.nextAt(10));
- Assert.assertEquals(-11, rand.distance(first));
+ Assert.assertEquals(-10, rand.distance(first));
}
@Test
public void shuffleUnshuffleTest()
{
Random rnd = new Random();
- for (int i = 0; i < 100000; i++)
+ for (int i = 0; i < RUNS; i++)
{
long a = rnd.nextLong();
Assert.assertEquals(a, PCGFastPure.unshuffle(PCGFastPure.shuffle(a)));
@@ -103,7 +119,7 @@ public class RandomGeneratorTest
int a = 0;
int b = 50;
int[] cardinality = new int[b - a];
- for (int i = 0; i < 100000; i++)
+ for (int i = 0; i < RUNS; i++)
{
int min = Math.min(a, b);
int max = Math.max(a, b);
diff --git a/harry-core/test/harry/generators/SurjectionsTest.java b/harry-core/test/harry/generators/SurjectionsTest.java
index b927041..5cc64c7 100644
--- a/harry-core/test/harry/generators/SurjectionsTest.java
+++ b/harry-core/test/harry/generators/SurjectionsTest.java
@@ -31,6 +31,8 @@ import harry.generators.Surjections;
public class SurjectionsTest
{
+ private static int RUNS = 1000000;
+
@Test
public void weightedTest()
{
@@ -42,7 +44,7 @@ public class SurjectionsTest
Map<String, Integer> frequencies = new HashMap<>();
RandomGenerator rng = new PcgRSUFast(System.currentTimeMillis(), 0);
- for (int i = 0; i < 1000000; i++)
+ for (int i = 0; i < RUNS; i++)
{
String s = gen.inflate(rng.next());
frequencies.compute(s, (s1, i1) -> {
diff --git a/harry-core/test/harry/model/OpSelectorsTest.java b/harry-core/test/harry/model/OpSelectorsTest.java
index 6c55a7f..d90fa5f 100644
--- a/harry-core/test/harry/model/OpSelectorsTest.java
+++ b/harry-core/test/harry/model/OpSelectorsTest.java
@@ -32,16 +32,18 @@ import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.Test;
+import harry.core.MetricReporter;
+import harry.core.Run;
import harry.ddl.ColumnSpec;
import harry.ddl.SchemaGenerators;
import harry.ddl.SchemaSpec;
-import harry.generators.PcgRSUFast;
-import harry.generators.RandomGenerator;
import harry.generators.Surjections;
import harry.generators.distribution.Distribution;
-import harry.model.sut.NoOpSut;
+import harry.model.clock.OffsetClock;
+import harry.model.sut.SystemUnderTest;
import harry.operations.CompiledStatement;
-import harry.runner.DefaultPartitionVisitorFactory;
+import harry.runner.DataTracker;
+import harry.runner.MutatingPartitionVisitor;
import harry.runner.PartitionVisitor;
import harry.runner.RowVisitor;
import harry.util.BitSet;
@@ -50,6 +52,8 @@ import static harry.model.OpSelectors.DefaultDescriptorSelector.DEFAULT_OP_TYPE_
public class OpSelectorsTest
{
+ private static int RUNS = 10000;
+
@Test
public void testRowDataDescriptorSupplier()
{
@@ -69,7 +73,7 @@ public class OpSelectorsTest
100,
100);
- for (int lts = 0; lts < 100000; lts++)
+ for (int lts = 0; lts < RUNS; lts++)
{
long pd = pdSupplier.pd(lts);
for (int m = 0; m < descriptorSelector.numberOfModifications(lts); m++)
@@ -94,9 +98,9 @@ public class OpSelectorsTest
public void pdSelectorTest()
{
OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
- int cycles = 1000;
+ int cycles = 10000;
- for (int repeats = 2; repeats <= 100; repeats++)
+ for (int repeats = 2; repeats <= 1000; repeats++)
{
for (int windowSize = 2; windowSize <= 10; windowSize++)
{
@@ -104,7 +108,9 @@ public class OpSelectorsTest
long[] pds = new long[cycles];
for (int i = 0; i < cycles; i++)
{
- pds[i] = pdSupplier.pd(i);
+ long pd = pdSupplier.pd(i);
+ pds[i] = pd;
+ Assert.assertEquals(pdSupplier.positionFor(i), pdSupplier.positionForPd(pd));
}
Set<Long> noNext = new HashSet<>();
@@ -150,7 +156,8 @@ public class OpSelectorsTest
for (int i = 0; i < cycles; i++)
{
- long maxLts = pdSupplier.maxLts(i);
+ long pd = pdSupplier.pd(i);
+ long maxLts = pdSupplier.maxLtsFor(pd);
Assert.assertEquals(-1, pdSupplier.nextLts(maxLts));
Assert.assertEquals(pdSupplier.pd(i), pdSupplier.pd(maxLts));
}
@@ -161,8 +168,8 @@ public class OpSelectorsTest
@Test
public void ckSelectorTest()
{
- Supplier<SchemaSpec> gen = SchemaGenerators.progression(5);
- for (int i = 0; i < 30; i++)
+ Supplier<SchemaSpec> gen = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+ for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
ckSelectorTest(gen.get());
}
@@ -191,37 +198,48 @@ public class OpSelectorsTest
});
};
- PartitionVisitor partitionVisitor = new DefaultPartitionVisitorFactory(new DoNothingModel(),
- new NoOpSut(),
- pdSelector,
- ckSelector,
- schema,
- new RowVisitor()
- {
- public CompiledStatement write(long lts, long pd, long cd, long m)
- {
- consumer.accept(pd, cd);
- return compiledStatement;
- }
+ Run run = new Run(rng,
+ new OffsetClock(0),
+ pdSelector,
+ ckSelector,
+ schema,
+ DataTracker.NO_OP,
+ SystemUnderTest.NO_OP,
+ MetricReporter.NO_OP);
+
+ PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run,
+ (r) -> new RowVisitor()
+ {
+ public CompiledStatement write(long lts, long pd, long cd, long m)
+ {
+ consumer.accept(pd, cd);
+ return compiledStatement;
+ }
+
+ public CompiledStatement deleteColumn(long lts, long pd, long cd, long m)
+ {
+ consumer.accept(pd, cd);
+ return compiledStatement;
+ }
- public CompiledStatement deleteColumn(long lts, long pd, long cd, long m)
- {
- consumer.accept(pd, cd);
- return compiledStatement;
- }
+ public CompiledStatement deleteRow(long lts, long pd, long cd, long m)
+ {
+ consumer.accept(pd, cd);
+ return compiledStatement;
+ }
- public CompiledStatement deleteRow(long lts, long pd, long cd, long m)
- {
- consumer.accept(pd, cd);
- return compiledStatement;
- }
+ public CompiledStatement deleteRange(long lts, long pd, long opId)
+ {
+ // ignore
+ return compiledStatement;
+ }
- public CompiledStatement deleteRange(long lts, long pd, long opId)
- {
- // ignore
- return compiledStatement;
- }
- }).get();
+ public CompiledStatement deleteSlice(long lts, long pd, long opId)
+ {
+ // ignore
+ return compiledStatement;
+ }
+ });
for (int lts = 0; lts < 1000; lts++)
{
diff --git a/harry-core/test/harry/operations/RelationTest.java b/harry-core/test/harry/operations/RelationTest.java
index 15ab02e..70ccd69 100644
--- a/harry-core/test/harry/operations/RelationTest.java
+++ b/harry-core/test/harry/operations/RelationTest.java
@@ -32,7 +32,7 @@ import harry.ddl.SchemaSpec;
import harry.generators.DataGeneratorsTest;
import harry.model.OpSelectors;
import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
import harry.util.BitSet;
public class RelationTest
@@ -42,9 +42,9 @@ public class RelationTest
@Test
public void testKeyGenerators()
{
- for (int cnt = 1; cnt < 5; cnt++)
+ for (int size = 1; size < 5; size++)
{
- Iterator<ColumnSpec.DataType[]> iter = DataGeneratorsTest.permutations(cnt,
+ Iterator<ColumnSpec.DataType[]> iter = DataGeneratorsTest.permutations(size,
ColumnSpec.DataType.class,
ColumnSpec.int8Type,
ColumnSpec.asciiType,
@@ -62,7 +62,7 @@ public class RelationTest
spec.add(ColumnSpec.ck("r" + i, types[i], false));
SchemaSpec schemaSpec = new SchemaSpec("ks",
- "tbl",
+ "tbl",
Collections.singletonList(ColumnSpec.pk("pk", ColumnSpec.int64Type)),
spec,
Collections.emptyList(),
@@ -86,116 +86,123 @@ public class RelationTest
Arrays.sort(cds);
OpSelectors.Rng rng = new OpSelectors.PCGFast(1L);
- // TODO: replace with mocks?
- QuerySelector querySelector = new QuerySelector(schemaSpec,
- new OpSelectors.PdSelector()
- {
- protected long pd(long lts)
- {
- return lts;
- }
-
- public long nextLts(long lts)
- {
- throw new RuntimeException("not implemented");
- }
-
- public long prevLts(long lts)
- {
- throw new RuntimeException("not implemented");
- }
-
- public long maxLts(long lts)
- {
- throw new RuntimeException("not implemented");
- }
-
- public long minLtsAt(long position)
- {
- throw new RuntimeException("not implemented");
- }
-
- public long minLtsFor(long pd)
- {
- throw new RuntimeException("not implemented");
- }
-
- public long positionFor(long lts)
- {
- throw new RuntimeException("not implemented");
- }
- },
- new OpSelectors.DescriptorSelector()
- {
- public int numberOfModifications(long lts)
- {
- throw new RuntimeException("not implemented");
- }
-
- public int opsPerModification(long lts)
- {
- throw new RuntimeException("not implemented");
- }
-
- public int maxPartitionSize()
- {
- throw new RuntimeException("not implemented");
- }
-
- public boolean isCdVisitedBy(long pd, long lts, long cd)
- {
- throw new RuntimeException("not implemented");
- }
-
- protected long cd(long pd, long lts, long opId)
- {
- throw new RuntimeException("not implemented");
- }
-
- public long randomCd(long pd, long entropy)
- {
- return Math.abs(rng.prev(entropy)) % cds.length;
- }
-
- protected long vd(long pd, long cd, long lts, long opId, int col)
- {
- throw new RuntimeException("not implemented");
- }
-
- public OpSelectors.OperationKind operationType(long pd, long lts, long opId)
- {
- throw new RuntimeException("not implemented");
- }
-
- public BitSet columnMask(long pd, long lts, long opId)
- {
- throw new RuntimeException("not implemented");
- }
-
- public long rowId(long pd, long lts, long cd)
- {
- return 0;
- }
-
- public long modificationId(long pd, long cd, long lts, long vd, int col)
- {
- return 0;
- }
- },
- rng);
-
+ // TODO: replace with mocks?
+ QueryGenerator querySelector = new QueryGenerator(schemaSpec,
+ new OpSelectors.PdSelector()
+ {
+ protected long pd(long lts)
+ {
+ return lts;
+ }
+
+ public long nextLts(long lts)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ public long prevLts(long lts)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ public long maxLtsFor(long pd)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ public long maxLts(long lts)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ public long minLtsAt(long position)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+
+ public long minLtsFor(long pd)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ public long positionFor(long lts)
+ {
+ throw new RuntimeException("not implemented");
+ }
+ },
+ new OpSelectors.DescriptorSelector()
+ {
+ public int numberOfModifications(long lts)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ public int opsPerModification(long lts)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ public int maxPartitionSize()
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ public boolean isCdVisitedBy(long pd, long lts, long cd)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ protected long cd(long pd, long lts, long opId)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ public long randomCd(long pd, long entropy)
+ {
+ return Math.abs(rng.prev(entropy)) % cds.length;
+ }
+
+ protected long vd(long pd, long cd, long lts, long opId, int col)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ public OpSelectors.OperationKind operationType(long pd, long lts, long opId)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ public BitSet columnMask(long pd, long lts, long opId)
+ {
+ throw new RuntimeException("not implemented");
+ }
+
+ public long rowId(long pd, long lts, long cd)
+ {
+ return 0;
+ }
+
+ public long modificationId(long pd, long cd, long lts, long vd, int col)
+ {
+ return 0;
+ }
+ },
+ rng);
+
+ QueryGenerator.TypedQueryGenerator gen = new QueryGenerator.TypedQueryGenerator(rng, querySelector);
try
{
for (int i = 0; i < RUNS; i++)
{
- Query query = querySelector.inflate(i, 0);
+ Query query = gen.inflate(i, 0);
for (int j = 0; j < cds.length; j++)
{
long cd = schemaSpec.ckGenerator.adjustEntropyDomain(cds[i]);
// the only thing we care about here is that query
- Assert.assertEquals(String.format("Error caught quen running a query %s with cd %d",
+ Assert.assertEquals(String.format("Error caught while running a query %s with cd %d",
query, cd),
Query.simpleMatch(query, cd),
query.match(cd));
diff --git a/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java b/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
index 2cf22e7..e471ede 100644
--- a/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
+++ b/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
@@ -58,7 +58,7 @@ public class ExternalClusterSut implements SystemUnderTest
{
// TODO: close Cluster and Session!
return new ExternalClusterSut(Cluster.builder()
- .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM))
+ .withQueryOptions(new QueryOptions().setConsistencyLevel(toDriverCl(ConsistencyLevel.QUORUM)))
.addContactPoints(config.contactPoints)
.withPort(config.port)
.withCredentials(config.username, config.password)
@@ -86,14 +86,16 @@ public class ExternalClusterSut implements SystemUnderTest
}
// TODO: this is rather simplistic
- public Object[][] execute(String statement, Object... bindings)
+ public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
{
int repeat = 10;
while (true)
{
try
{
- return resultSetToObjectArray(session.execute(statement, bindings));
+ Statement st = new SimpleStatement(statement, bindings);
+ st.setConsistencyLevel(toDriverCl(cl));
+ return resultSetToObjectArray(session.execute(st));
}
catch (Throwable t)
{
@@ -129,10 +131,12 @@ public class ExternalClusterSut implements SystemUnderTest
return results;
}
- public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+ public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
{
CompletableFuture<Object[][]> future = new CompletableFuture<>();
- Futures.addCallback(session.executeAsync(statement, bindings),
+ Statement st = new SimpleStatement(statement, bindings);
+ st.setConsistencyLevel(toDriverCl(cl));
+ Futures.addCallback(session.executeAsync(st),
new FutureCallback<ResultSet>()
{
public void onSuccess(ResultSet rows)
@@ -176,4 +180,14 @@ public class ExternalClusterSut implements SystemUnderTest
return ExternalClusterSut.create(this);
}
}
+
+ public static com.datastax.driver.core.ConsistencyLevel toDriverCl(SystemUnderTest.ConsistencyLevel cl)
+ {
+ switch (cl)
+ {
+ case ALL: return com.datastax.driver.core.ConsistencyLevel.ALL;
+ case QUORUM: return com.datastax.driver.core.ConsistencyLevel.QUORUM;
+ }
+ throw new IllegalArgumentException("Don't know a CL: " + cl);
+ }
}
\ No newline at end of file
diff --git a/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java b/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
index 5462568..702163e 100644
--- a/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
+++ b/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
@@ -21,10 +21,11 @@ package harry.runner.external;
import harry.core.Configuration;
import harry.model.sut.external.ExternalClusterSut;
import harry.runner.HarryRunner;
+import harry.runner.Runner;
import java.io.File;
-public class HarryRunnerExternal implements HarryRunner {
+public class HarryRunnerExternal extends HarryRunner {
public static void main(String[] args) throws Throwable {
ExternalClusterSut.registerSubtypes();
@@ -35,4 +36,9 @@ public class HarryRunnerExternal implements HarryRunner {
Configuration configuration = Configuration.fromFile(configFile);
runner.run(configuration);
}
+
+ @Override
+ public void beforeRun(Runner runner) {
+
+ }
}
diff --git a/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java b/harry-integration/src/harry/model/sut/ExternalClusterSut.java
similarity index 66%
copy from harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
copy to harry-integration/src/harry/model/sut/ExternalClusterSut.java
index 2cf22e7..aefe508 100644
--- a/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
+++ b/harry-integration/src/harry/model/sut/ExternalClusterSut.java
@@ -16,16 +16,7 @@
* limitations under the License.
*/
-package harry.model.sut.external;
-
-import com.datastax.driver.core.*;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import harry.core.Configuration;
-import harry.model.sut.SystemUnderTest;
+package harry.model.sut;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -33,13 +24,20 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+
public class ExternalClusterSut implements SystemUnderTest
{
- public static void registerSubtypes()
- {
- Configuration.registerSubtypes(ExternalSutConfiguration.class);
- }
-
private final Session session;
private final ExecutorService executor;
@@ -54,14 +52,12 @@ public class ExternalClusterSut implements SystemUnderTest
this.executor = Executors.newFixedThreadPool(threads);
}
- public static ExternalClusterSut create(ExternalSutConfiguration config)
+ public static ExternalClusterSut create()
{
// TODO: close Cluster and Session!
return new ExternalClusterSut(Cluster.builder()
- .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM))
- .addContactPoints(config.contactPoints)
- .withPort(config.port)
- .withCredentials(config.username, config.password)
+ .withQueryOptions(new QueryOptions().setConsistencyLevel(toDriverCl(ConsistencyLevel.QUORUM)))
+ .addContactPoints("127.0.0.1")
.build()
.connect());
}
@@ -86,14 +82,16 @@ public class ExternalClusterSut implements SystemUnderTest
}
// TODO: this is rather simplistic
- public Object[][] execute(String statement, Object... bindings)
+ public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
{
int repeat = 10;
while (true)
{
try
{
- return resultSetToObjectArray(session.execute(statement, bindings));
+ Statement st = new SimpleStatement(statement, bindings);
+ st.setConsistencyLevel(toDriverCl(cl));
+ return resultSetToObjectArray(session.execute(st));
}
catch (Throwable t)
{
@@ -107,8 +105,7 @@ public class ExternalClusterSut implements SystemUnderTest
}
}
-
- private static Object[][] resultSetToObjectArray(ResultSet rs)
+ public static Object[][] resultSetToObjectArray(ResultSet rs)
{
List<Row> rows = rs.all();
if (rows.size() == 0)
@@ -129,10 +126,12 @@ public class ExternalClusterSut implements SystemUnderTest
return results;
}
- public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+ public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
{
CompletableFuture<Object[][]> future = new CompletableFuture<>();
- Futures.addCallback(session.executeAsync(statement, bindings),
+ Statement st = new SimpleStatement(statement, bindings);
+ st.setConsistencyLevel(toDriverCl(cl));
+ Futures.addCallback(session.executeAsync(st),
new FutureCallback<ResultSet>()
{
public void onSuccess(ResultSet rows)
@@ -150,30 +149,13 @@ public class ExternalClusterSut implements SystemUnderTest
return future;
}
- @JsonTypeName("external")
- public static class ExternalSutConfiguration implements Configuration.SutConfiguration
+ public static com.datastax.driver.core.ConsistencyLevel toDriverCl(SystemUnderTest.ConsistencyLevel cl)
{
-
- private final String contactPoints;
- private final int port;
- private final String username;
- private final String password;
-
- @JsonCreator
- public ExternalSutConfiguration(@JsonProperty(value = "contact_points") String contactPoints,
- @JsonProperty(value = "port") int port,
- @JsonProperty(value = "username") String username,
- @JsonProperty(value = "password") String password)
- {
- this.contactPoints = contactPoints;
- this.port = port;
- this.username = username;
- this.password = password;
- }
-
- public SystemUnderTest make()
+ switch (cl)
{
- return ExternalClusterSut.create(this);
+ case ALL: return com.datastax.driver.core.ConsistencyLevel.ALL;
+ case QUORUM: return com.datastax.driver.core.ConsistencyLevel.QUORUM;
}
+ throw new IllegalArgumentException("Don't know a CL: " + cl);
}
}
\ No newline at end of file
diff --git a/harry-integration/src/harry/model/sut/InJvmSut.java b/harry-integration/src/harry/model/sut/InJvmSut.java
index d6b21a2..1786f60 100644
--- a/harry-integration/src/harry/model/sut/InJvmSut.java
+++ b/harry-integration/src/harry/model/sut/InJvmSut.java
@@ -35,11 +35,15 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import harry.core.Configuration;
import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.net.Verb;
public class InJvmSut implements SystemUnderTest
{
- public static void registerSubtypes()
+ public static void init()
{
Configuration.registerSubtypes(InJvmSutConfiguration.class);
}
@@ -51,6 +55,7 @@ public class InJvmSut implements SystemUnderTest
public final Cluster cluster;
private final AtomicLong cnt = new AtomicLong();
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
public InJvmSut(Cluster cluster)
{
this(cluster, 10);
@@ -62,6 +67,11 @@ public class InJvmSut implements SystemUnderTest
this.executor = Executors.newFixedThreadPool(threads);
}
+ public Cluster cluster()
+ {
+ return cluster;
+ }
+
public boolean isShutdown()
{
return isShutdown.get();
@@ -73,6 +83,7 @@ public class InJvmSut implements SystemUnderTest
cluster.close();
executor.shutdown();
+
try
{
executor.awaitTermination(30, TimeUnit.SECONDS);
@@ -88,17 +99,67 @@ public class InJvmSut implements SystemUnderTest
cluster.schemaChange(statement);
}
- public Object[][] execute(String statement, Object... bindings)
+ public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
+ {
+ return execute(statement, cl, (int) (cnt.getAndIncrement() % cluster.size() + 1), bindings);
+ }
+
+ public Object[][] execute(String statement, ConsistencyLevel cl, int coordinator, Object... bindings)
+ {
+ if (isShutdown.get())
+ throw new RuntimeException("Instance is shut down");
+
+ try
+ {
+ if (cl == ConsistencyLevel.NODE_LOCAL)
+ {
+ return cluster.get(coordinator)
+ .executeInternal(statement, bindings);
+ }
+ else
+ {
+ return cluster
+ // coordinator is chosen by the caller; the overload without a coordinator argument round-robins
+ .coordinator(coordinator)
+ .execute(statement, toApiCl(cl), bindings);
+ }
+ }
+ catch (Throwable t)
+ {
+ logger.error(String.format("Caught error while trying execute statement %s: %s", statement, t.getMessage()),
+ t);
+ throw t;
+ }
+ }
+
+ // TODO: Ideally, we need to be able to induce a failure of a single specific message
+ public Object[][] executeWithWriteFailure(String statement, ConsistencyLevel cl, Object... bindings)
{
if (isShutdown.get())
throw new RuntimeException("Instance is shut down");
try
{
- return cluster
- // round-robin
- .coordinator((int) (cnt.getAndIncrement() % cluster.size() + 1))
- .execute(statement, ConsistencyLevel.QUORUM, bindings);
+ int coordinator = (int) (cnt.getAndIncrement() % cluster.size() + 1);
+ IMessageFilters filters = cluster.filters();
+
+ // Drop only the first MUTATION_REQ message sent by the coordinator
+ filters.verbs(Verb.MUTATION_REQ.id).from(coordinator).messagesMatching(new IMessageFilters.Matcher()
+ {
+ private final AtomicBoolean issued = new AtomicBoolean();
+ public boolean matches(int from, int to, IMessage message)
+ {
+ if (from != coordinator || message.verb() != Verb.MUTATION_REQ.id)
+ return false;
+
+ return !issued.getAndSet(true);
+ }
+ }).drop().on();
+ Object[][] res = cluster
+ .coordinator(coordinator)
+ .execute(statement, toApiCl(cl), bindings);
+ filters.reset();
+ return res;
}
catch (Throwable t)
{
@@ -108,9 +169,14 @@ public class InJvmSut implements SystemUnderTest
}
}
- public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+ public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
{
- return CompletableFuture.supplyAsync(() -> execute(statement, bindings), executor);
+ return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings), executor);
+ }
+
+ public CompletableFuture<Object[][]> executeAsyncWithWriteFailure(String statement, ConsistencyLevel cl, Object... bindings)
+ {
+ return CompletableFuture.supplyAsync(() -> executeWithWriteFailure(statement, cl, bindings), executor);
}
@JsonTypeName("in_jvm")
@@ -132,17 +198,26 @@ public class InJvmSut implements SystemUnderTest
public SystemUnderTest make()
{
+ try
+ {
+ ICluster.setup();
+ }
+ catch (Throwable throwable)
+ {
+ throwable.printStackTrace();
+ }
+
Cluster cluster;
try
{
cluster = Cluster.build().withConfig((cfg) -> {
// TODO: make this configurable
- cfg.set("row_cache_size_in_mb", 10L)
+ cfg.with(Feature.NETWORK, Feature.GOSSIP)
+ .set("row_cache_size_in_mb", 10L)
.set("index_summary_capacity_in_mb", 10L)
.set("counter_cache_size_in_mb", 10L)
.set("key_cache_size_in_mb", 10L)
.set("file_cache_size_in_mb", 10)
- .set("prepared_statements_cache_size_mb", 10L)
.set("memtable_heap_space_in_mb", 128)
.set("memtable_offheap_space_in_mb", 128)
.set("memtable_flush_writers", 1)
@@ -151,7 +226,8 @@ public class InJvmSut implements SystemUnderTest
.set("concurrent_writes", 5)
.set("compaction_throughput_mb_per_sec", 10)
.set("hinted_handoff_enabled", false);
- }).withNodes(nodes)
+ })
+ .withNodes(nodes)
.withRoot(new File(root)).createWithoutStarting();
}
catch (IOException e)
@@ -163,4 +239,15 @@ public class InJvmSut implements SystemUnderTest
return new InJvmSut(cluster);
}
}
+
+ public static org.apache.cassandra.distributed.api.ConsistencyLevel toApiCl(ConsistencyLevel cl)
+ {
+ switch (cl)
+ {
+ case ALL: return org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+ case QUORUM: return org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+ case NODE_LOCAL: return org.apache.cassandra.distributed.api.ConsistencyLevel.NODE_LOCAL;
+ }
+ throw new IllegalArgumentException("Don't know a CL: " + cl);
+ }
}
\ No newline at end of file
diff --git a/harry-integration/src/harry/runner/FaultInjectingPartitionVisitor.java b/harry-integration/src/harry/runner/FaultInjectingPartitionVisitor.java
new file mode 100644
index 0000000..84e1a73
--- /dev/null
+++ b/harry-integration/src/harry/runner/FaultInjectingPartitionVisitor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.model.sut.InJvmSut;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+
+public class FaultInjectingPartitionVisitor extends LoggingPartitionVisitor
+{
+ public static void init()
+ {
+ Configuration.registerSubtypes(FaultInjectingPartitionVisitorConfiguration.class);
+ }
+
+ @JsonTypeName("fault_injecting")
+ public static class FaultInjectingPartitionVisitorConfiguration extends Configuration.MutatingPartitionVisitorConfiguation
+ {
+ @JsonCreator
+ public FaultInjectingPartitionVisitorConfiguration(@JsonProperty("row_visitor") Configuration.RowVisitorConfiguration row_visitor)
+ {
+ super(row_visitor);
+ }
+
+ @Override
+ public PartitionVisitor make(Run run)
+ {
+ return new FaultInjectingPartitionVisitor(run, row_visitor);
+ }
+ }
+
+ private final AtomicInteger cnt = new AtomicInteger();
+
+ private final InJvmSut sut;
+
+ public FaultInjectingPartitionVisitor(Run run, RowVisitor.RowVisitorFactory rowVisitorFactory)
+ {
+ super(run, rowVisitorFactory);
+ this.sut = (InJvmSut) run.sut;
+ }
+
+ void executeAsyncWithRetries(CompletableFuture<Object[][]> originator, CompiledStatement statement)
+ {
+ executeAsyncWithRetries(originator, statement, true);
+ }
+
+ void executeAsyncWithRetries(CompletableFuture<Object[][]> originator, CompiledStatement statement, boolean allowFailures)
+ {
+ if (sut.isShutdown())
+ throw new IllegalStateException("System under test is shut down");
+
+ CompletableFuture<Object[][]> future;
+ if (allowFailures && cnt.getAndIncrement() % 2 == 0)
+ {
+ future = sut.executeAsyncWithWriteFailure(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings());
+ }
+ else
+ {
+ future = sut.executeAsync(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings());
+ }
+
+ future.whenComplete((res, t) -> {
+ if (t != null)
+ executor.schedule(() -> executeAsyncWithRetries(originator, statement, false), 1, TimeUnit.SECONDS);
+ else
+ originator.complete(res);
+ });
+ }
+}
diff --git a/harry-integration/src/harry/runner/QueryingNoOpChecker.java b/harry-integration/src/harry/runner/QueryingNoOpChecker.java
new file mode 100644
index 0000000..08b8e6b
--- /dev/null
+++ b/harry-integration/src/harry/runner/QueryingNoOpChecker.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.runner;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.model.Model;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+
+public class QueryingNoOpChecker implements Model
+{
+ public static void init()
+ {
+ Configuration.registerSubtypes(QueryingNoOpCheckerConfig.class);
+ }
+
+ private final Run run;
+
+ public QueryingNoOpChecker(Run run)
+ {
+ this.run = run;
+ }
+
+ @Override
+ public void validate(Query query)
+ {
+ CompiledStatement compiled = query.toSelectStatement();
+ run.sut.execute(compiled.cql(),
+ SystemUnderTest.ConsistencyLevel.QUORUM,
+ compiled.bindings());
+ }
+
+ @JsonTypeName("querying_no_op_checker")
+ public static class QueryingNoOpCheckerConfig implements Configuration.ModelConfiguration
+ {
+ @JsonCreator
+ public QueryingNoOpCheckerConfig()
+ {
+ }
+
+ public Model make(Run run)
+ {
+ return new QueryingNoOpChecker(run);
+ }
+ }
+}
diff --git a/harry-integration/src/harry/runner/RepairingLocalStateValidator.java b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
new file mode 100644
index 0000000..98beb93
--- /dev/null
+++ b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.data.ResultSetRow;
+import harry.model.Model;
+import harry.model.QuiescentChecker;
+import harry.model.sut.InJvmSut;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+
+import static harry.model.SelectHelper.resultSetToRow;
+
+public class RepairingLocalStateValidator extends AllPartitionsValidator
+{
+ public static void init()
+ {
+ Configuration.registerSubtypes(RepairingLocalStateValidatorConfiguration.class,
+ QuiescentCheckerConfig.class);
+ }
+
+ public final InJvmSut inJvmSut;
+
+ public RepairingLocalStateValidator(int concurrency, int triggerAfter, Run run, Model.ModelFactory modelFactory)
+ {
+ super(concurrency, triggerAfter, run, modelFactory);
+
+ this.inJvmSut = (InJvmSut) run.sut;
+ }
+
+ @Override
+ public void visitPartition(long lts)
+ {
+ if (lts > 0 && lts % triggerAfter == 0)
+ {
+ System.out.println("Starting repair...");
+ inJvmSut.cluster().get(1).nodetool("repair", "--full");
+ System.out.println("Validating partitions...");
+ validateAllPartitions(executor, concurrency);
+ }
+ }
+
+ @JsonTypeName("repair_and_validate_local_states")
+ public static class RepairingLocalStateValidatorConfiguration implements Configuration.PartitionVisitorConfiguration
+ {
+ private final int concurrency;
+ private final int trigger_after;
+ private final Configuration.ModelConfiguration modelConfiguration;
+
+ @JsonCreator
+ public RepairingLocalStateValidatorConfiguration(@JsonProperty("concurrency") int concurrency,
+ @JsonProperty("trigger_after") int trigger_after,
+ @JsonProperty("model") Configuration.ModelConfiguration model)
+ {
+ this.concurrency = concurrency;
+ this.trigger_after = trigger_after;
+ this.modelConfiguration = model;
+ }
+
+ public PartitionVisitor make(Run run)
+ {
+ return new RepairingLocalStateValidator(concurrency, trigger_after, run, modelConfiguration);
+ }
+ }
+
+ public static class QuiescentLocalStateChecker extends QuiescentChecker
+ {
+ public final InJvmSut inJvmSut;
+
+ public QuiescentLocalStateChecker(Run run)
+ {
+ super(run);
+ assert run.sut instanceof InJvmSut;
+
+ this.inJvmSut = (InJvmSut) run.sut;
+ }
+
+ @Override
+ public void validate(Query query)
+ {
+ CompiledStatement compiled = query.toSelectStatement();
+ for (int i = 1; i <= inJvmSut.cluster.size(); i++)
+ {
+ int node = i;
+ validate(() -> {
+ Object[][] objects = inJvmSut.execute(compiled.cql(),
+ SystemUnderTest.ConsistencyLevel.NODE_LOCAL,
+ node,
+ compiled.bindings());
+ List<ResultSetRow> result = new ArrayList<>();
+ for (Object[] obj : objects)
+ result.add(resultSetToRow(query.schemaSpec, clock, obj));
+
+ return result;
+ }, query);
+ }
+ }
+ }
+
+ @JsonTypeName("quiescent_local_state_checker")
+ public static class QuiescentCheckerConfig implements Configuration.ModelConfiguration
+ {
+ @JsonCreator
+ public QuiescentCheckerConfig()
+ {
+ }
+
+ public Model make(Run run)
+ {
+ return new QuiescentLocalStateChecker(run);
+ }
+ }
+}
diff --git a/harry-integration/src/harry/runner/Reproduce.java b/harry-integration/src/harry/runner/Reproduce.java
index 049c864..78a96f6 100644
--- a/harry-integration/src/harry/runner/Reproduce.java
+++ b/harry-integration/src/harry/runner/Reproduce.java
@@ -33,7 +33,7 @@ public class Reproduce extends TestBaseImpl
public void runWithInJvmDtest() throws Throwable
{
- InJvmSut.registerSubtypes();
+ InJvmSut.init();
System.setProperty("cassandra.disable_tcactive_openssl", "true");
System.setProperty("relocated.shaded.io.netty.transport.noNative", "true");
@@ -46,7 +46,7 @@ public class Reproduce extends TestBaseImpl
try
{
- run.validator.validatePartition(0L);
+// run.validator.validatePartition(0L);
}
catch(Throwable t)
{
diff --git a/harry-integration/test/harry/model/ExhaustiveCheckerIntegrationTest.java b/harry-integration/test/harry/model/ExhaustiveCheckerIntegrationTest.java
index 4a38e7b..ecbf499 100644
--- a/harry-integration/test/harry/model/ExhaustiveCheckerIntegrationTest.java
+++ b/harry-integration/test/harry/model/ExhaustiveCheckerIntegrationTest.java
@@ -19,10 +19,9 @@
package harry.model;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import harry.core.Configuration;
@@ -33,34 +32,40 @@ import harry.corruptor.HideRowCorruptor;
import harry.corruptor.HideValueCorruptor;
import harry.corruptor.QueryResponseCorruptor;
import harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
-import harry.generators.Surjections;
-import harry.generators.distribution.Distribution;
+import harry.ddl.SchemaGenerators;
+import harry.model.sut.InJvmSut;
import harry.model.sut.SystemUnderTest;
+import harry.runner.MutatingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
import harry.runner.PartitionVisitor;
import harry.runner.Query;
-import harry.runner.Validator;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import harry.runner.SinglePartitionValidator;
-public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
+public class ExhaustiveCheckerIntegrationTest extends ModelTestBase
{
@Test
public void testVerifyPartitionState()
{
- Configuration config = sharedConfiguration(1).build();
- Run run = config.createRun();
- run.sut.schemaChange(run.schemaSpec.compile().cql());
+ Supplier<Configuration.ConfigurationBuilder> gen = sharedConfiguration();
- OpSelectors.MonotonicClock clock = run.clock;
- Validator validator = run.validator;
- PartitionVisitor partitionVisitor = run.visitorFactory.get();
-
- for (int i = 0; i < 2000; i++)
+ for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
{
- long lts = clock.nextLts();
- partitionVisitor.visitPartition(lts);
- }
+ Configuration config = gen.get().build();
+ Run run = config.createRun();
+ run.sut.schemaChange(run.schemaSpec.compile().cql());
+ OpSelectors.MonotonicClock clock = run.clock;
- validator.validatePartition(0);
+ SinglePartitionValidator validator = new SinglePartitionValidator(100, run, modelConfiguration());
+ PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+
+ for (int i = 0; i < 2000; i++)
+ {
+ long lts = clock.nextLts();
+ partitionVisitor.visitPartition(lts);
+ }
+
+ validator.visitPartition(0);
+ }
}
@Test
@@ -71,13 +76,13 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
run.clock,
HideRowCorruptor::new);
- Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
- run.pdSelector.pd(0, run.schemaSpec),
- false),
- run.sut));
+ return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+ run.pdSelector.pd(0, run.schemaSpec),
+ false),
+ run.sut);
},
- (t) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
- t.getCause() != null &&t.getCause().toString().contains(OpSelectors.OperationKind.WRITE.toString())));
+ (t, run) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
+ t.getCause() != null && t.getCause().toString().contains(OpSelectors.OperationKind.WRITE.toString())));
}
@Test
@@ -88,18 +93,21 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
run.clock,
run.descriptorSelector);
- Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
- run.pdSelector.pd(0, run.schemaSpec),
- false),
- run.sut));
+ return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+ run.pdSelector.pd(0, run.schemaSpec),
+ false),
+ run.sut);
},
- (t) -> {
+ (t, run) -> {
Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
- // TODO: this is not entirely correct. Right now, after registering a deletion followed by no writes,
- // we would continue going back in time and checking other operations, even though we don't have to do this.
- t.getCause().getMessage().contains("Modification should have been visible but was not") ||
- t.getCause().getMessage().contains("Observed unvalidated rows") ||
- t.getCause() != null && t.getCause().getMessage().contains("was never written"));
+ // TODO: this is not entirely correct. Right now, after registering a deletion followed by no writes,
+ // we would continue going back in time and checking other operations, even though we don't have to do this.
+ t.getCause().getMessage().contains("Modification should have been visible but was not") ||
+ // TODO: this is not entirely correct, either. This row is, in fact, present in both the dataset _and_
+ // the model; it's just that there might be _another_ row right in front of it.
+ t.getCause().getMessage().contains("expected row not to be visible") ||
+ t.getCause().getMessage().contains("Observed unvalidated rows") ||
+ t.getCause() != null && t.getCause().getMessage().contains("was never written"));
});
}
@@ -112,18 +120,17 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
run.clock,
HideValueCorruptor::new);
- Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
- run.pdSelector.pd(0, run.schemaSpec),
- false),
- run.sut));
+ return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+ run.pdSelector.pd(0, run.schemaSpec),
+ false),
+ run.sut);
},
- (t) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
- t.getCause() != null && t.getCause().getMessage().contains("Modification should have been visible but was not")));
+ (t, run) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
+ t.getCause() != null && t.getCause().getMessage().contains("Modification should have been visible but was not")));
}
@Test
- @Ignore
public void testDetectsOverwrittenRow()
{
negativeTest((run) -> {
@@ -131,89 +138,61 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
run.clock,
ChangeValueCorruptor::new);
- Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
- run.pdSelector.pd(0, run.schemaSpec),
- false),
- run.sut));
+ return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+ run.pdSelector.pd(0, run.schemaSpec),
+ false),
+ run.sut);
},
- (t) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
- t.getCause() != null && t.getCause().getMessage().contains("Modification should have been visible but was not.")));
+ (t, run) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
+ t.getCause() != null && t.getCause().getMessage().contains("Modification should have been visible but was not.")));
}
-
- static void negativeTest(Consumer<Run> corrupt, Consumer<Throwable> validate)
+ @Test
+ public void testLocalOnlyExecution()
{
- Configuration config = sharedConfiguration()
- .setClusteringDescriptorSelector((rng, schemaSpec) -> {
- return new OpSelectors.DefaultDescriptorSelector(rng,
- new OpSelectors.ColumnSelectorBuilder().forAll(schemaSpec.regularColumns.size()).build(),
- Surjections.pick(OpSelectors.OperationKind.DELETE_COLUMN,
- OpSelectors.OperationKind.DELETE_ROW,
- OpSelectors.OperationKind.WRITE),
- new Distribution.ConstantDistribution(10),
- new Distribution.ConstantDistribution(10),
- 100);
- })
- .build();
- Run run = config.createRun();
- run.sut.schemaChange(run.schemaSpec.compile().cql());
- OpSelectors.MonotonicClock clock = run.clock;
- Validator validator = run.validator;
- PartitionVisitor partitionVisitor = run.visitorFactory.get();
-
- for (int i = 0; i < 200; i++)
- {
- long lts = clock.nextLts();
- partitionVisitor.visitPartition(lts);
- }
+ LocalOnlySut localOnlySut = new LocalOnlySut();
- corrupt.accept(run);
+ Supplier<Configuration.ConfigurationBuilder> gen = sharedConfiguration();
- try
- {
- validator.validatePartition(0);
- Assert.fail("Should've thrown");
- }
- catch (Throwable t)
+ for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
{
- validate.accept(t);
+ Configuration config = gen.get()
+ .setClusteringDescriptorSelector((builder) -> {
+ builder.setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
+ .addWeight(OpSelectors.OperationKind.DELETE_ROW, 80)
+ .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 10)
+ .addWeight(OpSelectors.OperationKind.WRITE, 10)
+ .build());
+ })
+ .setSUT(() -> localOnlySut)
+ .build();
+
+ Run run = config.createRun();
+ run.sut.schemaChange(run.schemaSpec.compile().cql());
+
+ OpSelectors.MonotonicClock clock = run.clock;
+
+ PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+
+ localOnlySut.localOnly(() -> {
+ for (int i = 0; i < 5; i++)
+ {
+ long lts = clock.nextLts();
+ partitionVisitor.visitPartition(lts);
+ }
+ });
+
+ SinglePartitionValidator validator = new SinglePartitionValidator(100, run, ExhaustiveChecker::new);
+ validator.visitPartition(0);
}
}
- @Test
- public void testLocalOnlyExecution()
+ Configuration.ModelConfiguration modelConfiguration()
{
- LocalOnlySut localOnlySut = new LocalOnlySut();
- Configuration config = sharedConfiguration()
- .setClusteringDescriptorSelector((builder) -> {
- builder.setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
- .addWeight(OpSelectors.OperationKind.DELETE_ROW, 80)
- .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 10)
- .addWeight(OpSelectors.OperationKind.WRITE, 10)
- .build());
- })
- .setSUT(() -> localOnlySut)
- .build();
-
- Run run = config.createRun();
- run.sut.schemaChange(run.schemaSpec.compile().cql());
-
- OpSelectors.MonotonicClock clock = run.clock;
- Validator validator = run.validator;
- PartitionVisitor partitionVisitor = run.visitorFactory.get();
-
- localOnlySut.localOnly(() -> {
- for (int i = 0; i < 5; i++)
- {
- long lts = clock.nextLts();
- partitionVisitor.visitPartition(lts);
- }
- });
-
- validator.validatePartition(0);
+ return new Configuration.ExhaustiveCheckerConfig();
}
- private static class LocalOnlySut implements SystemUnderTest
+ public static class LocalOnlySut implements SystemUnderTest
{
private boolean localOnly = false;
private int counter = 0;
@@ -233,17 +212,17 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
cluster.schemaChange(statement);
}
- public Object[][] execute(String statement, Object... bindings)
+ public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
{
if (localOnly)
return cluster.get((counter++) % cluster.size() + 1).executeInternal(statement, bindings);
else
- return cluster.coordinator((counter++) % cluster.size() + 1).execute(statement, ConsistencyLevel.ALL, bindings);
+ return cluster.coordinator((counter++) % cluster.size() + 1).execute(statement, InJvmSut.toApiCl(ConsistencyLevel.ALL), bindings);
}
- public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+ public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
{
- return CompletableFuture.supplyAsync(() -> execute(statement, bindings));
+ return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings));
}
public void localOnly(Runnable r)
diff --git a/harry-integration/test/harry/model/ExhaustiveCheckerUnitTest.java b/harry-integration/test/harry/model/ExhaustiveCheckerUnitTest.java
index 84d5a82..7c1ca04 100644
--- a/harry-integration/test/harry/model/ExhaustiveCheckerUnitTest.java
+++ b/harry-integration/test/harry/model/ExhaustiveCheckerUnitTest.java
@@ -26,13 +26,15 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.Test;
import harry.core.Configuration;
import harry.core.Run;
-import harry.model.sut.NoOpSut;
+import harry.ddl.SchemaGenerators;
+import harry.model.sut.SystemUnderTest;
import harry.operations.CompiledStatement;
import harry.runner.PartitionVisitor;
import harry.runner.Query;
@@ -43,55 +45,58 @@ public class ExhaustiveCheckerUnitTest
@Test
public void testOperationConsistency()
{
- LoggingRowVisitor rowVisitor = new LoggingRowVisitor();
-
- Configuration config = IntegrationTestBase.sharedConfiguration()
- .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(10, 10))
- .setClusteringDescriptorSelector((builder) -> {
- builder.setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
- .addWeight(OpSelectors.OperationKind.DELETE_ROW, 33)
- .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 33)
- .addWeight(OpSelectors.OperationKind.WRITE, 34)
- .build());
- })
- .setRowVisitor((a_, b_, c_, d_) -> rowVisitor)
- .setSUT(NoOpSut::new)
- .setCreateSchema(true)
- .build();
-
- Run run = config.createRun();
-
- ExhaustiveChecker checker = (ExhaustiveChecker) run.model;
-
- int iterations = 10;
- PartitionVisitor partitionVisitor = run.visitorFactory.get();
- for (int i = 0; i < iterations; i++)
- partitionVisitor.visitPartition(i);
-
- for (List<ExhaustiveChecker.Operation> value : rowVisitor.executed.values())
- Collections.sort(value);
-
- for (int lts = 0; lts < iterations; lts++)
+ Supplier<Configuration.ConfigurationBuilder> gen = IntegrationTestBase.sharedConfiguration();
+
+ for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
{
- // TODO: turn query into interface to make it easier to deal with here
- for (Collection<ExhaustiveChecker.Operation> modelOps : checker.inflatePartitionState(lts,
- (iterations - 1),
- Query.selectPartition(run.schemaSpec,
- run.pdSelector.pd(lts),
- false))
- .operations.values())
+ LoggingRowVisitor rowVisitor = new LoggingRowVisitor();
+ Configuration config = gen.get()
+ .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(10, 10))
+ .setClusteringDescriptorSelector((builder) -> {
+ builder.setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
+ .addWeight(OpSelectors.OperationKind.DELETE_ROW, 33)
+ .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 33)
+ .addWeight(OpSelectors.OperationKind.WRITE, 34)
+ .build());
+ })
+ .setSUT(() -> SystemUnderTest.NO_OP)
+ .setCreateSchema(true)
+ .build();
+
+ Run run = config.createRun();
+
+ ExhaustiveChecker checker = new ExhaustiveChecker(run);
+
+ PartitionVisitor visitor = new Configuration.MutatingPartitionVisitorConfiguation((r) -> rowVisitor).make(run);
+ int iterations = 10;
+
+ for (int i = 0; i < iterations; i++)
+ visitor.visitPartition(i);
+
+ for (List<ExhaustiveChecker.Operation> value : rowVisitor.executed.values())
+ Collections.sort(value);
+
+ for (int lts = 0; lts < iterations; lts++)
{
- ExhaustiveChecker.Operation op = modelOps.iterator().next();
- List<ExhaustiveChecker.Operation> executedOps = rowVisitor.executed.get(new Pair(op.pd, op.cd));
- Iterator<ExhaustiveChecker.Operation> modelIterator = modelOps.iterator();
- Iterator<ExhaustiveChecker.Operation> executedIterator = executedOps.iterator();
- while (modelIterator.hasNext() && executedIterator.hasNext())
+ // TODO: turn query into interface to make it easier to deal with here
+ for (Collection<ExhaustiveChecker.Operation> modelOps : checker.inflatePartitionState(iterations - 1,
+ Query.selectPartition(run.schemaSpec,
+ run.pdSelector.pd(lts),
+ false))
+ .operations.values())
{
+ ExhaustiveChecker.Operation op = modelOps.iterator().next();
+ List<ExhaustiveChecker.Operation> executedOps = rowVisitor.executed.get(new Pair(op.pd, op.cd));
+ Iterator<ExhaustiveChecker.Operation> modelIterator = modelOps.iterator();
+ Iterator<ExhaustiveChecker.Operation> executedIterator = executedOps.iterator();
+ while (modelIterator.hasNext() && executedIterator.hasNext())
+ {
+ Assert.assertEquals(String.format("\n%s\n%s", modelOps, executedOps),
+ executedIterator.next(), modelIterator.next());
+ }
Assert.assertEquals(String.format("\n%s\n%s", modelOps, executedOps),
- executedIterator.next(), modelIterator.next());
+ modelIterator.hasNext(), executedIterator.hasNext());
}
- Assert.assertEquals(String.format("\n%s\n%s", modelOps, executedOps),
- modelIterator.hasNext(), executedIterator.hasNext());
}
}
}
@@ -131,6 +136,11 @@ public class ExhaustiveCheckerUnitTest
{
return null;
}
+
+ public CompiledStatement deleteSlice(long lts, long pd, long opId)
+ {
+ return null;
+ }
}
private static class Pair
diff --git a/harry-integration/test/harry/model/IntegrationTestBase.java b/harry-integration/test/harry/model/IntegrationTestBase.java
index 08d9f46..a3c2dfa 100644
--- a/harry-integration/test/harry/model/IntegrationTestBase.java
+++ b/harry-integration/test/harry/model/IntegrationTestBase.java
@@ -18,13 +18,14 @@
package harry.model;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import harry.core.Configuration;
+import harry.ddl.SchemaGenerators;
import harry.ddl.SchemaSpec;
import harry.model.clock.OffsetClock;
import harry.model.sut.InJvmSut;
@@ -57,22 +58,16 @@ public class IntegrationTestBase extends TestBaseImpl
cluster.schemaChange("CREATE KEYSPACE harry WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
}
- // TODO: run these tests with like a hundred different tables?
- static Configuration.ConfigurationBuilder sharedConfiguration()
+ private static long seed = 0;
+ public static Supplier<Configuration.ConfigurationBuilder> sharedConfiguration()
{
- return sharedConfiguration(1);
+ Supplier<SchemaSpec> specGenerator = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+ return () -> {
+ SchemaSpec schemaSpec = specGenerator.get();
+ return sharedConfiguration(seed, schemaSpec);
+ };
}
- private static AtomicInteger COUNTER = new AtomicInteger();
-
- public static Configuration.ConfigurationBuilder sharedConfiguration(long seed)
- {
- int i = COUNTER.incrementAndGet();
- SchemaSpec schemaSpec = MockSchema.randomSchema("harry", "table" + i, seed);
- return sharedConfiguration(seed, schemaSpec);
- }
-
-
public static Configuration.ConfigurationBuilder sharedConfiguration(long seed, SchemaSpec schema)
{
return new Configuration.ConfigurationBuilder().setSeed(seed)
@@ -80,7 +75,6 @@ public class IntegrationTestBase extends TestBaseImpl
.setCreateSchema(true)
.setTruncateTable(false)
.setDropSchema(true)
- .setModel(new Configuration.ExhaustiveCheckerConfig())
.setSchemaProvider(seed1 -> schema)
.setClusteringDescriptorSelector((builder) -> {
builder
@@ -91,9 +85,9 @@ public class IntegrationTestBase extends TestBaseImpl
.addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 10)
.addWeight(OpSelectors.OperationKind.WRITE, 80)
.build())
- .setMaxPartitionSize(1);
+ .setMaxPartitionSize(100);
})
.setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(1, 200))
.setSUT(() -> sut);
}
-}
\ No newline at end of file
+}
diff --git a/harry-integration/test/harry/model/ModelTest.java b/harry-integration/test/harry/model/ModelTest.java
index ae3d7b7..7a7cfe9 100644
--- a/harry-integration/test/harry/model/ModelTest.java
+++ b/harry-integration/test/harry/model/ModelTest.java
@@ -18,8 +18,10 @@
package harry.model;
+import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.junit.Assert;
import org.junit.Test;
@@ -29,6 +31,7 @@ import harry.ddl.SchemaSpec;
import harry.generators.Surjections;
import harry.generators.distribution.Distribution;
import harry.model.sut.InJvmSut;
+import harry.runner.LoggingPartitionVisitor;
import harry.runner.Runner;
import harry.util.BitSet;
import org.apache.cassandra.distributed.Cluster;
@@ -80,7 +83,10 @@ public class ModelTest extends TestBaseImpl
@Test
public void statefulVisibleRowsCheckerTest() throws Throwable
{
- visibleRowsCheckerTest(VisibleRowsChecker::new);
+ visibleRowsCheckerTest(VisibleRowsChecker::new,
+ (cfg) -> {
+ cfg.setDataTracker(VisibleRowsChecker.LoggingDataTracker::new);
+ });
}
@Test
@@ -91,13 +97,20 @@ public class ModelTest extends TestBaseImpl
public void visibleRowsCheckerTest(Model.ModelFactory factory) throws Throwable
{
+ visibleRowsCheckerTest(factory, (a) -> {});
+ }
+ public void visibleRowsCheckerTest(Model.ModelFactory factory, Consumer<Configuration.ConfigurationBuilder> configurator) throws Throwable
+ {
try (Cluster cluster = Cluster.create(3))
{
- Configuration.ConfigurationBuilder configuration = new Configuration.ConfigurationBuilder();
- configuration.setClock(new Configuration.ApproximateMonotonicClockConfiguration((int) TimeUnit.MINUTES.toMillis(2),
+ Configuration.ConfigurationBuilder builder = new Configuration.ConfigurationBuilder();
+ builder.setClock(new Configuration.ApproximateMonotonicClockConfiguration((int) TimeUnit.MINUTES.toMillis(2),
1, TimeUnit.SECONDS))
.setRunTime(1, TimeUnit.MINUTES)
- .setRunner(new Configuration.ConcurrentRunnerConfig(2, 2, 2))
+ .setRunner(new Configuration.ConcurrentRunnerConfig(2,
+ Arrays.asList(new Configuration.LoggingPartitionVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration()),
+ new Configuration.RecentPartitionsValidatorConfiguration(10, 10, factory::make),
+ new Configuration.AllPartitionsValidatorConfiguration(10, 10, factory::make))))
.setSchemaProvider((seed) -> schema)
.setClusteringDescriptorSelector((OpSelectors.Rng rng, SchemaSpec schemaSpec) -> {
return new DescriptorSelectorBuilder()
@@ -109,10 +122,9 @@ public class ModelTest extends TestBaseImpl
.setCreateSchema(true)
.setTruncateTable(false)
.setDropSchema(false)
- .setModel(factory::create)
.setSUT(() -> new InJvmSut(cluster));
-
- Runner runner = configuration.build().createRunner();
+ configurator.accept(builder);
+ Runner runner = builder.build().createRunner();
try
{
runner.initAndStartAll().get(2, TimeUnit.MINUTES);
@@ -144,4 +156,4 @@ public class ModelTest extends TestBaseImpl
}
}
-// TODO: test things gradually. First with simple schemas, then with more complex, then with completely random.
\ No newline at end of file
+// TODO: test things gradually. First with simple schemas, then with more complex, then with completely random.
diff --git a/harry-integration/test/harry/model/ModelTestBase.java b/harry-integration/test/harry/model/ModelTestBase.java
new file mode 100644
index 0000000..6421355
--- /dev/null
+++ b/harry-integration/test/harry/model/ModelTestBase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.model;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.ddl.SchemaGenerators;
+import harry.ddl.SchemaSpec;
+import harry.runner.LoggingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
+import harry.runner.PartitionVisitor;
+import harry.runner.SinglePartitionValidator;
+
+public abstract class ModelTestBase extends IntegrationTestBase
+{
+ void negativeTest(Function<Run, Boolean> corrupt, BiConsumer<Throwable, Run> validate)
+ {
+ Supplier<SchemaSpec> supplier = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+ for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
+ {
+ SchemaSpec schema = supplier.get();
+ negativeTest(corrupt, validate, i, schema);
+ }
+ }
+
+ abstract Configuration.ModelConfiguration modelConfiguration();
+
+ protected PartitionVisitor validator(Run run)
+ {
+ return new SinglePartitionValidator(100, run, modelConfiguration());
+ }
+
+ void negativeTest(Function<Run, Boolean> corrupt, BiConsumer<Throwable, Run> validate, int counter, SchemaSpec schemaSpec)
+ {
+ Configuration config = sharedConfiguration(counter, schemaSpec)
+ .setClusteringDescriptorSelector((builder) -> {
+ builder.setNumberOfModificationsDistribution(new Configuration.ConstantDistributionConfig(10))
+ .setRowsPerModificationDistribution(new Configuration.ConstantDistributionConfig(10))
+ .setMaxPartitionSize(100)
+ .setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
+ .addWeight(OpSelectors.OperationKind.DELETE_ROW, 1)
+ .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 1)
+ .addWeight(OpSelectors.OperationKind.DELETE_RANGE, 1)
+ .addWeight(OpSelectors.OperationKind.DELETE_SLICE, 1)
+ .addWeight(OpSelectors.OperationKind.WRITE, 96)
+ .build());
+ })
+ .build();
+
+ Run run = config.createRun();
+ beforeEach();
+ run.sut.schemaChange(run.schemaSpec.compile().cql());
+ OpSelectors.MonotonicClock clock = run.clock;
+
+ PartitionVisitor validator = validator(run);
+ PartitionVisitor partitionVisitor = new LoggingPartitionVisitor(run, MutatingRowVisitor::new);
+
+ for (int i = 0; i < 200; i++)
+ {
+ long lts = clock.nextLts();
+ partitionVisitor.visitPartition(lts);
+ }
+
+ validator.visitPartition(0);
+
+ if (!corrupt.apply(run))
+ {
+ System.out.println("Could not corrupt");
+ return;
+ }
+
+ try
+ {
+ validator.visitPartition(0);
+ }
+ catch (Throwable t)
+ {
+ validate.accept(t, run);
+ }
+ }
+}
\ No newline at end of file
diff --git a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
index 2520089..8085a06 100644
--- a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.Test;
@@ -37,17 +38,20 @@ import harry.corruptor.HideRowCorruptor;
import harry.corruptor.HideValueCorruptor;
import harry.corruptor.QueryResponseCorruptor;
import harry.corruptor.ShowValueCorruptor;
+import harry.ddl.SchemaGenerators;
+import harry.runner.MutatingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
import harry.runner.PartitionVisitor;
import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
import static harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
@RunWith(Parameterized.class)
public class QuerySelectorNegativeTest extends IntegrationTestBase
{
- private final int rounds = 10;
private final int ltss = 1000;
+
private final Random rnd = new Random();
private final QueryResponseCorruptorFactory corruptorFactory;
@@ -86,20 +90,26 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
public void selectRows()
{
Map<Query.QueryKind, Integer> stats = new HashMap<>();
+ Supplier<Configuration.ConfigurationBuilder> gen = sharedConfiguration();
+
+ int rounds = SchemaGenerators.DEFAULT_RUNS;
int failureCounter = 0;
outer:
for (int counter = 0; counter < rounds; counter++)
{
beforeEach();
- Configuration config = sharedConfiguration(counter)
- .setClusteringDescriptorSelector((builder) -> {
- builder.setMaxPartitionSize(2000);
- })
- .build();
+ Configuration config = gen.get()
+ .setClusteringDescriptorSelector((builder) -> {
+ builder.setMaxPartitionSize(2000);
+ })
+ .build();
Run run = config.createRun();
run.sut.schemaChange(run.schemaSpec.compile().cql());
+
OpSelectors.MonotonicClock clock = run.clock;
- PartitionVisitor partitionVisitor = run.visitorFactory.get();
+
+ PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+ Model model = new ExhaustiveChecker(run);
QueryResponseCorruptor corruptor = this.corruptorFactory.create(run);
@@ -112,20 +122,23 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
while (true)
{
long verificationLts = rnd.nextInt(1000);
- QuerySelector querySelector = new QuerySelector(run.schemaSpec,
- run.pdSelector,
- run.descriptorSelector,
- run.rng);
+ QueryGenerator queryGen = new QueryGenerator(run.schemaSpec,
+ run.pdSelector,
+ run.descriptorSelector,
+ run.rng);
+
+ QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run.rng, queryGen);
Query query = querySelector.inflate(verificationLts, counter);
- run.model.validatePartitionState(verificationLts, query);
+
+ model.validate(query);
if (!corruptor.maybeCorrupt(query, run.sut))
continue;
try
{
- run.model.validatePartitionState(verificationLts, query);
+ model.validate(query);
Assert.fail("Should've failed");
}
catch (Throwable t)
@@ -141,4 +154,4 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
Assert.assertTrue(String.format("Seen only %d failures", failureCounter),
failureCounter > (rounds * 0.8));
}
-}
+}
\ No newline at end of file
diff --git a/harry-integration/test/harry/model/QuerySelectorTest.java b/harry-integration/test/harry/model/QuerySelectorTest.java
index 0859181..3e7f3f0 100644
--- a/harry-integration/test/harry/model/QuerySelectorTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorTest.java
@@ -20,29 +20,37 @@ package harry.model;
import java.util.HashSet;
import java.util.Set;
+import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.Test;
import harry.core.Configuration;
import harry.core.Run;
+import harry.ddl.SchemaGenerators;
import harry.ddl.SchemaSpec;
+import harry.model.sut.SystemUnderTest;
import harry.operations.CompiledStatement;
+import harry.runner.MutatingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
import harry.runner.PartitionVisitor;
import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
public class QuerySelectorTest extends IntegrationTestBase
{
- private static int CYCLES = 100;
+ private static int CYCLES = 300;
@Test
public void basicQuerySelectorTest()
{
- for (int cnt = 0; cnt < 50; cnt++)
+ Supplier<SchemaSpec> schemaGen = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+ for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
{
- SchemaSpec schemaSpec = MockSchema.randomSchema("harry", "table" + cnt, cnt);
+ beforeEach();
+ SchemaSpec schemaSpec = schemaGen.get();
int partitionSize = 200;
+
int[] fractions = new int[schemaSpec.clusteringKeys.size()];
int last = partitionSize;
for (int i = fractions.length - 1; i >= 0; i--)
@@ -60,7 +68,8 @@ public class QuerySelectorTest extends IntegrationTestBase
Run run = config.createRun();
run.sut.schemaChange(run.schemaSpec.compile().cql());
OpSelectors.MonotonicClock clock = run.clock;
- PartitionVisitor partitionVisitor = run.visitorFactory.get();
+
+ PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
for (int i = 0; i < CYCLES; i++)
{
@@ -68,15 +77,13 @@ public class QuerySelectorTest extends IntegrationTestBase
partitionVisitor.visitPartition(lts);
}
- QuerySelector querySelector = new QuerySelector(run.schemaSpec,
- run.pdSelector,
- run.descriptorSelector,
- run.rng);
+ QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
for (int i = 0; i < CYCLES; i++)
{
Query query = querySelector.inflate(i, i);
- Object[][] results = run.sut.execute(query.toSelectStatement());
+
+ Object[][] results = run.sut.execute(query.toSelectStatement(), SystemUnderTest.ConsistencyLevel.QUORUM);
Set<Long> matchingClusterings = new HashSet<>();
for (Object[] row : results)
{
@@ -89,7 +96,7 @@ public class QuerySelectorTest extends IntegrationTestBase
// the simplest test there can be: every row that is in the partition and was returned by the query,
// has to "match", every other row has to be a non-match
CompiledStatement selectPartition = SelectHelper.select(run.schemaSpec, run.pdSelector.pd(i));
- Object[][] partition = run.sut.execute(selectPartition);
+ Object[][] partition = run.sut.execute(selectPartition, SystemUnderTest.ConsistencyLevel.QUORUM);
for (Object[] row : partition)
{
long cd = SelectHelper.resultSetToRow(run.schemaSpec,
@@ -106,9 +113,10 @@ public class QuerySelectorTest extends IntegrationTestBase
@Test
public void querySelectorModelTest()
{
- for (int cnt = 0; cnt < 50; cnt++)
+ Supplier<SchemaSpec> gen = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+ for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
{
- SchemaSpec schemaSpec = MockSchema.randomSchema("harry", "table" + cnt, cnt);
+ SchemaSpec schemaSpec = gen.get();
int[] fractions = new int[schemaSpec.clusteringKeys.size()];
int partitionSize = 200;
int last = partitionSize;
@@ -127,7 +135,7 @@ public class QuerySelectorTest extends IntegrationTestBase
Run run = config.createRun();
run.sut.schemaChange(run.schemaSpec.compile().cql());
OpSelectors.MonotonicClock clock = run.clock;
- PartitionVisitor partitionVisitor = run.visitorFactory.get();
+ PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
for (int i = 0; i < CYCLES; i++)
{
@@ -135,18 +143,14 @@ public class QuerySelectorTest extends IntegrationTestBase
partitionVisitor.visitPartition(lts);
}
- QuerySelector querySelector = new QuerySelector(run.schemaSpec,
- run.pdSelector,
- run.descriptorSelector,
- run.rng);
-
- Model model = run.model;
+ QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
+ Model model = new ExhaustiveChecker(run);
long verificationLts = 10;
for (int i = 0; i < CYCLES; i++)
{
Query query = querySelector.inflate(verificationLts, i);
- model.validatePartitionState(verificationLts, query);
+ model.validate(query);
}
}
}
diff --git a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
index c64f05d..22d7171 100644
--- a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
+++ b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
@@ -18,8 +18,6 @@
package harry.model;
-import java.util.function.Consumer;
-
import org.junit.Assert;
import org.junit.Test;
@@ -31,19 +29,28 @@ import harry.corruptor.HideRowCorruptor;
import harry.corruptor.HideValueCorruptor;
import harry.corruptor.QueryResponseCorruptor;
import harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
-import harry.generators.Surjections;
-import harry.generators.distribution.Distribution;
import harry.runner.PartitionVisitor;
import harry.runner.Query;
-import harry.runner.Validator;
+import harry.runner.SinglePartitionValidator;
-public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
+public class QuiescentCheckerIntegrationTest extends ModelTestBase
{
+ @Override
+ protected PartitionVisitor validator(Run run)
+ {
+ return new SinglePartitionValidator(100, run, modelConfiguration());
+ }
+
@Test
public void testNormalCondition()
{
- negativeTest((run) -> {},
- Assert::assertNull);
+ negativeTest((run) -> { return true; },
+ (t, run) -> {
+ if (t != null)
+ throw new AssertionError(String.format("Throwable was supposed to be null. Schema: %s",
+ run.schemaSpec.compile().cql()),
+ t);
+ });
}
@Test
@@ -54,15 +61,18 @@ public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
run.clock,
HideRowCorruptor::new);
- Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
- run.pdSelector.pd(0, run.schemaSpec),
- false),
- run.sut));
+ Query query = Query.selectPartition(run.schemaSpec,
+ run.pdSelector.pd(0, run.schemaSpec),
+ false);
+
+ return corruptor.maybeCorrupt(query, run.sut);
},
- (t) -> {
+ (t, run) -> {
+ // TODO: We can actually pinpoint the difference
String expected = "Expected results to have the same number of results, but expected result iterator has more results";
+ String expected2 = "Found a row in the model that is not present in the resultset";
Assert.assertTrue(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()),
- t.getMessage().contains(expected));
+ t.getMessage().contains(expected) || t.getMessage().contains(expected2));
});
}
@@ -74,15 +84,17 @@ public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
run.clock,
run.descriptorSelector);
- Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
- run.pdSelector.pd(0, run.schemaSpec),
- false),
- run.sut));
+ return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+ run.pdSelector.pd(0, run.schemaSpec),
+ false),
+ run.sut);
},
- (t) -> {
+ (t, run) -> {
String expected = "Found a row in the model that is not present in the resultset";
+ String expected2 = "Expected results to have the same number of results, but actual result iterator has more results";
+
Assert.assertTrue(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()),
- t.getMessage().contains(expected));
+ t.getMessage().contains(expected) || t.getMessage().contains(expected2));
});
}
@@ -95,12 +107,12 @@ public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
run.clock,
HideValueCorruptor::new);
- Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
- run.pdSelector.pd(0, run.schemaSpec),
- false),
- run.sut));
+ return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+ run.pdSelector.pd(0, run.schemaSpec),
+ false),
+ run.sut);
},
- (t) -> {
+ (t, run) -> {
String expected = "Returned row state doesn't match the one predicted by the model";
Assert.assertTrue(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()),
t.getMessage().contains(expected));
@@ -116,55 +128,20 @@ public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
run.clock,
ChangeValueCorruptor::new);
- Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
- run.pdSelector.pd(0, run.schemaSpec),
- false),
- run.sut));
+ return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+ run.pdSelector.pd(0, run.schemaSpec),
+ false),
+ run.sut);
},
- (t) -> {
+ (t, run) -> {
String expected = "Returned row state doesn't match the one predicted by the model";
Assert.assertTrue(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()),
t.getMessage().contains(expected));
});
}
-
- static void negativeTest(Consumer<Run> corrupt, Consumer<Throwable> validate)
+ Configuration.ModelConfiguration modelConfiguration()
{
- Configuration config = sharedConfiguration()
- .setModel(new Configuration.QuiescentCheckerConfig())
- .setClusteringDescriptorSelector((rng, schemaSpec) -> {
- return new OpSelectors.DefaultDescriptorSelector(rng,
- new OpSelectors.ColumnSelectorBuilder().forAll(schemaSpec.regularColumns.size()).build(),
- Surjections.pick(OpSelectors.OperationKind.DELETE_COLUMN,
- OpSelectors.OperationKind.DELETE_ROW,
- OpSelectors.OperationKind.WRITE),
- new Distribution.ConstantDistribution(10),
- new Distribution.ConstantDistribution(10),
- 100);
- })
- .build();
- Run run = config.createRun();
- run.sut.schemaChange(run.schemaSpec.compile().cql());
- OpSelectors.MonotonicClock clock = run.clock;
- Validator validator = run.validator;
- PartitionVisitor partitionVisitor = run.visitorFactory.get();
-
- for (int i = 0; i < 200; i++)
- {
- long lts = clock.nextLts();
- partitionVisitor.visitPartition(lts);
- }
-
- corrupt.accept(run);
-
- try
- {
- validator.validatePartition(0);
- }
- catch (Throwable t)
- {
- validate.accept(t);
- }
+ return new Configuration.QuiescentCheckerConfig();
}
}
\ No newline at end of file
diff --git a/harry-integration/test/harry/op/RowVisitorTest.java b/harry-integration/test/harry/op/RowVisitorTest.java
index e58c05b..c8ad182 100644
--- a/harry-integration/test/harry/op/RowVisitorTest.java
+++ b/harry-integration/test/harry/op/RowVisitorTest.java
@@ -24,15 +24,19 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import harry.core.MetricReporter;
+import harry.core.Run;
import harry.ddl.SchemaGenerators;
import harry.ddl.SchemaSpec;
import harry.generators.RandomGenerator;
import harry.generators.distribution.Distribution;
-import harry.runner.DefaultRowVisitor;
+import harry.model.sut.SystemUnderTest;
+import harry.runner.DataTracker;
+import harry.runner.MutatingRowVisitor;
import harry.model.clock.OffsetClock;
import harry.model.OpSelectors;
import harry.operations.CompiledStatement;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
import harry.util.BitSet;
import org.apache.cassandra.cql3.CQLTester;
@@ -50,32 +54,37 @@ public class RowVisitorTest extends CQLTester
@Test
public void rowWriteGeneratorTest()
{
- Supplier<SchemaSpec> specGenerator = SchemaGenerators.progression(5);
+ Supplier<SchemaSpec> specGenerator = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
RandomGenerator rand = RandomGenerator.forTests(6371747244598697093L);
- OpSelectors.Rng opRng = new OpSelectors.PCGFast(1);
- OpSelectors.DefaultDescriptorSelector descriptorSelector = new OpSelectors.DefaultDescriptorSelector(new OpSelectors.PCGFast(1L),
- OpSelectors.columnSelectorBuilder().forAll(BitSet.create(0b001, 3),
- BitSet.create(0b011, 3),
- BitSet.create(0b111, 3))
- .build(),
- DEFAULT_OP_TYPE_SELECTOR,
- new Distribution.ScaledDistribution(1, 3),
- new Distribution.ScaledDistribution(2, 30),
- 100);
+ OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
- for (int i = 0; i < SchemaGenerators.PROGRESSIVE_GENERATORS.length * 5; i++)
+ OpSelectors.PdSelector pdSelector = new OpSelectors.DefaultPdSelector(rng, 10, 10);
+ OpSelectors.DescriptorSelector descriptorSelector = new OpSelectors.DefaultDescriptorSelector(rng,
+ OpSelectors.columnSelectorBuilder().forAll(BitSet.create(0b001, 3),
+ BitSet.create(0b011, 3),
+ BitSet.create(0b111, 3))
+ .build(),
+ DEFAULT_OP_TYPE_SELECTOR,
+ new Distribution.ScaledDistribution(1, 3),
+ new Distribution.ScaledDistribution(2, 30),
+ 100);
+
+ for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
{
SchemaSpec schema = specGenerator.get();
createTable(schema.compile().cql());
- DefaultRowVisitor visitor = new DefaultRowVisitor(schema,
- new OffsetClock(10000),
- descriptorSelector,
- new QuerySelector(schema,
- new OpSelectors.DefaultPdSelector(opRng, 10, 10),
- descriptorSelector,
- opRng));
+ Run run = new Run(rng,
+ new OffsetClock(10000),
+ pdSelector,
+ descriptorSelector,
+ schema,
+ DataTracker.NO_OP,
+ SystemUnderTest.NO_OP,
+ MetricReporter.NO_OP);
+
+ MutatingRowVisitor visitor = new MutatingRowVisitor(run);
long[] descriptors = rand.next(4);
execute(visitor.write(Math.abs(descriptors[0]),
diff --git a/pom.xml b/pom.xml
index efb1cf2..8a172eb 100755
--- a/pom.xml
+++ b/pom.xml
@@ -38,9 +38,9 @@
<properties>
<javac.target>1.8</javac.target>
<harry.version>0.0.1-SNAPSHOT</harry.version>
- <cassandra.version>4.0.1-SNAPSHOT</cassandra.version>
+ <cassandra.version>4.0.0-SNAPSHOT</cassandra.version>
<jackson.version>2.11.3</jackson.version>
- <dtest.version>0.0.6</dtest.version>
+ <dtest.version>0.0.7</dtest.version>
<jmh.version>1.11.3</jmh.version>
<argLine.common>
-server
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org