You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2022/01/10 10:46:14 UTC
[cassandra-harry] 01/01: Features: * Implement lookbehind via tracker callbacks * Improve DSL * Rename maxLts to peek * Split lts visitors from visitors * Allow create table if not exists * Allow sampler to be triggered at every LTS * Allow local state validator to always run * Add Staged Runner * Add wait for token ranges * Make keyspace DDL configurable * Rename PartitionVisitor to Visitor
This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git
commit 275f188660b66743bf3f055c8d7da438ad826061
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Thu Nov 25 08:45:00 2021 +0100
Features:
* Implement lookbehind via tracker callbacks
* Improve DSL
* Rename maxLts to peek
* Split lts visitors from visitors
* Allow create table if not exists
* Allow sampler to be triggered at every LTS
* Allow local state validator to always run
* Add Staged Runner
* Add wait for token ranges
* Make keyspace DDL configurable
* Rename PartitionVisitor to Visitor
Bugfixes:
* Fix for queue draining
* Fix distribution of the single-op values
* Fix bug in schema helper: static columns are listed as duplicates
Patch by Alex Petrov for CASSANDRA-16262
Co-authored-by: Caleb Rackliffe <ca...@gmail.com>
---
harry-core/src/harry/core/Configuration.java | 145 ++++----
harry-core/src/harry/core/Run.java | 2 -
.../src/harry/corruptor/HideRowCorruptor.java | 2 +-
.../src/harry/corruptor/HideValueCorruptor.java | 4 +-
.../src/harry/corruptor/ShowValueCorruptor.java | 2 +-
harry-core/src/harry/ddl/ColumnSpec.java | 16 +
harry-core/src/harry/ddl/SchemaSpec.java | 5 +-
harry-core/src/harry/dsl/HistoryBuilder.java | 24 +-
harry-core/src/harry/generators/RngUtils.java | 16 +
harry-core/src/harry/model/OpSelectors.java | 38 +-
harry-core/src/harry/model/QuiescentChecker.java | 38 +-
.../model/clock/ApproximateMonotonicClock.java | 16 +-
harry-core/src/harry/model/clock/OffsetClock.java | 9 +-
harry-core/src/harry/reconciler/Reconciler.java | 27 +-
harry-core/src/harry/runner/DataTracker.java | 54 ++-
.../src/harry/runner/DefaultDataTracker.java | 17 +-
harry-core/src/harry/runner/HarryRunner.java | 13 +-
harry-core/src/harry/runner/Runner.java | 406 ++++++++++++++-------
harry-core/src/harry/runner/StagedRunner.java | 201 ++++++++++
harry-core/src/harry/runner/UpToLtsRunner.java | 105 ++++++
harry-core/src/harry/util/ByteUtils.java | 196 ++++++++++
.../src/harry/visitors/AllPartitionsValidator.java | 38 +-
.../src/harry/visitors/CorruptingVisitor.java | 7 +-
.../src/harry/visitors/DelegatingVisitor.java | 61 ----
.../src/harry/visitors/GeneratingVisitor.java | 9 +-
harry-core/src/harry/visitors/LoggingVisitor.java | 1 -
harry-core/src/harry/visitors/LtsVisitor.java | 97 +++++
harry-core/src/harry/visitors/MutatingVisitor.java | 24 +-
.../src/harry/visitors/ParallelValidator.java | 4 +-
harry-core/src/harry/visitors/RecentValidator.java | 48 ++-
.../src/harry/visitors/ReplayingVisitor.java | 11 +-
harry-core/src/harry/visitors/Sampler.java | 11 +-
harry-core/src/harry/visitors/SingleValidator.java | 6 +-
harry-core/src/harry/visitors/VisitExecutor.java | 14 +-
harry-core/src/harry/visitors/Visitor.java | 8 +-
harry-core/test/harry/model/OpSelectorsTest.java | 148 ++++----
.../harry/runner/external/HarryRunnerExternal.java | 2 +-
.../src/harry/model/sut/ByteUtils.java | 115 ++++++
.../model/sut/InJVMTokenAwareVisitExecutor.java | 11 +-
.../src/harry/model/sut/InJvmSut.java | 14 +-
.../src/harry/runner/HarryRunnerJvm.java | 2 +-
.../harry/runner/RepairingLocalStateValidator.java | 21 +-
.../src/harry/runner/TrivialShrinker.java | 25 +-
.../src/harry/visitors/SkippingVisitor.java | 13 +-
.../generators/DataGeneratorsIntegrationTest.java | 6 +-
.../harry/model/HistoryBuilderIntegrationTest.java | 27 +-
.../test/harry/model/HistoryBuilderTest.java | 3 +-
.../harry/model/InJVMTokenAwareExecutorTest.java | 15 +-
.../test/harry/model/IntegrationTestBase.java | 4 +-
.../test/harry/model/ModelTestBase.java | 32 +-
.../harry/model/QuerySelectorNegativeTest.java | 7 +-
.../test/harry/model/QuerySelectorTest.java | 14 +-
.../model/QuiescentCheckerIntegrationTest.java | 39 +-
.../test/resources/single_partition_test.yml | 5 +-
54 files changed, 1550 insertions(+), 628 deletions(-)
diff --git a/harry-core/src/harry/core/Configuration.java b/harry-core/src/harry/core/Configuration.java
index c6fecc7..3545e18 100644
--- a/harry-core/src/harry/core/Configuration.java
+++ b/harry-core/src/harry/core/Configuration.java
@@ -76,6 +76,7 @@ public class Configuration
mapper.registerSubtypes(Configuration.ApproximateMonotonicClockConfiguration.class);
mapper.registerSubtypes(Configuration.ConcurrentRunnerConfig.class);
mapper.registerSubtypes(Configuration.SequentialRunnerConfig.class);
+ mapper.registerSubtypes(Configuration.SingleVisitRunnerConfig.class);
mapper.registerSubtypes(Configuration.DefaultDataTrackerConfiguration.class);
mapper.registerSubtypes(Configuration.NoOpDataTrackerConfiguration.class);
@@ -106,6 +107,7 @@ public class Configuration
public final SchemaProviderConfiguration schema_provider;
public final boolean drop_schema;
+ public final String keyspace_ddl;
public final boolean create_schema;
public final boolean truncate_table;
@@ -117,13 +119,11 @@ public class Configuration
public final PDSelectorConfiguration partition_descriptor_selector;
public final CDSelectorConfiguration clustering_descriptor_selector;
- public final long run_time;
- public final TimeUnit run_time_unit;
-
@JsonCreator
public Configuration(@JsonProperty("seed") long seed,
@JsonProperty("schema_provider") SchemaProviderConfiguration schema_provider,
@JsonProperty("drop_schema") boolean drop_schema,
+ @JsonProperty("create_keyspace") String keyspace_ddl,
@JsonProperty("create_schema") boolean create_schema,
@JsonProperty("truncate_schema") boolean truncate_table,
@JsonProperty("metric_reporter") MetricReporterConfiguration metric_reporter,
@@ -132,12 +132,11 @@ public class Configuration
@JsonProperty("system_under_test") SutConfiguration system_under_test,
@JsonProperty("data_tracker") DataTrackerConfiguration data_tracker,
@JsonProperty("partition_descriptor_selector") PDSelectorConfiguration partition_descriptor_selector,
- @JsonProperty("clustering_descriptor_selector") CDSelectorConfiguration clustering_descriptor_selector,
- @JsonProperty(value = "run_time", defaultValue = "2") long run_time,
- @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit run_time_unit)
+ @JsonProperty("clustering_descriptor_selector") CDSelectorConfiguration clustering_descriptor_selector)
{
this.seed = seed;
this.schema_provider = schema_provider;
+ this.keyspace_ddl = keyspace_ddl;
this.drop_schema = drop_schema;
this.create_schema = create_schema;
this.truncate_table = truncate_table;
@@ -147,8 +146,6 @@ public class Configuration
this.data_tracker = data_tracker;
this.partition_descriptor_selector = partition_descriptor_selector;
this.clustering_descriptor_selector = clustering_descriptor_selector;
- this.run_time = run_time;
- this.run_time_unit = run_time_unit;
this.runner = runner;
}
@@ -233,14 +230,15 @@ public class Configuration
OpSelectors.MonotonicClock clock = snapshot.clock.make();
MetricReporter metricReporter = snapshot.metric_reporter.make();
- // TODO: parse schema
- SchemaSpec schemaSpec = snapshot.schema_provider.make(seed);
+
+ // TODO: validate that operation kind is compatible with schema, due to statics etc
+ SystemUnderTest sut = snapshot.system_under_test.make();
+
+ SchemaSpec schemaSpec = snapshot.schema_provider.make(seed, sut);
schemaSpec.validate();
OpSelectors.PdSelector pdSelector = snapshot.partition_descriptor_selector.make(rng);
OpSelectors.DescriptorSelector descriptorSelector = snapshot.clustering_descriptor_selector.make(rng, schemaSpec);
- // TODO: validate that operation kind is compactible with schema, due to statics etc
- SystemUnderTest sut = snapshot.system_under_test.make();
return new Run(rng,
clock,
@@ -263,6 +261,7 @@ public class Configuration
long seed;
SchemaProviderConfiguration schema_provider = new DefaultSchemaProviderConfiguration();
+ String keyspace_ddl;
boolean drop_schema;
boolean create_schema;
boolean truncate_table;
@@ -275,9 +274,6 @@ public class Configuration
PDSelectorConfiguration partition_descriptor_selector = new Configuration.DefaultPDSelectorConfiguration(10, 100);
CDSelectorConfiguration clustering_descriptor_selector; // TODO: sensible default value
- long run_time = 2;
- TimeUnit run_time_unit = TimeUnit.HOURS;
-
public ConfigurationBuilder setSeed(long seed)
{
this.seed = seed;
@@ -290,20 +286,19 @@ public class Configuration
return this;
}
- public ConfigurationBuilder setRunTime(long runTime, TimeUnit runTimeUnit)
+ public ConfigurationBuilder setDataTracker(DataTrackerConfiguration tracker)
{
- this.run_time_unit = Objects.requireNonNull(runTimeUnit, "unit");
- this.run_time = runTime;
+ this.data_tracker = tracker;
return this;
}
-
- public ConfigurationBuilder setDataTracker(DataTrackerConfiguration tracker)
+ public ConfigurationBuilder setDataTracker(String keyspace_ddl)
{
- this.data_tracker = tracker;
+ this.keyspace_ddl = keyspace_ddl;
return this;
}
+
public ConfigurationBuilder setClock(ClockConfiguration clock)
{
this.clock = clock;
@@ -370,20 +365,16 @@ public class Configuration
return new Configuration(seed,
schema_provider,
drop_schema,
+ keyspace_ddl,
create_schema,
truncate_table,
metric_reporter,
clock,
-
runner,
system_under_test,
data_tracker,
-
partition_descriptor_selector,
- clustering_descriptor_selector,
-
- run_time,
- run_time_unit);
+ clustering_descriptor_selector);
}
}
@@ -403,8 +394,6 @@ public class Configuration
builder.partition_descriptor_selector = partition_descriptor_selector;
builder.clustering_descriptor_selector = clustering_descriptor_selector;
- builder.run_time = run_time;
- builder.run_time_unit = run_time_unit;
return builder;
}
@@ -425,36 +414,10 @@ public class Configuration
public DataTracker make()
{
- return new DataTracker()
- {
- public void started(long lts)
- {
- }
-
- public void finished(long lts)
- {
-
- }
-
- public long maxStarted()
- {
- return 0;
- }
-
- public long maxConsecutiveFinished()
- {
- return 0;
- }
-
- public DataTrackerConfiguration toConfig()
- {
- return null;
- }
- };
+ return DataTracker.NO_OP;
}
}
-
@JsonTypeName("default")
public static class DefaultDataTrackerConfiguration implements DataTrackerConfiguration
{
@@ -562,38 +525,74 @@ public class Configuration
public static class ConcurrentRunnerConfig implements RunnerConfiguration
{
public final int concurrency;
- public final List<VisitorConfiguration> visitor_factories;
+
+ @JsonProperty(value = "visitors")
+ public final List<VisitorConfiguration> visitorFactories;
+
+ public final long run_time;
+ public final TimeUnit run_time_unit;
@JsonCreator
- public ConcurrentRunnerConfig(@JsonProperty(value = "concurrency", defaultValue = "2") int concurrency,
- @JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
+ public ConcurrentRunnerConfig(@JsonProperty(value = "concurrency", defaultValue = "4") int concurrency,
+ @JsonProperty(value = "visitors") List<VisitorConfiguration> visitors,
+ @JsonProperty(value = "run_time", defaultValue = "2") long runtime,
+ @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
{
this.concurrency = concurrency;
- this.visitor_factories = visitors;
+ this.visitorFactories = visitors;
+ this.run_time = runtime;
+ this.run_time_unit = runtimeUnit;
}
@Override
public Runner make(Run run, Configuration config)
{
- return new Runner.ConcurrentRunner(run, config, concurrency, visitor_factories);
+ return new Runner.ConcurrentRunner(run, config, concurrency, visitorFactories, run_time, run_time_unit);
}
}
@JsonTypeName("sequential")
public static class SequentialRunnerConfig implements RunnerConfiguration
{
- public final List<VisitorConfiguration> visitor_factories;
+ @JsonProperty(value = "visitors")
+ public final List<VisitorConfiguration> visitorFactories;
+
+ public final long run_time;
+ public final TimeUnit run_time_unit;
+
+ @JsonCreator
+ public SequentialRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors,
+ @JsonProperty(value = "run_time", defaultValue = "2") long runtime,
+ @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
+ {
+ this.visitorFactories = visitors;
+ this.run_time = runtime;
+ this.run_time_unit = runtimeUnit;
+ }
+
+ @Override
+ public Runner make(Run run, Configuration config)
+ {
+ return new Runner.SequentialRunner(run, config, visitorFactories, run_time, run_time_unit);
+ }
+ }
+
+ @JsonTypeName("single")
+ public static class SingleVisitRunnerConfig implements RunnerConfiguration
+ {
+ @JsonProperty(value = "visitors")
+ public final List<VisitorConfiguration> visitorFactories;
@JsonCreator
- public SequentialRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
+ public SingleVisitRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
{
- this.visitor_factories = visitors;
+ this.visitorFactories = visitors;
}
@Override
public Runner make(Run run, Configuration config)
{
- return new Runner.SequentialRunner(run, config, visitor_factories);
+ return new Runner.SingleVisitRunner(run, config, visitorFactories);
}
}
@@ -675,7 +674,7 @@ public class Configuration
operation_kind_weights = new HashMap<>();
}
- public WeightedSelectorBuilder addWeight(T v, int weight)
+ public WeightedSelectorBuilder<T> addWeight(T v, int weight)
{
operation_kind_weights.put(v, weight);
return this;
@@ -852,7 +851,7 @@ public class Configuration
}
}
- @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface DistributionConfig extends Distribution.DistributionFactory
{
}
@@ -942,7 +941,7 @@ public class Configuration
@Override
public Visitor make(Run run)
{
- return new MutatingVisitor(run, row_visitor);
+ return new MutatingVisitor(run, row_visitor::make);
}
}
@@ -960,7 +959,7 @@ public class Configuration
@Override
public Visitor make(Run run)
{
- return new LoggingVisitor(run, row_visitor);
+ return new LoggingVisitor(run, row_visitor::make);
}
}
@@ -969,6 +968,8 @@ public class Configuration
{
public final int concurrency;
public final int trigger_after;
+
+ @JsonProperty("model")
public final Configuration.ModelConfiguration modelConfiguration;
@JsonCreator
@@ -1055,7 +1056,7 @@ public class Configuration
@JsonTypeName("default")
public static class DefaultSchemaProviderConfiguration implements SchemaProviderConfiguration
{
- public SchemaSpec make(long seed)
+ public SchemaSpec make(long seed, SystemUnderTest sut)
{
return SchemaGenerators.defaultSchemaSpecGen("harry", "table0")
.inflate(seed);
@@ -1103,7 +1104,7 @@ public class Configuration
this.regular_columns = regulars;
this.static_keys = statics;
}
- public SchemaSpec make(long seed)
+ public SchemaSpec make(long seed, SystemUnderTest sut)
{
return schemaSpec;
}
@@ -1122,6 +1123,4 @@ public class Configuration
return MetricReporter.NO_OP;
}
}
-
- // TODO: schema provider by DDL
}
diff --git a/harry-core/src/harry/core/Run.java b/harry-core/src/harry/core/Run.java
index b0c8afc..e3bc203 100644
--- a/harry-core/src/harry/core/Run.java
+++ b/harry-core/src/harry/core/Run.java
@@ -43,7 +43,6 @@ public class Run
OpSelectors.MonotonicClock clock,
OpSelectors.PdSelector pdSelector,
OpSelectors.DescriptorSelector descriptorSelector,
-
SchemaSpec schemaSpec,
DataTracker tracker,
SystemUnderTest sut,
@@ -59,7 +58,6 @@ public class Run
OpSelectors.PdSelector pdSelector,
OpSelectors.DescriptorSelector descriptorSelector,
QueryGenerator rangeSelector,
-
SchemaSpec schemaSpec,
DataTracker tracker,
SystemUnderTest sut,
diff --git a/harry-core/src/harry/corruptor/HideRowCorruptor.java b/harry-core/src/harry/corruptor/HideRowCorruptor.java
index a2a2648..ca16c2e 100644
--- a/harry-core/src/harry/corruptor/HideRowCorruptor.java
+++ b/harry-core/src/harry/corruptor/HideRowCorruptor.java
@@ -44,6 +44,6 @@ public class HideRowCorruptor implements RowCorruptor
public CompiledStatement corrupt(ResultSetRow row)
{
- return DeleteHelper.deleteRow(schema, row.pd, row.cd, clock.rts(clock.maxLts()) + 1);
+ return DeleteHelper.deleteRow(schema, row.pd, row.cd, clock.rts(clock.peek()));
}
}
diff --git a/harry-core/src/harry/corruptor/HideValueCorruptor.java b/harry-core/src/harry/corruptor/HideValueCorruptor.java
index 3cedcda..c8617ff 100644
--- a/harry-core/src/harry/corruptor/HideValueCorruptor.java
+++ b/harry-core/src/harry/corruptor/HideValueCorruptor.java
@@ -77,7 +77,7 @@ public class HideValueCorruptor implements RowCorruptor
row.pd,
mask,
schema.regularAndStaticColumnsMask(),
- clock.rts(clock.maxLts()) + 1);
+ clock.rts(clock.peek()));
}
}
@@ -96,6 +96,6 @@ public class HideValueCorruptor implements RowCorruptor
row.cd,
mask,
schema.regularAndStaticColumnsMask(),
- clock.rts(clock.maxLts()) + 1);
+ clock.rts(clock.peek()));
}
}
diff --git a/harry-core/src/harry/corruptor/ShowValueCorruptor.java b/harry-core/src/harry/corruptor/ShowValueCorruptor.java
index 01372ce..236709e 100644
--- a/harry-core/src/harry/corruptor/ShowValueCorruptor.java
+++ b/harry-core/src/harry/corruptor/ShowValueCorruptor.java
@@ -72,6 +72,6 @@ public class ShowValueCorruptor implements RowCorruptor
// We do not know LTS of the deleted row. We could try inferring it, but that
// still won't help since we can't use it anyways, since collisions between a
// written value and tombstone are resolved in favour of tombstone.
- return WriteHelper.inflateInsert(schema, row.pd, row.cd, corruptedVds, null, clock.rts(clock.maxLts()) + 1);
+ return WriteHelper.inflateInsert(schema, row.pd, row.cd, corruptedVds, null, clock.rts(clock.peek()));
}
}
diff --git a/harry-core/src/harry/ddl/ColumnSpec.java b/harry-core/src/harry/ddl/ColumnSpec.java
index 6d652c4..9b52805 100644
--- a/harry-core/src/harry/ddl/ColumnSpec.java
+++ b/harry-core/src/harry/ddl/ColumnSpec.java
@@ -264,6 +264,21 @@ public class ColumnSpec<T>
}
};
+ public static final DataType<String> textType = new DataType<String>("text")
+ {
+ private final Bijections.Bijection<String> gen = new StringBijection();
+
+ public Bijections.Bijection<String> generator()
+ {
+ return gen;
+ }
+
+ public int compareLexicographically(long l, long r)
+ {
+ return Long.compare(l, r);
+ }
+ };
+
public static DataType<String> asciiType(int nibbleSize, int maxRandomBytes)
{
Bijections.Bijection<String> gen = new StringBijection(nibbleSize, maxRandomBytes);
@@ -325,6 +340,7 @@ public class ColumnSpec<T>
ColumnSpec.floatType,
ColumnSpec.doubleType,
ColumnSpec.asciiType,
+ ColumnSpec.textType,
ColumnSpec.uuidType,
ColumnSpec.timestampType));
diff --git a/harry-core/src/harry/ddl/SchemaSpec.java b/harry-core/src/harry/ddl/SchemaSpec.java
index eba87c6..8d8a62f 100644
--- a/harry-core/src/harry/ddl/SchemaSpec.java
+++ b/harry-core/src/harry/ddl/SchemaSpec.java
@@ -26,6 +26,7 @@ import java.util.Objects;
import java.util.function.Consumer;
import harry.generators.DataGenerators;
+import harry.model.sut.SystemUnderTest;
import harry.operations.CompiledStatement;
import harry.operations.Relation;
import harry.util.BitSet;
@@ -36,7 +37,7 @@ public class SchemaSpec
{
public interface SchemaSpecFactory
{
- public SchemaSpec make(long seed);
+ public SchemaSpec make(long seed, SystemUnderTest sut);
}
public final DataGenerators.KeyGenerator pkGenerator;
@@ -244,7 +245,7 @@ public class SchemaSpec
{
StringBuilder sb = new StringBuilder();
- sb.append("CREATE TABLE ");
+ sb.append("CREATE TABLE IF NOT EXISTS ");
sb.append(keyspace)
.append(".")
.append(table)
diff --git a/harry-core/src/harry/dsl/HistoryBuilder.java b/harry-core/src/harry/dsl/HistoryBuilder.java
index 5e1ff6f..fa26277 100644
--- a/harry-core/src/harry/dsl/HistoryBuilder.java
+++ b/harry-core/src/harry/dsl/HistoryBuilder.java
@@ -31,8 +31,11 @@ import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
+import harry.core.Configuration;
import harry.core.Run;
import harry.model.OpSelectors;
+import harry.visitors.MutatingRowVisitor;
+import harry.visitors.MutatingVisitor;
import harry.visitors.ReplayingVisitor;
import harry.visitors.VisitExecutor;
@@ -598,9 +601,19 @@ public class HistoryBuilder implements Iterable<ReplayingVisitor.Visit>
return new ReplayingVisitor.Operation(cd, opId, opType);
}
+ public void replayAll(Run run)
+ {
+ visitor(run).replayAll();
+ }
+
+ public ReplayingVisitor visitor(Run run)
+ {
+ return visitor(new MutatingVisitor.MutatingVisitExecutor(run, new MutatingRowVisitor(run)));
+ }
+
public ReplayingVisitor visitor(VisitExecutor executor)
{
- return new ReplayingVisitor(executor)
+ return new ReplayingVisitor(executor, run.clock::nextLts)
{
public Visit getVisit(long lts)
{
@@ -608,16 +621,15 @@ public class HistoryBuilder implements Iterable<ReplayingVisitor.Visit>
return log.get((int) lts);
}
- public void replayAll(Run run)
+ public void replayAll()
{
long maxLts = HistoryBuilder.this.lts;
+
while (true)
{
- long lts = run.clock.currentLts();
- if (lts >= maxLts)
+ if (run.clock.peek() >= maxLts)
return;
- visit(lts);
- run.clock.nextLts();
+ visit();
}
}
};
diff --git a/harry-core/src/harry/generators/RngUtils.java b/harry-core/src/harry/generators/RngUtils.java
index 894204f..c974476 100644
--- a/harry-core/src/harry/generators/RngUtils.java
+++ b/harry-core/src/harry/generators/RngUtils.java
@@ -20,8 +20,15 @@ package harry.generators;
import java.util.function.LongSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.visitors.AllPartitionsValidator;
+
public class RngUtils
{
+ private static final Logger logger = LoggerFactory.getLogger(RngUtils.class);
+
private static final long CONSTANT = 0x2545F4914F6CDD1DL;
public static long next(long input)
{
@@ -131,8 +138,15 @@ public class RngUtils
long min = 0;
long max = ~0L;
int n = 0;
+ int steps = 0;
while (n != bits)
{
+ if (steps > 10_000)
+ {
+ throw new RuntimeException(String.format("Could not generate bits after 10K tries. " +
+ "Inputs: bits=%d, length=%d", bits, length));
+ }
+
long x = rng.getAsLong() & mask;
x = min | (x & max);
n = Long.bitCount(x);
@@ -140,6 +154,8 @@ public class RngUtils
max = x;
else
min = x;
+
+ steps++;
}
return min;
}
diff --git a/harry-core/src/harry/model/OpSelectors.java b/harry-core/src/harry/model/OpSelectors.java
index 7e5fc17..7727828 100644
--- a/harry-core/src/harry/model/OpSelectors.java
+++ b/harry-core/src/harry/model/OpSelectors.java
@@ -18,9 +18,11 @@
package harry.model;
+import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
+import java.util.function.LongConsumer;
import harry.core.Configuration;
import harry.core.VisibleForTesting;
@@ -76,19 +78,15 @@ public interface OpSelectors
* be taken to map a real-time timestamp from the value retrieved from the database in order
* to map it back to the logical timestamp of the operation that wrote this value.
*/
- public static interface MonotonicClock
+ public interface MonotonicClock
{
long rts(long lts);
-
long lts(long rts);
- long currentLts();
-
long nextLts();
+ long peek();
- long maxLts();
-
- public Configuration.ClockConfiguration toConfig();
+ Configuration.ClockConfiguration toConfig();
}
public static interface MonotonicClockFactory
@@ -304,6 +302,7 @@ public interface OpSelectors
return minLtsAt(position);
}
+ // TODO: add maxPosition to make it easier/more accessible for the components like sampler, etc
public long positionFor(long lts)
{
long windowStart = lts / switchAfter;
@@ -682,6 +681,14 @@ public interface OpSelectors
public BitSet partitionLevelOperationsMask(long pd, long lts)
{
int totalOps = opsPerModification(lts) * numberOfModifications(lts);
+ if (totalOps > 64)
+ {
+ throw new IllegalArgumentException("RngUtils#randomBits currently supports only up to 64 bits of entropy, so we can not " +
+ "split partition and row level operations for more than 64 operations at the moment." +
+ "Set modifications_per_lts to a number that is lower than 64 and use rows_per_modification" +
+ "to have more operations per LTS instead");
+ }
+
long seed = rng.randomNumber(pd, lts);
int partitionLevelOps = (int) Math.ceil(operationSelector.partitionLevelThreshold * totalOps);
@@ -692,8 +699,17 @@ public interface OpSelectors
private OperationKind operationType(long pd, long lts, long opId, BitSet partitionLevelOperationsMask)
{
- long descriptor = rng.randomNumber(pd ^ lts ^ opId, BITSET_IDX_STREAM);
- return operationSelector.inflate(descriptor, partitionLevelOperationsMask.isSet((int) opId));
+ try
+ {
+ long descriptor = rng.randomNumber(pd ^ lts ^ opId, BITSET_IDX_STREAM);
+ return operationSelector.inflate(descriptor, partitionLevelOperationsMask.isSet((int) opId));
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(String.format("Can not generate a random number with the following inputs: " +
+ "pd=%d lts=%d opId=%d partitionLevelOperationsMask=%s",
+ pd, lts, opId, partitionLevelOperationsMask));
+ }
}
public BitSet columnMask(long pd, long lts, long opId, OperationKind opType)
@@ -704,12 +720,12 @@ public interface OpSelectors
public long vd(long pd, long cd, long lts, long opId, int col)
{
- return rng.randomNumber(opId, pd ^ cd ^ lts ^ col);
+ return rng.randomNumber(opId + 1, pd ^ cd ^ lts ^ col);
}
public long modificationId(long pd, long cd, long lts, long vd, int col)
{
- return rng.sequenceNumber(vd, pd ^ cd ^ lts ^ col);
+ return rng.sequenceNumber(vd, pd ^ cd ^ lts ^ col) - 1;
}
}
diff --git a/harry-core/src/harry/model/QuiescentChecker.java b/harry-core/src/harry/model/QuiescentChecker.java
index a630df2..0246e98 100644
--- a/harry-core/src/harry/model/QuiescentChecker.java
+++ b/harry-core/src/harry/model/QuiescentChecker.java
@@ -41,7 +41,7 @@ public class QuiescentChecker implements Model
protected final DataTracker tracker;
protected final SystemUnderTest sut;
protected final Reconciler reconciler;
- protected final SchemaSpec schemaSpec;
+ protected final SchemaSpec schema;
public QuiescentChecker(Run run)
{
@@ -54,7 +54,7 @@ public class QuiescentChecker implements Model
this.sut = run.sut;
this.reconciler = reconciler;
this.tracker = run.tracker;
- this.schemaSpec = run.schemaSpec;
+ this.schema = run.schemaSpec;
}
public void validate(Query query)
@@ -84,8 +84,8 @@ public class QuiescentChecker implements Model
{
ResultSetRow actualRowState = actual.next();
if (actualRowState.cd != partitionState.staticRow().cd)
- throw new ValidationException(partitionState.toString(schemaSpec),
- toString(actualRows, schemaSpec),
+ throw new ValidationException(partitionState.toString(schema),
+ toString(actualRows, schema),
"Found a row while model predicts statics only:" +
"\nExpected: %s" +
"\nActual: %s" +
@@ -96,8 +96,8 @@ public class QuiescentChecker implements Model
for (int i = 0; i < actualRowState.vds.length; i++)
{
if (actualRowState.vds[i] != NIL_DESCR || actualRowState.lts[i] != NO_TIMESTAMP)
- throw new ValidationException(partitionState.toString(schemaSpec),
- toString(actualRows, schemaSpec),
+ throw new ValidationException(partitionState.toString(schema),
+ toString(actualRows, schema),
"Found a row while model predicts statics only:" +
"\nActual: %s" +
"\nQuery: %s" +
@@ -105,7 +105,7 @@ public class QuiescentChecker implements Model
actualRowState, query.toSelectStatement());
}
- assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schemaSpec);
+ assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schema);
}
while (actual.hasNext() && expected.hasNext())
@@ -114,45 +114,45 @@ public class QuiescentChecker implements Model
Reconciler.RowState expectedRowState = expected.next();
// TODO: this is not necessarily true. It can also be that ordering is incorrect.
if (actualRowState.cd != expectedRowState.cd)
- throw new ValidationException(partitionState.toString(schemaSpec),
- toString(actualRows, schemaSpec),
+ throw new ValidationException(partitionState.toString(schema),
+ toString(actualRows, schema),
"Found a row in the model that is not present in the resultset:" +
"\nExpected: %s" +
"\nActual: %s" +
"\nQuery: %s",
- expectedRowState.toString(schemaSpec),
+ expectedRowState.toString(schema),
actualRowState, query.toSelectStatement());
if (!Arrays.equals(actualRowState.vds, expectedRowState.vds))
- throw new ValidationException(partitionState.toString(schemaSpec),
- toString(actualRows, schemaSpec),
+ throw new ValidationException(partitionState.toString(schema),
+ toString(actualRows, schema),
"Returned row state doesn't match the one predicted by the model:" +
"\nExpected: %s (%s)" +
"\nActual: %s (%s)." +
"\nQuery: %s",
- Arrays.toString(expectedRowState.vds), expectedRowState.toString(schemaSpec),
+ Arrays.toString(expectedRowState.vds), expectedRowState.toString(schema),
Arrays.toString(actualRowState.vds), actualRowState,
query.toSelectStatement());
if (!Arrays.equals(actualRowState.lts, expectedRowState.lts))
- throw new ValidationException(partitionState.toString(schemaSpec),
- toString(actualRows, schemaSpec),
+ throw new ValidationException(partitionState.toString(schema),
+ toString(actualRows, schema),
"Timestamps in the row state don't match ones predicted by the model:" +
"\nExpected: %s (%s)" +
"\nActual: %s (%s)." +
"\nQuery: %s",
- Arrays.toString(expectedRowState.lts), expectedRowState.toString(schemaSpec),
+ Arrays.toString(expectedRowState.lts), expectedRowState.toString(schema),
Arrays.toString(actualRowState.lts), actualRowState,
query.toSelectStatement());
if (partitionState.staticRow() != null || actualRowState.sds != null || actualRowState.slts != null)
- assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schemaSpec);
+ assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schema);
}
if (actual.hasNext() || expected.hasNext())
{
- throw new ValidationException(partitionState.toString(schemaSpec),
- toString(actualRows, schemaSpec),
+ throw new ValidationException(partitionState.toString(schema),
+ toString(actualRows, schema),
"Expected results to have the same number of results, but %s result iterator has more results." +
"\nExpected: %s" +
"\nActual: %s" +
diff --git a/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java b/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java
index 186a618..6c6f27f 100644
--- a/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java
+++ b/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java
@@ -48,9 +48,9 @@ import harry.model.OpSelectors;
// TODO: shut down
public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
{
- private static long START_VALUE = 0;
- private static long DEFUNCT = Long.MIN_VALUE;
- private static long REBASE_IN_PROGRESS = Long.MIN_VALUE + 1;
+ public static final long START_VALUE = 0;
+ public static final long DEFUNCT = Long.MIN_VALUE;
+ public static final long REBASE_IN_PROGRESS = Long.MIN_VALUE + 1;
// TODO: there's a theoretical possibility of a bug; when we have several consecutive epochs without
// change in LTS, current implementation will return the latest epoch instead of the earliest one.
@@ -151,11 +151,7 @@ public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
throw new IllegalStateException("No thread should have changed LTS during rebase. " + lts.get());
}
- public long currentLts()
- {
- return lts.get();
- }
-
+ @Override
public long nextLts()
{
long current = lts.get();
@@ -184,7 +180,7 @@ public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
}
}
- public long maxLts()
+ public long peek()
{
while (true)
{
@@ -236,7 +232,7 @@ public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
// TODO: binary search instead
public long rts(final long lts)
{
- assert lts <= maxLts() : String.format("Queried for LTS we haven't yet issued %d. Max is %d.", lts, maxLts());
+ assert lts <= peek() : String.format("Queried for LTS we haven't yet issued %d. Max is %d.", lts, peek());
final int historyIdx = idx - 1;
for (int i = 0; i < historySize - 1 && historyIdx - i >= 0; i++)
diff --git a/harry-core/src/harry/model/clock/OffsetClock.java b/harry-core/src/harry/model/clock/OffsetClock.java
index 6040125..6e8f891 100644
--- a/harry-core/src/harry/model/clock/OffsetClock.java
+++ b/harry-core/src/harry/model/clock/OffsetClock.java
@@ -28,7 +28,7 @@ import harry.model.OpSelectors;
public class OffsetClock implements OpSelectors.MonotonicClock
{
- final AtomicLong lts = new AtomicLong(0);
+ final AtomicLong lts = new AtomicLong(ApproximateMonotonicClock.START_VALUE);
private final long base;
@@ -47,17 +47,12 @@ public class OffsetClock implements OpSelectors.MonotonicClock
return rts - base;
}
- public long currentLts()
- {
- return lts.get();
- }
-
public long nextLts()
{
return lts.getAndIncrement();
}
- public long maxLts()
+ public long peek()
{
return lts.get();
}
diff --git a/harry-core/src/harry/reconciler/Reconciler.java b/harry-core/src/harry/reconciler/Reconciler.java
index 275b7c7..45649be 100644
--- a/harry-core/src/harry/reconciler/Reconciler.java
+++ b/harry-core/src/harry/reconciler/Reconciler.java
@@ -35,12 +35,12 @@ import harry.core.Run;
import harry.ddl.ColumnSpec;
import harry.ddl.SchemaSpec;
import harry.model.OpSelectors;
-import harry.visitors.GeneratingVisitor;
-import harry.visitors.Visitor;
import harry.operations.Query;
import harry.operations.QueryGenerator;
import harry.util.BitSet;
import harry.util.Ranges;
+import harry.visitors.GeneratingVisitor;
+import harry.visitors.LtsVisitor;
import harry.visitors.ReplayingVisitor;
import harry.visitors.VisitExecutor;
@@ -67,7 +67,7 @@ public class Reconciler
private final QueryGenerator rangeSelector;
private final SchemaSpec schema;
- private final Function<VisitExecutor, Visitor> visitorFactory;
+ private final Function<VisitExecutor, LtsVisitor> visitorFactory;
public Reconciler(Run run)
{
@@ -76,13 +76,13 @@ public class Reconciler
}
public Reconciler(Run run,
- Function<VisitExecutor, Visitor> visitorFactory)
+ Function<VisitExecutor, LtsVisitor> ltsVisitorFactory)
{
this.descriptorSelector = run.descriptorSelector;
this.pdSelector = run.pdSelector;
this.schema = run.schemaSpec;
this.rangeSelector = run.rangeSelector;
- this.visitorFactory = visitorFactory;
+ this.visitorFactory = ltsVisitorFactory;
}
private final long debugCd = Long.getLong("harry.reconciler.debug_cd", -1L);
@@ -91,7 +91,7 @@ public class Reconciler
{
PartitionState partitionState = new PartitionState();
- class Processor implements VisitExecutor
+ class Processor extends VisitExecutor
{
// Whether or not a partition deletion was encountered on this LTS.
private boolean hadPartitionDeletion = false;
@@ -100,7 +100,7 @@ public class Reconciler
private final List<ReplayingVisitor.Operation> columnDeletes = new ArrayList<>();
@Override
- public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
+ protected void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
{
if (hadPartitionDeletion)
return;
@@ -151,7 +151,7 @@ public class Reconciler
}
@Override
- public void beforeLts(long lts, long pd)
+ protected void beforeLts(long lts, long pd)
{
rangeDeletes.clear();
writes.clear();
@@ -160,7 +160,7 @@ public class Reconciler
}
@Override
- public void afterLts(long lts, long pd)
+ protected void afterLts(long lts, long pd)
{
if (hadPartitionDeletion)
return;
@@ -248,13 +248,16 @@ public class Reconciler
}
@Override
- public void afterBatch(long lts, long pd, long m) {}
+ protected void afterBatch(long lts, long pd, long m) {}
+
+ @Override
+ protected void beforeBatch(long lts, long pd, long m) {}
@Override
- public void beforeBatch(long lts, long pd, long m) {}
+ public void shutdown() throws InterruptedException {}
}
- Visitor visitor = visitorFactory.apply(new Processor());
+ LtsVisitor visitor = visitorFactory.apply(new Processor());
long currentLts = pdSelector.minLtsFor(pd);
diff --git a/harry-core/src/harry/runner/DataTracker.java b/harry-core/src/harry/runner/DataTracker.java
index 9f518d1..0171e43 100644
--- a/harry-core/src/harry/runner/DataTracker.java
+++ b/harry-core/src/harry/runner/DataTracker.java
@@ -18,30 +18,62 @@
package harry.runner;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.LongConsumer;
+
import harry.core.Configuration;
-public interface DataTracker
+public abstract class DataTracker
{
- void started(long lts);
- void finished(long lts);
+ protected List<LongConsumer> onStarted = new ArrayList<>();
+ protected List<LongConsumer> onFinished = new ArrayList<>();
+
+ public void onLtsStarted(LongConsumer onLts)
+ {
+ this.onStarted.add(onLts);
+ }
- long maxStarted();
- long maxConsecutiveFinished();
+ public void onLtsFinished(LongConsumer onLts)
+ {
+ this.onFinished.add(onLts);
+ }
- public Configuration.DataTrackerConfiguration toConfig();
+ public void started(long lts)
+ {
+ startedInternal(lts);
+ for (LongConsumer consumer : onStarted)
+ consumer.accept(lts);
+ }
- interface DataTrackerFactory {
+ public void finished(long lts)
+ {
+ finishedInternal(lts);
+ for (LongConsumer consumer : onFinished)
+ consumer.accept(lts);
+ }
+
+ abstract void startedInternal(long lts);
+ abstract void finishedInternal(long lts);
+
+ public abstract long maxStarted();
+ public abstract long maxConsecutiveFinished();
+
+ public abstract Configuration.DataTrackerConfiguration toConfig();
+
+ public static interface DataTrackerFactory
+ {
DataTracker make();
}
- public static DataTracker NO_OP = new NoOpDataTracker();
+ public static final DataTracker NO_OP = new NoOpDataTracker();
- class NoOpDataTracker implements DataTracker
+ public static class NoOpDataTracker extends DataTracker
{
private NoOpDataTracker() {}
- public void started(long lts) {}
- public void finished(long lts) {}
+ protected void startedInternal(long lts) {}
+ protected void finishedInternal(long lts) {}
public long maxStarted() { return 0; }
public long maxConsecutiveFinished() { return 0; }
diff --git a/harry-core/src/harry/runner/DefaultDataTracker.java b/harry-core/src/harry/runner/DefaultDataTracker.java
index d5cdb4b..85e2637 100644
--- a/harry-core/src/harry/runner/DefaultDataTracker.java
+++ b/harry-core/src/harry/runner/DefaultDataTracker.java
@@ -29,14 +29,14 @@ import org.slf4j.LoggerFactory;
import harry.core.Configuration;
import harry.core.VisibleForTesting;
-public class DefaultDataTracker implements DataTracker
+public class DefaultDataTracker extends DataTracker
{
private static final Logger logger = LoggerFactory.getLogger(DefaultDataTracker.class);
private final AtomicLong maxSeenLts;
// TODO: This is a trivial implementation that can be significantly improved upon
// for example, we could use a bitmap that records `1`s for all lts that are after
- // the consective, and "collapse" the bitmap state into the long as soon as we see
+ // the consecutive, and "collapse" the bitmap state into the long as soon as we see
// consecutive `1` on the left side.
private final AtomicLong maxCompleteLts;
private final PriorityBlockingQueue<Long> reorderBuffer;
@@ -49,13 +49,12 @@ public class DefaultDataTracker implements DataTracker
}
// TODO: there's also some room for improvement in terms of concurrency
- // TODO: remove pd?
- public void started(long lts)
+ protected void startedInternal(long lts)
{
recordEvent(lts, false);
}
- public void finished(long lts)
+ protected void finishedInternal(long lts)
{
recordEvent(lts, true);
}
@@ -63,10 +62,7 @@ public class DefaultDataTracker implements DataTracker
private void recordEvent(long lts, boolean finished)
{
// all seen LTS are allowed to be "in-flight"
- maxSeenLts.getAndUpdate((old) -> {
- assert finished || lts > old : String.format("Attempting to reuse lts: %d. Max seen: %d", lts, old);
- return Math.max(lts, old);
- });
+ maxSeenLts.getAndUpdate((old) -> Math.max(lts, old));
if (!finished)
return;
@@ -116,6 +112,9 @@ public class DefaultDataTracker implements DataTracker
public long maxConsecutiveFinished()
{
+ if (!reorderBuffer.isEmpty())
+ return drainReorderQueue();
+
return maxCompleteLts.get();
}
diff --git a/harry-core/src/harry/runner/HarryRunner.java b/harry-core/src/harry/runner/HarryRunner.java
index ba911c4..30f7f86 100644
--- a/harry-core/src/harry/runner/HarryRunner.java
+++ b/harry-core/src/harry/runner/HarryRunner.java
@@ -35,9 +35,9 @@ public abstract class HarryRunner
{
public static final Logger logger = LoggerFactory.getLogger(HarryRunner.class);
- protected CompletableFuture progress;
+ protected CompletableFuture<?> progress;
protected ScheduledThreadPoolExecutor executor;
- public abstract void beforeRun(Runner runner);
+ public abstract void beforeRun(Runner.TimedRunner runner);
public void afterRun(Runner runner, Object result)
{
executor.shutdown();
@@ -64,7 +64,9 @@ public abstract class HarryRunner
Run run = runner.getRun();
progress = runner.initAndStartAll();
- beforeRun(runner);
+
+ assert runner instanceof Runner.TimedRunner : "Please use a timed runner at the top level.";
+ beforeRun((Runner.TimedRunner) runner);
Object result = null;
@@ -76,13 +78,12 @@ public abstract class HarryRunner
return a;
}).get();
if (result instanceof Throwable)
- logger.error("Execution failed", result);
+ logger.error("Execution failed!", (Throwable) result);
}
catch (Throwable e)
{
- logger.error("Failed due to exception: " + e.getMessage(),
- e);
+ logger.error("Failed due to exception: " + e.getMessage(), e);
result = e;
}
finally
diff --git a/harry-core/src/harry/runner/Runner.java b/harry-core/src/harry/runner/Runner.java
index 3a84d3e..4f99eab 100644
--- a/harry-core/src/harry/runner/Runner.java
+++ b/harry-core/src/harry/runner/Runner.java
@@ -24,40 +24,44 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import harry.core.Configuration;
import harry.core.Run;
-import harry.model.OpSelectors;
import harry.visitors.Visitor;
-
public abstract class Runner
{
private static final Logger logger = LoggerFactory.getLogger(Runner.class);
protected final Run run;
protected final Configuration config;
+ protected final ScheduledExecutorService executor;
// If there's an error, there's a good chance we're going to hit it more than once
- // since we have multiple concurrent checkers running
+ // since we have multiple concurrent checkers running
protected final CopyOnWriteArrayList<Throwable> errors;
- public Runner(Run run, Configuration config)
+ public Runner(Run run, Configuration config, int concurrency)
{
this.run = run;
this.config = config;
this.errors = new CopyOnWriteArrayList<>();
+ this.executor = Executors.newScheduledThreadPool(concurrency);
}
public Run getRun()
@@ -69,12 +73,11 @@ public abstract class Runner
{
if (config.create_schema)
{
- // TODO: make RF configurable or make keyspace DDL configurable
- run.sut.schemaChange("CREATE KEYSPACE " + run.schemaSpec.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+ if (config.keyspace_ddl == null)
+ run.sut.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + run.schemaSpec.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+ else
+ run.sut.schemaChange(config.keyspace_ddl);
- run.sut.schemaChange(String.format("DROP TABLE IF EXISTS %s.%s;",
- run.schemaSpec.keyspace,
- run.schemaSpec.table));
String schema = run.schemaSpec.compile().cql();
logger.info("Creating table: " + schema);
run.sut.schemaChange(schema);
@@ -115,13 +118,36 @@ public abstract class Runner
dumpStateToFile(run, config, errors);
}
- public abstract CompletableFuture<?> initAndStartAll() throws InterruptedException;
+ public CompletableFuture<?> initAndStartAll()
+ {
+ init();
+ return start();
+ }
+ public abstract String type();
+
public abstract void shutdown() throws InterruptedException;
+
+ protected CompletableFuture<?> start()
+ {
+ return start(true, () -> false);
+ }
+
+ protected abstract void shutDownVisitors();
+
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ protected void shutDownExecutors() throws InterruptedException
+ {
+ executor.shutdownNow();
+ executor.awaitTermination(1, TimeUnit.MINUTES);
+ }
+
+ protected abstract CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit);
protected Runnable reportThrowable(Runnable runnable, CompletableFuture<?> future)
{
- return () -> {
+ return () ->
+ {
try
{
if (!future.isDone())
@@ -130,193 +156,313 @@ public abstract class Runner
catch (Throwable t)
{
errors.add(t);
+
if (!future.isDone())
future.completeExceptionally(t);
}
};
}
- public static class SequentialRunner extends Runner
+ public interface RunnerFactory
+ {
+ Runner make(Run run, Configuration config);
+ }
+
+ public abstract static class TimedRunner extends Runner
+ {
+ public final long runtime;
+ public final TimeUnit runtimeUnit;
+
+ protected final ScheduledExecutorService shutdownExecutor;
+
+ public TimedRunner(Run run, Configuration config, int concurrency, long runtime, TimeUnit runtimeUnit)
+ {
+ super(run, config, concurrency);
+
+ this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor();
+ this.runtime = runtime;
+ this.runtimeUnit = runtimeUnit;
+ }
+
+ protected ScheduledFuture<?> scheduleTermination(AtomicBoolean terminated)
+ {
+ return shutdownExecutor.schedule(() ->
+ {
+ logger.info("Runner has reached configured runtime. Stopping...");
+ // TODO: wait for the last full validation?
+ terminated.set(true);
+ },
+ runtime,
+ runtimeUnit);
+ }
+
+ @Override
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ protected void shutDownExecutors() throws InterruptedException
+ {
+ shutdownExecutor.shutdownNow();
+ shutdownExecutor.awaitTermination(1, TimeUnit.MINUTES);
+ executor.shutdownNow();
+ executor.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ }
+
+ public static class SingleVisitRunner extends Runner
{
- private final ScheduledExecutorService executor;
- private final ScheduledExecutorService shutdownExceutor;
private final List<Visitor> visitors;
- private final Configuration config;
- public SequentialRunner(Run run,
+ public SingleVisitRunner(Run run,
Configuration config,
List<? extends Visitor.VisitorFactory> visitorFactories)
{
- super(run, config);
-
- this.executor = Executors.newSingleThreadScheduledExecutor();
- this.shutdownExceutor = Executors.newSingleThreadScheduledExecutor();
- this.config = config;
- this.visitors = new ArrayList<>();
- for (Visitor.VisitorFactory factory : visitorFactories)
- visitors.add(factory.make(run));
+ super(run, config, 1);
+ this.visitors = visitorFactories.stream().map(factory -> factory.make(run)).collect(Collectors.toList());
}
- public CompletableFuture<?> initAndStartAll()
+ @Override
+ public String type()
+ {
+ return "single";
+ }
+
+ @Override
+ protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
{
- init();
CompletableFuture<?> future = new CompletableFuture<>();
- future.whenComplete((a, b) -> maybeReportErrors());
-
- AtomicBoolean completed = new AtomicBoolean(false);
- shutdownExceutor.schedule(() -> {
- logger.info("Completed");
- // TODO: wait for the last full validation?
- completed.set(true);
- }, config.run_time, config.run_time_unit);
-
- executor.submit(reportThrowable(() -> {
- try
- {
- SequentialRunner.run(visitors, run.clock, future,
- () -> Thread.currentThread().isInterrupted() || future.isDone() || completed.get());
- }
- catch (Throwable t)
- {
- future.completeExceptionally(t);
- }
- },
- future));
+ if (reportErrors)
+ future.whenComplete((a, b) -> maybeReportErrors());
+
+ executor.submit(reportThrowable(() -> run(visitors, future, parentExit), future));
return future;
}
- static void run(List<Visitor> visitors,
- OpSelectors.MonotonicClock clock,
- CompletableFuture<?> future,
- BooleanSupplier exitCondition)
+ private void run(List<Visitor> visitors, CompletableFuture<?> future, BooleanSupplier parentExit)
{
- while (!exitCondition.getAsBoolean())
+ for (Visitor value: visitors)
{
- long lts = clock.nextLts();
+ if (parentExit.getAsBoolean())
+ break;
+
+ value.visit();
+ }
+
+ future.complete(null);
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException
+ {
+ logger.info("Shutting down...");
+ shutDownVisitors();
- if (lts > 0 && lts % 10_000 == 0)
- logger.info("Visited {} logical timestamps", lts);
+ // we need to wait for all threads that use schema to stop before we can tear down and drop the table
+ shutDownExecutors();
- for (int i = 0; i < visitors.size() && !exitCondition.getAsBoolean(); i++)
+ teardown();
+ }
+
+ @Override
+ protected void shutDownVisitors()
+ {
+ shutDownVisitors(visitors);
+ }
+ }
+
+ public static class SequentialRunner extends TimedRunner
+ {
+ protected final List<Visitor> visitors;
+
+ public SequentialRunner(Run run,
+ Configuration config,
+ List<? extends Visitor.VisitorFactory> visitorFactories,
+ long runtime, TimeUnit runtimeUnit)
+ {
+ super(run, config, 1, runtime, runtimeUnit);
+
+ this.visitors = visitorFactories.stream().map(factory -> factory.make(run)).collect(Collectors.toList());
+ }
+
+ @Override
+ public String type()
+ {
+ return "sequential";
+ }
+
+ @Override
+ protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
+ {
+ CompletableFuture<?> future = new CompletableFuture<>();
+
+ if (reportErrors)
+ future.whenComplete((a, b) -> maybeReportErrors());
+
+ AtomicBoolean terminated = new AtomicBoolean(false);
+ scheduleTermination(terminated);
+ BooleanSupplier exit = () -> Thread.currentThread().isInterrupted() || future.isDone()
+ || terminated.get() || parentExit.getAsBoolean();
+
+ executor.submit(reportThrowable(() -> run(visitors, future, exit), future));
+ return future;
+ }
+
+ protected void run(List<Visitor> visitors,
+ CompletableFuture<?> future,
+ BooleanSupplier exit)
+ {
+ while (!exit.getAsBoolean())
+ {
+ for (Visitor visitor: visitors)
{
- try
- {
- Visitor visitor = visitors.get(i);
- visitor.visit(lts);
- }
- catch (Throwable t)
- {
- future.completeExceptionally(t);
- throw t;
- }
+ if (exit.getAsBoolean())
+ break;
+ visitor.visit();
}
}
+
future.complete(null);
}
+ @Override
public void shutdown() throws InterruptedException
{
logger.info("Shutting down...");
- shutdownExceutor.shutdownNow();
- shutdownExceutor.awaitTermination(1, TimeUnit.MINUTES);
+ shutDownVisitors();
- executor.shutdownNow();
- executor.awaitTermination(1, TimeUnit.MINUTES);
// we need to wait for all threads that use schema to stop before we can tear down and drop the table
+ shutDownExecutors();
+
teardown();
}
- }
- public static interface RunnerFactory
- {
- public Runner make(Run run, Configuration config);
+ @Override
+ protected void shutDownVisitors()
+ {
+ shutDownVisitors(visitors);
+ }
}
// TODO: this requires some significant improvement
- public static class ConcurrentRunner extends Runner
+ public static class ConcurrentRunner extends TimedRunner
{
- private final ScheduledExecutorService executor;
- private final ScheduledExecutorService shutdownExecutor;
- private final List<? extends Visitor.VisitorFactory> visitorFactories;
- private final List<Visitor> allVisitors;
-
+ private final List<List<Visitor>> perThreadVisitors;
private final int concurrency;
- private final long runTime;
- private final TimeUnit runTimeUnit;
public ConcurrentRunner(Run run,
Configuration config,
int concurrency,
- List<? extends Visitor.VisitorFactory> visitorFactories)
+ List<? extends Visitor.VisitorFactory> visitorFactories,
+ long runtime, TimeUnit runtimeUnit)
{
- super(run, config);
- this.concurrency = concurrency;
- this.runTime = config.run_time;
- this.runTimeUnit = config.run_time_unit;
- // TODO: configure concurrency
- this.executor = Executors.newScheduledThreadPool(concurrency);
- this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor();
- this.visitorFactories = visitorFactories;
- this.allVisitors = new CopyOnWriteArrayList<>();
- }
+ super(run, config, concurrency, runtime, runtimeUnit);
- public CompletableFuture<?> initAndStartAll()
- {
- init();
- CompletableFuture<?> future = new CompletableFuture<>();
- future.whenComplete((a, b) -> maybeReportErrors());
-
- shutdownExecutor.schedule(() -> {
- logger.info("Completed");
- // TODO: wait for the last full validation?
- future.complete(null);
- }, runTime, runTimeUnit);
+ this.concurrency = concurrency;
+ this.perThreadVisitors = new ArrayList<>(concurrency);
- BooleanSupplier exitCondition = () -> Thread.currentThread().isInterrupted() || future.isDone();
for (int i = 0; i < concurrency; i++)
{
List<Visitor> visitors = new ArrayList<>();
- executor.submit(reportThrowable(() -> {
- for (Visitor.VisitorFactory factory : visitorFactories)
- visitors.add(factory.make(run));
- allVisitors.addAll(visitors);
- run(visitors, run.clock, exitCondition);
- },
- future));
+ for (Visitor.VisitorFactory factory : visitorFactories)
+ visitors.add(factory.make(run));
+
+ perThreadVisitors.add(visitors);
+ }
+ }
+
+ @Override
+ public String type()
+ {
+ return "concurrent";
+ }
+ @Override
+ protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
+ {
+ CompletableFuture<?> future = new CompletableFuture<>();
+
+ if (reportErrors)
+ future.whenComplete((a, b) -> maybeReportErrors());
+
+ AtomicBoolean terminated = new AtomicBoolean(false);
+ scheduleTermination(terminated);
+ BooleanSupplier exit = () -> Thread.currentThread().isInterrupted() || future.isDone()
+ || terminated.get() || parentExit.getAsBoolean();
+
+ AtomicInteger liveCount = new AtomicInteger(0);
+
+ for (int i = 0; i < concurrency; i++)
+ {
+ List<Visitor> visitors = perThreadVisitors.get(i);
+ executor.submit(reportThrowable(() -> run(visitors, future, exit, liveCount), future));
}
return future;
}
- void run(List<Visitor> visitors,
- OpSelectors.MonotonicClock clock,
- BooleanSupplier exitCondition)
+ private void run(List<Visitor> visitors,
+ CompletableFuture<?> future,
+ BooleanSupplier exit,
+ AtomicInteger liveCount)
{
- while (!exitCondition.getAsBoolean())
+ liveCount.incrementAndGet();
+
+ while (!exit.getAsBoolean())
{
- long lts = clock.nextLts();
for (Visitor visitor : visitors)
- visitor.visit(lts);
+ {
+ if (exit.getAsBoolean())
+ break;
+
+ visitor.visit();
+ }
}
+
+ // If we're the last worker still running, complete the future....
+ if (liveCount.decrementAndGet() == 0)
+ future.complete(null);
}
+ @Override
public void shutdown() throws InterruptedException
{
logger.info("Shutting down...");
- for (Visitor visitor : allVisitors)
- visitor.shutdown();
-
- shutdownExecutor.shutdownNow();
- shutdownExecutor.awaitTermination(1, TimeUnit.MINUTES);
+ shutDownVisitors();
- executor.shutdownNow();
- executor.awaitTermination(1, TimeUnit.MINUTES);
// we need to wait for all threads that use schema to stop before we can tear down and drop the table
+ shutDownExecutors();
+
teardown();
}
+
+ @Override
+ protected void shutDownVisitors()
+ {
+ shutDownVisitors(perThreadVisitors.stream().flatMap(Collection::stream).collect(Collectors.toList()));
+ }
+ }
+
+ protected static void shutDownVisitors(List<Visitor> visitors)
+ {
+ Throwable error = null;
+
+ for (Visitor visitor : visitors)
+ {
+ try
+ {
+ visitor.shutdown();
+ }
+ catch (InterruptedException e)
+ {
+ if (error != null)
+ error.addSuppressed(e);
+ else
+ error = e;
+ }
+ }
+
+ if (error != null)
+ logger.warn("Failed to shut down all visitors!", error);
}
private static void dumpExceptionToFile(BufferedWriter bw, Throwable throwable) throws IOException
@@ -361,7 +507,7 @@ public abstract class Runner
File file = new File("run.yaml");
Configuration.ConfigurationBuilder builder = config.unbuild();
- // overrride stateful components
+ // override stateful components
builder.setClock(run.clock.toConfig());
builder.setDataTracker(run.tracker.toConfig());
@@ -373,12 +519,14 @@ public abstract class Runner
}
catch (Throwable e)
{
- logger.error("Caught an error while trying to dump to file",
- e);
+ logger.error("Caught an error while trying to dump to file", e);
try
{
File f = new File("tmp.dump");
- f.createNewFile();
+
+ if (!f.createNewFile())
+ logger.info("File {} already exists. Appending...", f);
+
BufferedWriter tmp = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
dumpExceptionToFile(tmp, e);
}
diff --git a/harry-core/src/harry/runner/StagedRunner.java b/harry-core/src/harry/runner/StagedRunner.java
new file mode 100644
index 0000000..88ebcdb
--- /dev/null
+++ b/harry-core/src/harry/runner/StagedRunner.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.Configuration;
+import harry.core.Run;
+
+public class StagedRunner extends Runner.TimedRunner
+{
+ public static final String TYPE = "staged";
+
+ public static void register()
+ {
+ Configuration.registerSubtypes(StagedRunner.StagedRunnerConfig.class);
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(StagedRunner.class);
+
+ private final List<Configuration.RunnerConfiguration> runnerFactories;
+ private final List<Runner> stages;
+
+ public StagedRunner(Run run,
+ Configuration config,
+ List<Configuration.RunnerConfiguration> runnerFactories,
+ long runtime, TimeUnit runtimeUnit)
+ {
+ super(run, config, 1, runtime, runtimeUnit);
+ this.runnerFactories = runnerFactories;
+ this.stages = new CopyOnWriteArrayList<>();
+ }
+
+ @Override
+ public String type() {
+ return TYPE;
+ }
+
+ @Override
+ protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
+ {
+ CompletableFuture<?> future = new CompletableFuture<>();
+
+ if (reportErrors)
+ {
+ future.whenComplete((a, b) ->
+ {
+ collectErrors();
+ maybeReportErrors();
+ });
+ }
+
+ AtomicBoolean terminated = new AtomicBoolean(false);
+ scheduleTermination(terminated);
+
+ BooleanSupplier exit = () -> Thread.currentThread().isInterrupted() || future.isDone()
+ || terminated.get() || parentExit.getAsBoolean();
+
+ executor.submit(() ->
+ {
+ for (Configuration.RunnerConfiguration runnerConfig : runnerFactories)
+ {
+ try
+ {
+ Runner runner = runnerConfig.make(run, config);
+ if (runner instanceof StagedRunner)
+ throw new IllegalArgumentException("StagedRunner can not be nested inside of a StagedRunner");
+ stages.add(runner);
+ }
+ catch (Throwable t)
+ {
+ future.completeExceptionally(t);
+ }
+ }
+
+ while (!exit.getAsBoolean())
+ {
+ logger.info("Starting next staged run...");
+ int stage = 1;
+
+ for (Runner runner : stages)
+ {
+ if (exit.getAsBoolean())
+ break;
+
+ try
+ {
+ logger.info("Starting stage {}: {}...", stage++, runner.type());
+ runner.start(false, exit).get();
+
+ if (!terminated.get())
+ logger.info("...stage complete.");
+
+ // Wait for any previous runners to settle down...
+ while (run.tracker.maxConsecutiveFinished() != run.tracker.maxStarted())
+ {
+ TimeUnit.SECONDS.sleep(1);
+ logger.warn("Waiting for any previous runners to settle down: {}",
+ run.tracker.toString());
+ }
+
+ }
+ catch (Throwable t)
+ {
+ future.completeExceptionally(t);
+ break;
+ }
+ }
+
+ if (!terminated.get())
+ logger.info("All stages complete!");
+ }
+
+ future.complete(null);
+ });
+
+ return future;
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException
+ {
+ logger.info("Shutting down...");
+
+ for (Runner runner : stages)
+ {
+ runner.shutDownVisitors();
+ runner.shutDownExecutors();
+ }
+
+ shutDownExecutors();
+ teardown();
+ }
+
+ @Override
+ protected void shutDownVisitors()
+ {
+ // Visitors are shut down in the sub-runners.
+ }
+
+ private void collectErrors()
+ {
+ for (Runner runner : stages)
+ {
+ errors.addAll(runner.errors);
+ }
+ }
+
+ @JsonTypeName(TYPE)
+ public static class StagedRunnerConfig implements Configuration.RunnerConfiguration
+ {
+ @JsonProperty(value = "stages")
+ public final List<Configuration.RunnerConfiguration> runnerFactories;
+
+ public final long run_time;
+ public final TimeUnit run_time_unit;
+
+ @JsonCreator
+ public StagedRunnerConfig(@JsonProperty(value = "stages") List<Configuration.RunnerConfiguration> stages,
+ @JsonProperty(value = "run_time", defaultValue = "2") long runtime,
+ @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
+ {
+ this.runnerFactories = stages;
+ this.run_time = runtime;
+ this.run_time_unit = runtimeUnit;
+ }
+
+ @Override
+ public Runner make(Run run, Configuration config)
+ {
+ return new StagedRunner(run, config, runnerFactories, run_time, run_time_unit);
+ }
+ }
+}
diff --git a/harry-core/src/harry/runner/UpToLtsRunner.java b/harry-core/src/harry/runner/UpToLtsRunner.java
new file mode 100644
index 0000000..f89ecbb
--- /dev/null
+++ b/harry-core/src/harry/runner/UpToLtsRunner.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.visitors.Visitor;
+
+public class UpToLtsRunner extends Runner.SequentialRunner
+{
+ public static final String TYPE = "up_to_lts";
+
+ public static void register()
+ {
+ Configuration.registerSubtypes(UpToLtsRunnerConfig.class);
+ }
+
+ private final long maxLts;
+
+ public UpToLtsRunner(Run run,
+ Configuration config,
+ List<? extends Visitor.VisitorFactory> visitorFactories,
+ long maxLts,
+ long runtime, TimeUnit runtimeUnit)
+ {
+ super(run, config, visitorFactories, runtime, runtimeUnit);
+ this.maxLts = maxLts;
+ }
+
+ @Override
+ public String type()
+ {
+ return TYPE;
+ }
+
+ @Override
+ protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
+ {
+ CompletableFuture<?> future = new CompletableFuture<>();
+
+ if (reportErrors)
+ future.whenComplete((a, b) -> maybeReportErrors());
+
+ AtomicBoolean terminated = new AtomicBoolean(false);
+ scheduleTermination(terminated);
+ BooleanSupplier exit = () -> run.clock.peek() >= maxLts
+ || Thread.currentThread().isInterrupted() || future.isDone()
+ || terminated.get() || parentExit.getAsBoolean();
+
+ executor.submit(reportThrowable(() -> run(visitors, future, exit), future));
+ return future;
+ }
+
+ @JsonTypeName(TYPE)
+ public static class UpToLtsRunnerConfig implements Configuration.RunnerConfiguration
+ {
+ public final List<Configuration.VisitorConfiguration> visitor_factories;
+ public final long max_lts;
+ public final long run_time;
+ public final TimeUnit run_time_unit;
+
+ @JsonCreator
+ public UpToLtsRunnerConfig(@JsonProperty(value = "visitors") List<Configuration.VisitorConfiguration> visitors,
+ @JsonProperty(value = "max_lts") long maxLts,
+ @JsonProperty(value = "run_time", defaultValue = "2") long runtime,
+ @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
+ {
+ this.visitor_factories = visitors;
+ this.max_lts = maxLts;
+ this.run_time = runtime;
+ this.run_time_unit = runtimeUnit;
+ }
+
+ @Override
+ public Runner make(Run run, Configuration config)
+ {
+ return new UpToLtsRunner(run, config, visitor_factories, max_lts, run_time, run_time_unit);
+ }
+ }
+}
diff --git a/harry-core/src/harry/util/ByteUtils.java b/harry-core/src/harry/util/ByteUtils.java
new file mode 100644
index 0000000..4adf536
--- /dev/null
+++ b/harry-core/src/harry/util/ByteUtils.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.util;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+public class ByteUtils
+{
+ public static ByteBuffer bytes(String s)
+ {
+ return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
+ }
+
+ public static ByteBuffer bytes(String s, Charset charset)
+ {
+ return ByteBuffer.wrap(s.getBytes(charset));
+ }
+
+ public static ByteBuffer bytes(byte b)
+ {
+ return ByteBuffer.allocate(1).put(0, b);
+ }
+
+ public static ByteBuffer bytes(short s)
+ {
+ return ByteBuffer.allocate(2).putShort(0, s);
+ }
+
+ public static ByteBuffer bytes(int i)
+ {
+ return ByteBuffer.allocate(4).putInt(0, i);
+ }
+
+ public static ByteBuffer bytes(long n)
+ {
+ return ByteBuffer.allocate(8).putLong(0, n);
+ }
+
+ public static ByteBuffer bytes(float f)
+ {
+ return ByteBuffer.allocate(4).putFloat(0, f);
+ }
+
+ public static ByteBuffer bytes(double d)
+ {
+ return ByteBuffer.allocate(8).putDouble(0, d);
+ }
+
+ public static ByteBuffer bytes(InetAddress address)
+ {
+ return ByteBuffer.wrap(address.getAddress());
+ }
+
+ public static ByteBuffer bytes(UUID uuid)
+ {
+ return ByteBuffer.wrap(decompose(uuid));
+ }
+
+ public static byte[] decompose(UUID uuid)
+ {
+ long most = uuid.getMostSignificantBits();
+ long least = uuid.getLeastSignificantBits();
+ byte[] b = new byte[16];
+ for (int i = 0; i < 8; i++)
+ {
+ b[i] = (byte)(most >>> ((7-i) * 8));
+ b[8+i] = (byte)(least >>> ((7-i) * 8));
+ }
+ return b;
+ }
+
+
+ public static ByteBuffer objectToBytes(Object obj)
+ {
+ if (obj instanceof Integer)
+ return bytes((int) obj);
+ else if (obj instanceof Byte)
+ return bytes((byte) obj);
+ else if (obj instanceof Boolean)
+ return bytes( (byte) (((boolean) obj) ? 1 : 0));
+ else if (obj instanceof Short)
+ return bytes((short) obj);
+ else if (obj instanceof Long)
+ return bytes((long) obj);
+ else if (obj instanceof Float)
+ return bytes((float) obj);
+ else if (obj instanceof Double)
+ return bytes((double) obj);
+ else if (obj instanceof UUID)
+ return bytes((UUID) obj);
+ else if (obj instanceof InetAddress)
+ return bytes((InetAddress) obj);
+ else if (obj instanceof String)
+ return bytes((String) obj);
+ else if (obj instanceof List)
+ {
+ throw new UnsupportedOperationException("Please use ByteUtils from integration package");
+ }
+ else if (obj instanceof Set)
+ {
+ throw new UnsupportedOperationException("Please use ByteUtils from integration package");
+ }
+ else if (obj instanceof ByteBuffer)
+ return (ByteBuffer) obj;
+ else
+ throw new IllegalArgumentException(String.format("Cannot convert value %s of type %s",
+ obj,
+ obj.getClass()));
+ }
+
+ public static ByteBuffer pack(Collection<ByteBuffer> buffers, int elements)
+ {
+ int size = 0;
+ for (ByteBuffer bb : buffers)
+ size += sizeOfValue(bb);
+
+ ByteBuffer result = ByteBuffer.allocate(sizeOfCollectionSize(elements) + size);
+ writeCollectionSize(result, elements);
+ for (ByteBuffer bb : buffers)
+ writeValue(result, bb);
+ result.flip();
+ return result;
+ }
+
+ public static int sizeOfValue(ByteBuffer value)
+ {
+ return value == null ? 4 : 4 + value.remaining();
+ }
+
+ protected static void writeCollectionSize(ByteBuffer output, int elements)
+ {
+ output.putInt(elements);
+ }
+ public static void writeValue(ByteBuffer output, ByteBuffer value)
+ {
+ if (value == null)
+ {
+ output.putInt(-1);
+ return;
+ }
+
+ output.putInt(value.remaining());
+ output.put(value.duplicate());
+ }
+
+ protected static int sizeOfCollectionSize(int elements)
+ {
+ return 4;
+ }
+
+ public static ByteBuffer compose(ByteBuffer... buffers) {
+ if (buffers.length == 1) return buffers[0];
+
+ int totalLength = 0;
+ for (ByteBuffer bb : buffers) totalLength += 2 + bb.remaining() + 1;
+
+ ByteBuffer out = ByteBuffer.allocate(totalLength);
+ for (ByteBuffer buffer : buffers) {
+ ByteBuffer bb = buffer.duplicate();
+ putShortLength(out, bb.remaining());
+ out.put(bb);
+ out.put((byte) 0);
+ }
+ out.flip();
+ return out;
+ }
+
+ public static void putShortLength(ByteBuffer bb, int length) {
+ bb.put((byte) ((length >> 8) & 0xFF));
+ bb.put((byte) (length & 0xFF));
+ }
+
+}
diff --git a/harry-core/src/harry/visitors/AllPartitionsValidator.java b/harry-core/src/harry/visitors/AllPartitionsValidator.java
index 1b67a8e..7bd5b50 100644
--- a/harry-core/src/harry/visitors/AllPartitionsValidator.java
+++ b/harry-core/src/harry/visitors/AllPartitionsValidator.java
@@ -50,6 +50,10 @@ public class AllPartitionsValidator implements Visitor
protected final MetricReporter metricReporter;
protected final ExecutorService executor;
protected final SystemUnderTest sut;
+
+ protected final AtomicBoolean condition;
+ protected final AtomicLong maxPos;
+
protected final int concurrency;
protected final int triggerAfter;
@@ -67,20 +71,27 @@ public class AllPartitionsValidator implements Visitor
this.pdSelector = run.pdSelector;
this.concurrency = concurrency;
this.executor = Executors.newFixedThreadPool(concurrency);
+ this.condition = new AtomicBoolean();
+ this.maxPos = new AtomicLong(-1);
+ run.tracker.onLtsStarted((lts) -> {
+ maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
+ if (triggerAfter == 0 || (triggerAfter > 0 && lts % triggerAfter == 0))
+ condition.set(true);
+ });
}
protected CompletableFuture<Void> validateAllPartitions(ExecutorService executor, int parallelism)
{
final long maxPos = this.maxPos.get();
- AtomicLong counter = new AtomicLong();
- CompletableFuture[] futures = new CompletableFuture[parallelism];
+ AtomicLong cnt = new AtomicLong();
+ CompletableFuture<?>[] futures = new CompletableFuture[parallelism];
AtomicBoolean isDone = new AtomicBoolean(false);
for (int i = 0; i < parallelism; i++)
{
futures[i] = CompletableFuture.supplyAsync(() -> {
long pos;
- while ((pos = counter.getAndIncrement()) < maxPos && !executor.isShutdown() && !Thread.interrupted() && !isDone.get())
+ while ((pos = cnt.getAndIncrement()) < maxPos && !executor.isShutdown() && !Thread.interrupted() && !isDone.get())
{
if (pos > 0 && pos % 100 == 0)
logger.info(String.format("Validated %d out of %d partitions", pos, maxPos));
@@ -109,27 +120,30 @@ public class AllPartitionsValidator implements Visitor
return CompletableFuture.allOf(futures);
}
- private final AtomicLong maxPos = new AtomicLong(-1);
-
- public void visit(long lts)
+ public void visit()
{
- maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
-
- if (triggerAfter > 0 && lts % triggerAfter == 0)
+ // TODO: this is ok for now, but if/when we bring exhaustive checker back, we need to change this:
+ // we'll probably want a low-concurrency validator somewhere in background all the time.
+ if (condition.compareAndSet(true, false))
{
- logger.info("Starting validations of all {} partitions", maxPos.get());
+ long lts = clock.peek();
+ logger.info("Starting validation of all partitions as of lts {}...", lts);
+
try
{
validateAllPartitions(executor, concurrency).get();
}
catch (Throwable e)
{
- throw new RuntimeException(e);
+ // TODO: Make a utility out of this.
+ throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
}
- logger.info("Finished validations of all partitions");
+ logger.info("...finished validating all partitions as of lts {}.", lts);
}
}
+ @SuppressWarnings("ResultOfMethodCallIgnored")
+ @Override
public void shutdown() throws InterruptedException
{
executor.shutdown();
diff --git a/harry-core/src/harry/visitors/CorruptingVisitor.java b/harry-core/src/harry/visitors/CorruptingVisitor.java
index 26febeb..4677ea2 100644
--- a/harry-core/src/harry/visitors/CorruptingVisitor.java
+++ b/harry-core/src/harry/visitors/CorruptingVisitor.java
@@ -64,8 +64,9 @@ public class CorruptingVisitor implements Visitor
private final AtomicLong maxPos = new AtomicLong(-1);
- public void visit(long lts)
+ public void visit()
{
+ long lts = run.clock.peek();
maxPos.updateAndGet(current -> Math.max(run.pdSelector.positionFor(lts), current));
if (lts == 0 || lts % triggerAfter != 0)
@@ -87,8 +88,4 @@ public class CorruptingVisitor implements Visitor
logger.error("Caught an exception while trying to corrupt a partition.", t);
}
}
-
- public void shutdown() throws InterruptedException
- {
- }
}
diff --git a/harry-core/src/harry/visitors/DelegatingVisitor.java b/harry-core/src/harry/visitors/DelegatingVisitor.java
deleted file mode 100644
index f4994b1..0000000
--- a/harry-core/src/harry/visitors/DelegatingVisitor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package harry.visitors;
-
-import harry.model.OpSelectors;
-
-public abstract class DelegatingVisitor implements Visitor, VisitExecutor
-{
- protected VisitExecutor delegate;
-
- public DelegatingVisitor(VisitExecutor delegate)
- {
- this.delegate = delegate;
- }
-
- public void beforeLts(long lts, long pd)
- {
- delegate.beforeLts(lts, pd);
- }
-
- public void afterLts(long lts, long pd)
- {
- delegate.afterLts(lts, pd);
- }
-
- public void beforeBatch(long lts, long pd, long m)
- {
- delegate.beforeBatch(lts, pd, m);
- }
-
- public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
- {
- delegate.operation(lts, pd, cd, m, opId, opType);
- }
-
- public void afterBatch(long lts, long pd, long m)
- {
- delegate.afterBatch(lts, pd, m);
- }
-
- public void shutdown() throws InterruptedException
- {
- delegate.shutdown();
- }
-}
diff --git a/harry-core/src/harry/visitors/GeneratingVisitor.java b/harry-core/src/harry/visitors/GeneratingVisitor.java
index 78e2b07..624330f 100644
--- a/harry-core/src/harry/visitors/GeneratingVisitor.java
+++ b/harry-core/src/harry/visitors/GeneratingVisitor.java
@@ -22,7 +22,7 @@ import harry.core.Run;
import harry.ddl.SchemaSpec;
import harry.model.OpSelectors;
-public class GeneratingVisitor extends DelegatingVisitor
+public class GeneratingVisitor extends LtsVisitor
{
private final OpSelectors.PdSelector pdSelector;
private final OpSelectors.DescriptorSelector descriptorSelector;
@@ -31,7 +31,8 @@ public class GeneratingVisitor extends DelegatingVisitor
public GeneratingVisitor(Run run,
VisitExecutor delegate)
{
- super(delegate);
+ super(delegate, run.clock::nextLts);
+
this.pdSelector = run.pdSelector;
this.descriptorSelector = run.descriptorSelector;
this.schema = run.schemaSpec;
@@ -50,10 +51,10 @@ public class GeneratingVisitor extends DelegatingVisitor
int modificationsCount = descriptorSelector.numberOfModifications(lts);
int opsPerModification = descriptorSelector.opsPerModification(lts);
- for (int m = 0; m < modificationsCount; m++)
+ for (long m = 0; m < modificationsCount; m++)
{
beforeBatch(lts, pd, m);
- for (int i = 0; i < opsPerModification; i++)
+ for (long i = 0; i < opsPerModification; i++)
{
long opId = m * opsPerModification + i;
long cd = descriptorSelector.cd(pd, lts, opId, schema);
diff --git a/harry-core/src/harry/visitors/LoggingVisitor.java b/harry-core/src/harry/visitors/LoggingVisitor.java
index 8deebde..35f9aa0 100644
--- a/harry-core/src/harry/visitors/LoggingVisitor.java
+++ b/harry-core/src/harry/visitors/LoggingVisitor.java
@@ -31,7 +31,6 @@ import harry.operations.CompiledStatement;
public class LoggingVisitor extends GeneratingVisitor
{
-
public LoggingVisitor(Run run,
OperationExecutor.RowVisitorFactory rowVisitorFactory)
{
diff --git a/harry-core/src/harry/visitors/LtsVisitor.java b/harry-core/src/harry/visitors/LtsVisitor.java
new file mode 100644
index 0000000..5b37db0
--- /dev/null
+++ b/harry-core/src/harry/visitors/LtsVisitor.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.visitors;
+
+import java.util.function.LongSupplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.model.OpSelectors;
+
+/**
+ * Common class for all visitors that support visits at a specific logical timestamp.
+ *
+ * Classes inheriting from LTS Visitor have to visit drawn LTS: every LTS that has been received
+ * from the consumer _has to_ actually be visited. If this is not done, during model checking
+ * drawn LTS will be considered visited, which will lead to data inconsistencies.
+ *
+ * This class and its implementations such as Mutating visitor are NOT thread safe. If you'd like
+ * to have several threads generating data, use multiple copies of delegating visitor, since
+ */
+public abstract class LtsVisitor extends VisitExecutor implements Visitor
+{
+ private static final Logger logger = LoggerFactory.getLogger(LtsVisitor.class);
+ protected final VisitExecutor delegate;
+ private final LongSupplier ltsSource;
+
+ public LtsVisitor(VisitExecutor delegate,
+ LongSupplier ltsSource)
+ {
+ this.delegate = delegate;
+ this.ltsSource = ltsSource;
+ }
+
+ public final void visit()
+ {
+ long lts = ltsSource.getAsLong();
+ if (lts > 0 && lts % 10_000 == 0)
+ logger.info("Visiting lts {}...", lts);
+ visit(lts);
+ }
+
+ public abstract void visit(long lts);
+
+ @Override
+ protected void beforeLts(long lts, long pd)
+ {
+ delegate.beforeLts(lts, pd);
+ }
+
+ @Override
+ protected void afterLts(long lts, long pd)
+ {
+ delegate.afterLts(lts, pd);
+ }
+
+ @Override
+ protected void beforeBatch(long lts, long pd, long m)
+ {
+ delegate.beforeBatch(lts, pd, m);
+ }
+
+ @Override
+ protected void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
+ {
+ delegate.operation(lts, pd, cd, m, opId, opType);
+ }
+
+ @Override
+ protected void afterBatch(long lts, long pd, long m)
+ {
+ delegate.afterBatch(lts, pd, m);
+ }
+
+ @Override
+ public void shutdown() throws InterruptedException
+ {
+ delegate.shutdown();
+ }
+
+}
diff --git a/harry-core/src/harry/visitors/MutatingVisitor.java b/harry-core/src/harry/visitors/MutatingVisitor.java
index 26c7417..d6eb617 100644
--- a/harry-core/src/harry/visitors/MutatingVisitor.java
+++ b/harry-core/src/harry/visitors/MutatingVisitor.java
@@ -19,6 +19,7 @@
package harry.visitors;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
@@ -41,10 +42,16 @@ public class MutatingVisitor extends GeneratingVisitor
public MutatingVisitor(Run run,
OperationExecutor.RowVisitorFactory rowVisitorFactory)
{
- super(run, new MutatingVisitExecutor(run, rowVisitorFactory.make(run)));
+ this(run, new MutatingVisitExecutor(run, rowVisitorFactory.make(run)));
}
- public static class MutatingVisitExecutor implements VisitExecutor
+ public MutatingVisitor(Run run,
+ VisitExecutor visitExecutor)
+ {
+ super(run, visitExecutor);
+ }
+
+ public static class MutatingVisitExecutor extends VisitExecutor
{
private final List<String> statements = new ArrayList<>();
private final List<Object> bindings = new ArrayList<>();
@@ -104,8 +111,7 @@ public class MutatingVisitor extends GeneratingVisitor
CompiledStatement statement = operationInternal(lts, pd, cd, m, opId, opType);
statements.add(statement.cql());
- for (Object binding : statement.bindings())
- bindings.add(binding);
+ Collections.addAll(bindings, statement.bindings());
}
protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
@@ -140,10 +146,10 @@ public class MutatingVisitor extends GeneratingVisitor
protected void executeAsyncWithRetries(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement)
{
- executeAsyncWithRetries(lts, pd, future, statement, 0);
+ executeAsyncWithRetries(future, statement, 0);
}
- private void executeAsyncWithRetries(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement, int retries)
+ private void executeAsyncWithRetries(CompletableFuture<Object[][]> future, CompiledStatement statement, int retries)
{
if (sut.isShutdown())
throw new IllegalStateException("System under test is shut down");
@@ -154,8 +160,10 @@ public class MutatingVisitor extends GeneratingVisitor
sut.executeAsync(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings())
.whenComplete((res, t) -> {
if (t != null)
- executor.schedule(() -> executeAsyncWithRetries(lts, pd, future, statement, retries + 1), 1, TimeUnit.SECONDS);
- else
+ {
+ logger.error("Caught message while trying to execute " + statement, t);
+ executor.schedule(() -> executeAsyncWithRetries(future, statement, retries + 1), 1, TimeUnit.SECONDS);
+ }else
future.complete(res);
});
}
diff --git a/harry-core/src/harry/visitors/ParallelValidator.java b/harry-core/src/harry/visitors/ParallelValidator.java
index 4772b40..e96bd39 100644
--- a/harry-core/src/harry/visitors/ParallelValidator.java
+++ b/harry-core/src/harry/visitors/ParallelValidator.java
@@ -86,8 +86,9 @@ public abstract class ParallelValidator<T extends ParallelValidator.State> imple
}
}
- public void visit(long lts)
+ public void visit()
{
+ long lts = run.clock.peek();
maxPos.updateAndGet(current -> Math.max(run.pdSelector.positionFor(lts), current));
if (triggerAfter > 0 && lts % triggerAfter == 0)
@@ -104,6 +105,7 @@ public abstract class ParallelValidator<T extends ParallelValidator.State> imple
}
}
+ @Override
public void shutdown() throws InterruptedException
{
executor.shutdown();
diff --git a/harry-core/src/harry/visitors/RecentValidator.java b/harry-core/src/harry/visitors/RecentValidator.java
index d0d09fa..a563a30 100644
--- a/harry-core/src/harry/visitors/RecentValidator.java
+++ b/harry-core/src/harry/visitors/RecentValidator.java
@@ -24,6 +24,9 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
@@ -46,8 +49,12 @@ public class RecentValidator implements Visitor
private final OpSelectors.PdSelector pdSelector;
private final QueryGenerator.TypedQueryGenerator querySelector;
private final MetricReporter metricReporter;
+ private final OpSelectors.MonotonicClock clock;
+
+ private final AtomicBoolean condition;
+ private final AtomicLong maxPos;
+
private final int partitionCount;
- private final int triggerAfter;
private final int queries;
public RecentValidator(int partitionCount,
@@ -57,11 +64,19 @@ public class RecentValidator implements Visitor
Model.ModelFactory modelFactory)
{
this.partitionCount = partitionCount;
- this.triggerAfter = triggerAfter;
this.queries = queries;
this.metricReporter = run.metricReporter;
this.pdSelector = run.pdSelector;
+ this.clock = run.clock;
+ this.condition = new AtomicBoolean();
+ this.maxPos = new AtomicLong(-1);
+
+ run.tracker.onLtsStarted((lts) -> {
+ maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
+ if (triggerAfter == 0 || (triggerAfter > 0 && lts % triggerAfter == 0))
+ condition.set(true);
+ });
this.querySelector = new QueryGenerator.TypedQueryGenerator(run.rng,
// TODO: make query kind configurable
Surjections.enumValues(Query.QueryKind.class),
@@ -78,10 +93,8 @@ public class RecentValidator implements Visitor
}
}
- private final AtomicLong maxPos = new AtomicLong(-1);
-
// TODO: expose metric, how many times validated recent partitions
- private void validateRecentPartitions(long lts)
+ private int validateRecentPartitions()
{
long pos = maxPos.get();
@@ -94,35 +107,34 @@ public class RecentValidator implements Visitor
metricReporter.validateRandomQuery();
Query query = querySelector.inflate(visitLts, i);
// TODO: add pd skipping from shrinker here, too
- log(lts, i, query);
+ log(i, query);
model.validate(query);
}
pos--;
maxPartitions--;
}
+
+ return partitionCount - maxPartitions;
}
- public void visit(long lts)
+ @Override
+ public void visit()
{
- maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
-
- if (triggerAfter > 0 && lts % triggerAfter == 0)
+ if (condition.compareAndSet(true, false))
{
- logger.info("Validating {} recent partitions", partitionCount);
- validateRecentPartitions(lts);
+ long lts = clock.peek();
+ logger.info("Validating (up to) {} recent partitions as of lts {}...", partitionCount, lts);
+ int count = validateRecentPartitions();
+ logger.info("...finished validating {} recent partitions as of lts {}.", count, lts);
}
}
- public void shutdown() throws InterruptedException
- {
- }
-
- private void log(long lts, int modifier, Query query)
+ private void log(int modifier, Query query)
{
try
{
- validationLog.write("LTS: " + lts + ". Modifier: " + modifier + ". PD: " + query.pd);
+ validationLog.write(String.format("PD: %d. Modifier: %d.", query.pd, modifier));
validationLog.write("\t");
validationLog.write(query.toSelectStatement().toString());
validationLog.write("\n");
diff --git a/harry-core/src/harry/visitors/ReplayingVisitor.java b/harry-core/src/harry/visitors/ReplayingVisitor.java
index 1979664..bd3e1c2 100644
--- a/harry-core/src/harry/visitors/ReplayingVisitor.java
+++ b/harry-core/src/harry/visitors/ReplayingVisitor.java
@@ -19,17 +19,19 @@
package harry.visitors;
import java.util.Arrays;
+import java.util.function.LongSupplier;
import harry.core.Run;
import harry.model.OpSelectors;
-public abstract class ReplayingVisitor extends DelegatingVisitor
+public abstract class ReplayingVisitor extends LtsVisitor
{
- public ReplayingVisitor(VisitExecutor delegate)
+ public ReplayingVisitor(VisitExecutor delegate, LongSupplier ltsSource)
{
- super(delegate);
+ super(delegate, ltsSource);
}
+ @Override
public void visit(long lts)
{
replay(getVisit(lts));
@@ -37,7 +39,8 @@ public abstract class ReplayingVisitor extends DelegatingVisitor
public abstract Visit getVisit(long lts);
- public abstract void replayAll(Run run);
+ public abstract void replayAll();
+
private void replay(Visit visit)
{
beforeLts(visit.lts, visit.pd);
diff --git a/harry-core/src/harry/visitors/Sampler.java b/harry-core/src/harry/visitors/Sampler.java
index 20c9a8c..fe73089 100644
--- a/harry-core/src/harry/visitors/Sampler.java
+++ b/harry-core/src/harry/visitors/Sampler.java
@@ -43,6 +43,7 @@ public class Sampler implements Visitor
// TODO: move maxPos to data tracker since we seem to use quite a lot?
private final AtomicLong maxPos = new AtomicLong(-1);
private final OpSelectors.PdSelector pdSelector;
+ private final OpSelectors.MonotonicClock clock;
private final SchemaSpec schema;
private final int triggerAfter;
private final int samplePartitions;
@@ -52,16 +53,18 @@ public class Sampler implements Visitor
{
this.sut = run.sut;
this.pdSelector = run.pdSelector;
+ this.clock = run.clock;
this.schema = run.schemaSpec;
this.triggerAfter = triggerAfter;
this.samplePartitions = samplePartitions;
}
- public void visit(long lts)
+ public void visit()
{
+ long lts = clock.peek();
maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
- if (triggerAfter > 0 && lts % triggerAfter == 0)
+ if (triggerAfter == 0 || triggerAfter > 0 && lts % triggerAfter == 0)
{
long max = maxPos.get();
DescriptiveStatistics ds = new DescriptiveStatistics();
@@ -84,10 +87,6 @@ public class Sampler implements Visitor
}
}
- public void shutdown() throws InterruptedException
- {
- }
-
@JsonTypeName("sampler")
public static class SamplerConfiguration implements Configuration.VisitorConfiguration
{
diff --git a/harry-core/src/harry/visitors/SingleValidator.java b/harry-core/src/harry/visitors/SingleValidator.java
index 0993461..85a378a 100644
--- a/harry-core/src/harry/visitors/SingleValidator.java
+++ b/harry-core/src/harry/visitors/SingleValidator.java
@@ -29,6 +29,7 @@ public class SingleValidator implements Visitor
protected final Model model;
protected final QueryGenerator queryGenerator;
protected final Run run;
+
public SingleValidator(int iterations,
Run run,
Model.ModelFactory modelFactory)
@@ -39,9 +40,10 @@ public class SingleValidator implements Visitor
this.run = run;
}
- public void shutdown() throws InterruptedException
+ @Override
+ public void visit()
{
-
+ visit(run.clock.peek());
}
public void visit(long lts)
diff --git a/harry-core/src/harry/visitors/VisitExecutor.java b/harry-core/src/harry/visitors/VisitExecutor.java
index 94aa1fc..63efba0 100644
--- a/harry-core/src/harry/visitors/VisitExecutor.java
+++ b/harry-core/src/harry/visitors/VisitExecutor.java
@@ -20,17 +20,17 @@ package harry.visitors;
import harry.model.OpSelectors;
-public interface VisitExecutor
+public abstract class VisitExecutor
{
- public void beforeLts(long lts, long pd);
+ protected abstract void beforeLts(long lts, long pd);
- public void afterLts(long lts, long pd);
+ protected abstract void afterLts(long lts, long pd);
- public void beforeBatch(long lts, long pd, long m);
+ protected abstract void beforeBatch(long lts, long pd, long m);
- public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind kind);
+ protected abstract void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind kind);
- public void afterBatch(long lts, long pd, long m);
+ protected abstract void afterBatch(long lts, long pd, long m);
- public default void shutdown() throws InterruptedException {}
+ public abstract void shutdown() throws InterruptedException;
}
diff --git a/harry-core/src/harry/visitors/Visitor.java b/harry-core/src/harry/visitors/Visitor.java
index 7cb6111..b8baf10 100644
--- a/harry-core/src/harry/visitors/Visitor.java
+++ b/harry-core/src/harry/visitors/Visitor.java
@@ -22,12 +22,12 @@ import harry.core.Run;
public interface Visitor
{
- void visit(long lts);
+ void visit();
- public default void shutdown() throws InterruptedException {}
+ default void shutdown() throws InterruptedException {}
- public interface VisitorFactory
+ interface VisitorFactory
{
- public Visitor make(Run run);
+ Visitor make(Run run);
}
}
\ No newline at end of file
diff --git a/harry-core/test/harry/model/OpSelectorsTest.java b/harry-core/test/harry/model/OpSelectorsTest.java
index 0f474f0..e2ebd82 100644
--- a/harry-core/test/harry/model/OpSelectorsTest.java
+++ b/harry-core/test/harry/model/OpSelectorsTest.java
@@ -44,8 +44,8 @@ import harry.model.clock.OffsetClock;
import harry.model.sut.SystemUnderTest;
import harry.operations.CompiledStatement;
import harry.runner.DataTracker;
+import harry.visitors.LtsVisitor;
import harry.visitors.MutatingVisitor;
-import harry.visitors.Visitor;
import harry.visitors.OperationExecutor;
import harry.util.BitSet;
@@ -205,82 +205,80 @@ public class OpSelectorsTest
};
Run run = new Run(rng,
- new OffsetClock(0),
- pdSelector,
- ckSelector,
- schema,
- DataTracker.NO_OP,
- SystemUnderTest.NO_OP,
- MetricReporter.NO_OP);
-
- Visitor visitor = new MutatingVisitor(run,
- (r) -> new OperationExecutor()
- {
- public CompiledStatement insert(long lts, long pd, long cd, long m)
- {
- consumer.accept(pd, cd);
- return compiledStatement;
- }
-
- public CompiledStatement update(long lts, long pd, long cd, long opId)
- {
- consumer.accept(pd, cd);
- return compiledStatement;
- }
-
- public CompiledStatement deleteColumn(long lts, long pd, long cd, long m)
- {
- consumer.accept(pd, cd);
- return compiledStatement;
- }
-
- public CompiledStatement deleteColumnWithStatics(long lts, long pd, long cd, long opId)
- {
- consumer.accept(pd, cd);
- return compiledStatement;
- }
-
- public CompiledStatement deleteRow(long lts, long pd, long cd, long m)
- {
- consumer.accept(pd, cd);
- return compiledStatement;
- }
-
- public CompiledStatement deletePartition(long lts, long pd, long opId)
- {
- // ignore
- return compiledStatement;
- }
-
- public CompiledStatement insertWithStatics(long lts, long pd, long cd, long opId)
- {
- consumer.accept(pd, cd);
- return compiledStatement;
- }
-
- public CompiledStatement updateWithStatics(long lts, long pd, long cd, long opId)
- {
- consumer.accept(pd, cd);
- return compiledStatement;
- }
-
- public CompiledStatement deleteRange(long lts, long pd, long opId)
- {
- // ignore
- return compiledStatement;
- }
-
- public CompiledStatement deleteSlice(long lts, long pd, long opId)
- {
- // ignore
- return compiledStatement;
- }
- });
+ new OffsetClock(0),
+ pdSelector,
+ ckSelector,
+ schema,
+ DataTracker.NO_OP,
+ SystemUnderTest.NO_OP,
+ MetricReporter.NO_OP);
+
+ LtsVisitor visitor = new MutatingVisitor(run,
+ (r) -> new OperationExecutor()
+ {
+ public CompiledStatement insert(long lts, long pd, long cd, long m)
+ {
+ consumer.accept(pd, cd);
+ return compiledStatement;
+ }
+
+ public CompiledStatement update(long lts, long pd, long cd, long opId)
+ {
+ consumer.accept(pd, cd);
+ return compiledStatement;
+ }
+
+ public CompiledStatement deleteColumn(long lts, long pd, long cd, long m)
+ {
+ consumer.accept(pd, cd);
+ return compiledStatement;
+ }
+
+ public CompiledStatement deleteColumnWithStatics(long lts, long pd, long cd, long opId)
+ {
+ consumer.accept(pd, cd);
+ return compiledStatement;
+ }
+
+ public CompiledStatement deleteRow(long lts, long pd, long cd, long m)
+ {
+ consumer.accept(pd, cd);
+ return compiledStatement;
+ }
+
+ public CompiledStatement deletePartition(long lts, long pd, long opId)
+ {
+ // ignore
+ return compiledStatement;
+ }
+
+ public CompiledStatement insertWithStatics(long lts, long pd, long cd, long opId)
+ {
+ consumer.accept(pd, cd);
+ return compiledStatement;
+ }
+
+ public CompiledStatement updateWithStatics(long lts, long pd, long cd, long opId)
+ {
+ consumer.accept(pd, cd);
+ return compiledStatement;
+ }
+
+ public CompiledStatement deleteRange(long lts, long pd, long opId)
+ {
+ // ignore
+ return compiledStatement;
+ }
+
+ public CompiledStatement deleteSlice(long lts, long pd, long opId)
+ {
+ // ignore
+ return compiledStatement;
+ }
+ });
for (int lts = 0; lts < 1000; lts++)
- {
- visitor.visit(lts);
- }
+ visitor.visit();
for (Collection<Long> value : partitionMap.values())
Assert.assertEquals(10, value.size());
diff --git a/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java b/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
index 702163e..621d579 100644
--- a/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
+++ b/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
@@ -38,7 +38,7 @@ public class HarryRunnerExternal extends HarryRunner {
}
@Override
- public void beforeRun(Runner runner) {
+ public void beforeRun(Runner.TimedRunner runner) {
}
}
diff --git a/harry-integration/src/harry/model/sut/ByteUtils.java b/harry-integration/src/harry/model/sut/ByteUtils.java
new file mode 100644
index 0000000..ee7721c
--- /dev/null
+++ b/harry-integration/src/harry/model/sut/ByteUtils.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package harry.model.sut;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BooleanType;
+import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.db.marshal.DoubleType;
+import org.apache.cassandra.db.marshal.FloatType;
+import org.apache.cassandra.db.marshal.InetAddressType;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.ShortType;
+import org.apache.cassandra.db.marshal.UUIDType;
+
+public class ByteUtils extends harry.util.ByteUtils
+{
+ public static ByteBuffer objectToBytes(Object obj)
+ {
+ if (obj instanceof Integer)
+ return bytes((int) obj);
+ else if (obj instanceof Byte)
+ return bytes((byte) obj);
+ else if (obj instanceof Boolean)
+ return bytes( (byte) (((boolean) obj) ? 1 : 0));
+ else if (obj instanceof Short)
+ return bytes((short) obj);
+ else if (obj instanceof Long)
+ return bytes((long) obj);
+ else if (obj instanceof Float)
+ return bytes((float) obj);
+ else if (obj instanceof Double)
+ return bytes((double) obj);
+ else if (obj instanceof UUID)
+ return bytes((UUID) obj);
+ else if (obj instanceof InetAddress)
+ return bytes((InetAddress) obj);
+ else if (obj instanceof String)
+ return bytes((String) obj);
+ else if (obj instanceof List)
+ {
+ List l = (List) obj;
+ if (l.isEmpty())
+ return pack(Collections.emptyList(), 0);
+ return ListType.getInstance(getType(l.get(0)), true).decompose(l);
+ }
+ else if (obj instanceof Set)
+ {
+ Set l = (Set) obj;
+ if (l.isEmpty())
+ return pack(Collections.emptyList(), 0);
+ return SetType.getInstance(getType(l.iterator().next()), true).decompose(l);
+ }
+ else if (obj instanceof ByteBuffer)
+ return (ByteBuffer) obj;
+
+ else
+ throw new IllegalArgumentException(String.format("Cannot convert value %s of type %s",
+ obj,
+ obj.getClass()));
+ }
+
+ public static AbstractType<?> getType(Object obj)
+ {
+ if (obj instanceof Integer)
+ return IntegerType.instance; //TODO
+ else if (obj instanceof Byte)
+ return ByteType.instance;
+ else if (obj instanceof Boolean)
+ return BooleanType.instance;
+ else if (obj instanceof Short)
+ return ShortType.instance;
+ else if (obj instanceof Long)
+ return IntegerType.instance;
+ else if (obj instanceof Float)
+ return FloatType.instance;
+ else if (obj instanceof Double)
+ return DoubleType.instance;
+ else if (obj instanceof UUID)
+ return UUIDType.instance;
+ else if (obj instanceof InetAddress)
+ return InetAddressType.instance;
+ else if (obj instanceof String)
+ return AsciiType.instance;
+ else
+ throw new IllegalArgumentException(String.format("Cannot convert value %s of type %s",
+ obj,
+ obj.getClass()));
+ }
+}
diff --git a/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java b/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java
index ff4ad86..6bc0705 100644
--- a/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java
+++ b/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import harry.core.Configuration;
import harry.core.Run;
import harry.ddl.SchemaSpec;
import harry.operations.CompiledStatement;
@@ -38,7 +37,7 @@ public class InJVMTokenAwareVisitExecutor extends LoggingVisitor.LoggingVisitorE
{
public static void init()
{
- Configuration.registerSubtypes(Configuation.class);
+ harry.core.Configuration.registerSubtypes(Configuration.class);
}
private final InJvmSut sut;
@@ -92,14 +91,14 @@ public class InJVMTokenAwareVisitExecutor extends LoggingVisitor.LoggingVisitorE
}
@JsonTypeName("in_jvm_token_aware")
- public static class Configuation implements Configuration.VisitorConfiguration
+ public static class Configuration implements harry.core.Configuration.VisitorConfiguration
{
- public final Configuration.RowVisitorConfiguration row_visitor;
+ public final harry.core.Configuration.RowVisitorConfiguration row_visitor;
public final SystemUnderTest.ConsistencyLevel consistency_level;
@JsonCreator
- public Configuation(@JsonProperty("row_visitor") Configuration.RowVisitorConfiguration rowVisitor,
- @JsonProperty("consistency_level") SystemUnderTest.ConsistencyLevel consistencyLevel)
+ public Configuration(@JsonProperty("row_visitor") harry.core.Configuration.RowVisitorConfiguration rowVisitor,
+ @JsonProperty("consistency_level") SystemUnderTest.ConsistencyLevel consistencyLevel)
{
this.row_visitor = rowVisitor;
this.consistency_level = consistencyLevel;
diff --git a/harry-integration/src/harry/model/sut/InJvmSut.java b/harry-integration/src/harry/model/sut/InJvmSut.java
index f758ca5..687a5b3 100644
--- a/harry-integration/src/harry/model/sut/InJvmSut.java
+++ b/harry-integration/src/harry/model/sut/InJvmSut.java
@@ -20,27 +20,19 @@ package harry.model.sut;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import harry.core.Configuration;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
-import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
{
@@ -49,8 +41,6 @@ public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
Configuration.registerSubtypes(InJvmSutConfiguration.class);
}
- private static final Logger logger = LoggerFactory.getLogger(InJvmSut.class);
-
public InJvmSut(Cluster cluster)
{
super(cluster, 10);
@@ -97,7 +87,7 @@ public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
{
return cluster.get(1).appliesOnInstance((Object[] pk, String ks) ->
{
- String pkString = Arrays.asList(pk).stream().map(Object::toString).collect(Collectors.joining(":"));
+ String pkString = Arrays.stream(pk).map(Object::toString).collect(Collectors.joining(":"));
EndpointsForToken endpoints = StorageService.instance.getNaturalReplicasForToken(ks, table, pkString);
int[] nodes = new int[endpoints.size()];
for (int i = 0; i < endpoints.size(); i++)
@@ -105,4 +95,4 @@ public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
return nodes;
}).apply(partitionKey, keyspace);
}
-}
\ No newline at end of file
+}
diff --git a/harry-integration/src/harry/runner/HarryRunnerJvm.java b/harry-integration/src/harry/runner/HarryRunnerJvm.java
index fbb30b9..ed4ea46 100644
--- a/harry-integration/src/harry/runner/HarryRunnerJvm.java
+++ b/harry-integration/src/harry/runner/HarryRunnerJvm.java
@@ -37,7 +37,7 @@ public class HarryRunnerJvm extends HarryRunner {
@Override
- public void beforeRun(Runner runner) {
+ public void beforeRun(Runner.TimedRunner runner) {
}
}
diff --git a/harry-integration/src/harry/runner/RepairingLocalStateValidator.java b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
index fe811e5..71c1535 100644
--- a/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
+++ b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
@@ -19,7 +19,6 @@
package harry.runner;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -29,6 +28,7 @@ import harry.core.Configuration;
import harry.core.Run;
import harry.data.ResultSetRow;
import harry.model.Model;
+import harry.model.OpSelectors;
import harry.model.QuiescentChecker;
import harry.model.sut.InJvmSut;
import harry.model.sut.SystemUnderTest;
@@ -47,27 +47,26 @@ public class RepairingLocalStateValidator extends AllPartitionsValidator
QuiescentCheckerConfig.class);
}
- public final InJvmSut inJvmSut;
-
+ private final InJvmSut inJvmSut;
+ private final OpSelectors.MonotonicClock clock;
public RepairingLocalStateValidator(int concurrency, int triggerAfter, Run run, Model.ModelFactory modelFactory)
{
super(concurrency, triggerAfter, run, modelFactory);
-
this.inJvmSut = (InJvmSut) run.sut;
+ this.clock = run.clock;
}
- @Override
- public void visit(long lts)
+ public void visit()
{
+ long lts = clock.peek();
if (lts > 0 && lts % triggerAfter == 0)
{
System.out.println("Starting repair...");
- inJvmSut.cluster().stream().forEach((instance) -> {
- instance.nodetool("repair", "--full");
- });
+ inJvmSut.cluster().stream().forEach((instance) -> instance.nodetool("repair", "--full"));
+
System.out.println("Validating partitions...");
- super.visit(lts);
+ super.visit();
}
}
@@ -110,7 +109,7 @@ public class RepairingLocalStateValidator extends AllPartitionsValidator
public void validate(Query query)
{
CompiledStatement compiled = query.toSelectStatement();
- int[] replicas = inJvmSut.getReplicasFor(schemaSpec.inflatePartitionKey(query.pd), schemaSpec.keyspace, schemaSpec.table);
+ int[] replicas = inJvmSut.getReplicasFor(schema.inflatePartitionKey(query.pd), schema.keyspace, schema.table);
for (int node : replicas)
{
validate(() -> {
diff --git a/harry-integration/src/harry/runner/TrivialShrinker.java b/harry-integration/src/harry/runner/TrivialShrinker.java
index 2879e0e..b598b85 100644
--- a/harry-integration/src/harry/runner/TrivialShrinker.java
+++ b/harry-integration/src/harry/runner/TrivialShrinker.java
@@ -24,13 +24,14 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import harry.core.Configuration;
import harry.core.Run;
-import harry.visitors.DelegatingVisitor;
-import harry.visitors.Visitor;
+import harry.visitors.LtsVisitor;
import harry.visitors.SkippingVisitor;
+import harry.visitors.Visitor;
/**
* A most trivial imaginable shrinker: attempts to skip partitions and/or logical timestamps to see if the
@@ -63,15 +64,17 @@ public class TrivialShrinker
Run run = configuration.createRun();
Configuration.SequentialRunnerConfig config = (Configuration.SequentialRunnerConfig) configuration.runner;
List<Visitor> visitors = new ArrayList<>();
- for (Configuration.VisitorConfiguration factory : config.visitor_factories)
+ for (Configuration.VisitorConfiguration factory : config.visitorFactories)
{
Visitor visitor = factory.make(run);
- if (visitor instanceof DelegatingVisitor)
+ if (visitor instanceof LtsVisitor)
{
- visitors.add(new SkippingVisitor(visitor,
+ AtomicLong counter = new AtomicLong();
+ visitors.add(new SkippingVisitor((LtsVisitor) visitor,
+ counter::getAndIncrement,
(lts) -> run.pdSelector.pd(lts, run.schemaSpec),
ltsToSkip,
- pdsToSkip));
+ pdsToSkip)) ;
}
else
{
@@ -93,7 +96,7 @@ public class TrivialShrinker
try
{
- runOnce(run, visitors, maxLts);
+ runOnce(visitors, maxLts);
System.out.println("Can not skip " + pdToCheck + "\nCan only skip these: " + toString(pdsToSkip));
pdsToSkip.remove(pdToCheck);
}
@@ -126,7 +129,7 @@ public class TrivialShrinker
try
{
- runOnce(run, visitors, maxLts);
+ runOnce(visitors, maxLts);
System.out.println("Can not skip " + ltsToCheck + "\nCan only skip these: " + toString(ltsToSkip));
ltsToSkip.remove(ltsToCheck);
}
@@ -160,13 +163,13 @@ public class TrivialShrinker
}
}
- public static void runOnce(Run run, List<Visitor> visitors, long maxLts)
+ public static void runOnce(List<Visitor> visitors, long maxLts)
{
for (long lts = 0; lts <= maxLts; lts++)
{
for (Visitor visitor : visitors)
{
- visitor.visit(lts);
+ visitor.visit();
}
}
}
@@ -183,4 +186,4 @@ public class TrivialShrinker
}
return s.substring(0, s.length() - 1);
}
-}
\ No newline at end of file
+}
diff --git a/harry-integration/src/harry/visitors/SkippingVisitor.java b/harry-integration/src/harry/visitors/SkippingVisitor.java
index 8da0a89..67c6488 100644
--- a/harry-integration/src/harry/visitors/SkippingVisitor.java
+++ b/harry-integration/src/harry/visitors/SkippingVisitor.java
@@ -19,20 +19,23 @@
package harry.visitors;
import java.util.Set;
+import java.util.function.LongSupplier;
-public class SkippingVisitor implements Visitor
+public class SkippingVisitor extends LtsVisitor
{
private final Set<Long> ltsToSkip;
private final Set<Long> pdsToSkip;
private final LtsToPd ltsToPd;
- private final Visitor delegate;
+ // Use DelegatingVisitor class instead of VisitExecutor available via protected field
+ private LtsVisitor delegateShadow;
- public SkippingVisitor(Visitor delegate,
+ public SkippingVisitor(LtsVisitor delegate,
+ LongSupplier ltsSupplier,
LtsToPd ltsToPd,
Set<Long> ltsToSkip,
Set<Long> pdsToSkip)
{
- this.delegate = delegate;
+ super(delegate, ltsSupplier);
this.ltsToSkip = ltsToSkip;
this.pdsToSkip = pdsToSkip;
this.ltsToPd = ltsToPd;
@@ -43,7 +46,7 @@ public class SkippingVisitor implements Visitor
if (ltsToSkip.contains(lts) || pdsToSkip.contains(ltsToPd.convert(lts)))
return;
- delegate.visit(lts);
+ delegateShadow.visit(lts);
}
public static interface LtsToPd
diff --git a/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java b/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
index 33463b8..0da7743 100644
--- a/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
+++ b/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
@@ -16,9 +16,9 @@ import harry.model.OpSelectors;
import harry.model.sut.SystemUnderTest;
import harry.visitors.MutatingVisitor;
import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Visitor;
import harry.visitors.SingleValidator;
import harry.util.TestRunner;
+import harry.visitors.Visitor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.distributed.impl.RowUtil;
@@ -94,12 +94,12 @@ public class DataGeneratorsIntegrationTest extends CQLTester
Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
for (int lts = 0; lts < 100; lts++)
- visitor.visit(lts);
+ visitor.visit();
}
Run run = builder.build()
.createRun();
- Visitor visitor = new SingleValidator(100, run, NoOpChecker::new);
+ SingleValidator visitor = new SingleValidator(100, run, NoOpChecker::new);
for (int lts = 0; lts < 100; lts++)
visitor.visit(lts);
diff --git a/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java b/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
index 6c0ee24..31cd9f1 100644
--- a/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
+++ b/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
@@ -23,6 +23,7 @@ import java.util.Random;
import java.util.Set;
import java.util.function.Supplier;
+import org.junit.Assert;
import org.junit.Test;
import harry.core.Configuration;
@@ -65,6 +66,9 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase
HistoryBuilder history = new HistoryBuilder(run);
Set<Long> pds = new HashSet<>();
+ run.tracker.onLtsStarted((long lts) -> {
+ pds.add(run.pdSelector.pd(lts, run.schemaSpec));
+ });
for (int j = 0; j < 5; j++)
{
@@ -97,28 +101,19 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase
.partitionDelete()
.finish();
- ReplayingVisitor visitor = history.visitor(new MutatingVisitor.MutatingVisitExecutor(run,
- new MutatingRowVisitor(run)
- {
- public CompiledStatement perform(OpSelectors.OperationKind op, long lts, long pd, long cd, long opId)
- {
- pds.add(pd);
- return super.perform(op, lts, pd, cd, opId);
- }
- }));
+ ReplayingVisitor visitor = history.visitor(run);
- visitor.replayAll(run);
+ visitor.replayAll();
Model model = new QuiescentChecker(run, new Reconciler(run,
history::visitor));
QueryGenerator.TypedQueryGenerator queryGenerator = new QueryGenerator.TypedQueryGenerator(run);
+ Assert.assertFalse(pds.isEmpty());
for (Long pd : pds)
{
- model.validate(Query.selectPartition(run.schemaSpec,
- pd,
- false));
+ model.validate(Query.selectPartition(run.schemaSpec, pd,false));
- int lts = new Random().nextInt((int) run.clock.maxLts());
+ int lts = new Random().nextInt((int) run.clock.peek());
for (int k = 0; k < 3; k++)
queryGenerator.inflate(lts, k);
}
@@ -169,7 +164,7 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase
}
}));
- visitor.replayAll(run);
+ visitor.replayAll();
Model model = new QuiescentChecker(run, new Reconciler(run,
sut::visitor));
@@ -182,7 +177,7 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase
model.validate(Query.selectPartition(run.schemaSpec,
pd,
true));
- int lts = new Random().nextInt((int) run.clock.maxLts());
+ int lts = new Random().nextInt((int) run.clock.peek());
for (int k = 0; k < 3; k++)
queryGenerator.inflate(lts, k);
}
diff --git a/harry-integration/test/harry/model/HistoryBuilderTest.java b/harry-integration/test/harry/model/HistoryBuilderTest.java
index 5ccca60..e7f3413 100644
--- a/harry-integration/test/harry/model/HistoryBuilderTest.java
+++ b/harry-integration/test/harry/model/HistoryBuilderTest.java
@@ -129,8 +129,9 @@ public class HistoryBuilderTest
public void afterLts(long lts, long pd){}
public void beforeBatch(long lts, long pd, long m){}
public void afterBatch(long lts, long pd, long m){}
+ public void shutdown() {}
});
- visitor.replayAll(run);
+ visitor.replayAll();
for (Counts counts : model)
Assert.assertTrue(counts.toString(), counts.allDone());
});
diff --git a/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java b/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java
index 0f28005..c84c789 100644
--- a/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java
+++ b/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java
@@ -32,7 +32,7 @@ import harry.model.sut.InJVMTokenAwareVisitExecutor;
import harry.model.sut.InJvmSut;
import harry.model.sut.SystemUnderTest;
import harry.runner.RepairingLocalStateValidator;
-import harry.visitors.GeneratingVisitor;
+import harry.visitors.MutatingVisitor;
import harry.visitors.Visitor;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
@@ -70,21 +70,20 @@ public class InJVMTokenAwareExecutorTest extends IntegrationTestBase
Run run = configuration.createRun();
run.sut.schemaChange(run.schemaSpec.compile().cql());
- Visitor visitor = new GeneratingVisitor(run, new InJVMTokenAwareVisitExecutor(run,
- new Configuration.MutatingRowVisitorConfiguration(),
- SystemUnderTest.ConsistencyLevel.NODE_LOCAL));
+ Visitor visitor = new MutatingVisitor(run, new InJVMTokenAwareVisitExecutor(run,
+ new Configuration.MutatingRowVisitorConfiguration(),
+ SystemUnderTest.ConsistencyLevel.NODE_LOCAL));
OpSelectors.MonotonicClock clock = run.clock;
long maxPd = 0;
for (int i = 0; i < 10000; i++)
{
- long lts = clock.nextLts();
- visitor.visit(lts);
- maxPd = Math.max(maxPd, run.pdSelector.positionFor(lts));
+ visitor.visit();
+ maxPd = Math.max(maxPd, run.pdSelector.positionFor(clock.peek()));
}
RepairingLocalStateValidator validator = new RepairingLocalStateValidator(5, 1, run, new Configuration.QuiescentCheckerConfig());
- validator.visit(clock.maxLts());
+ validator.visit();
}
}
diff --git a/harry-integration/test/harry/model/IntegrationTestBase.java b/harry-integration/test/harry/model/IntegrationTestBase.java
index a6a9697..6d8b4ef 100644
--- a/harry-integration/test/harry/model/IntegrationTestBase.java
+++ b/harry-integration/test/harry/model/IntegrationTestBase.java
@@ -95,9 +95,9 @@ public class IntegrationTestBase extends TestBaseImpl
.setCreateSchema(true)
.setTruncateTable(false)
.setDropSchema(true)
- .setSchemaProvider(seed1 -> schema)
+ .setSchemaProvider((seed1, sut) -> schema)
.setClusteringDescriptorSelector(sharedCDSelectorConfiguration().build())
.setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(1, 200))
.setSUT(() -> sut);
}
-}
+}
\ No newline at end of file
diff --git a/harry-integration/test/harry/model/ModelTestBase.java b/harry-integration/test/harry/model/ModelTestBase.java
index 576a291..3381137 100644
--- a/harry-integration/test/harry/model/ModelTestBase.java
+++ b/harry-integration/test/harry/model/ModelTestBase.java
@@ -18,7 +18,6 @@
package harry.model;
-import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -30,9 +29,9 @@ import harry.ddl.SchemaGenerators;
import harry.ddl.SchemaSpec;
import harry.visitors.LoggingVisitor;
import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Visitor;
import harry.runner.Runner;
import harry.visitors.SingleValidator;
+import harry.visitors.Visitor;
public abstract class ModelTestBase extends IntegrationTestBase
{
@@ -46,33 +45,30 @@ public abstract class ModelTestBase extends IntegrationTestBase
}
}
- void negativeIntegrationTest(Model.ModelFactory factory) throws Throwable
+ void negativeIntegrationTest(Configuration.RunnerConfiguration runnerConfig) throws Throwable
{
Supplier<SchemaSpec> supplier = SchemaGenerators.progression(1);
for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
{
SchemaSpec schema = supplier.get();
Configuration.ConfigurationBuilder builder = configuration(i, schema);
+
builder.setClock(new Configuration.ApproximateMonotonicClockConfiguration((int) TimeUnit.MINUTES.toMillis(10),
1, TimeUnit.SECONDS))
- .setRunTime(1, TimeUnit.MINUTES)
.setCreateSchema(false)
.setDropSchema(false)
- .setRunner(new Configuration.SequentialRunnerConfig(Arrays.asList(new Configuration.LoggingVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration()),
- new Configuration.RecentPartitionsValidatorConfiguration(10, 10, 1, factory::make),
- new Configuration.AllPartitionsValidatorConfiguration(10, 10, factory::make))));
- Runner runner = builder.build().createRunner();
+ .setRunner(runnerConfig);
+
+ Configuration config = builder.build();
+ Runner runner = config.createRunner();
+
try
{
Run run = runner.getRun();
beforeEach();
run.sut.schemaChange(run.schemaSpec.compile().cql());
- runner.initAndStartAll().get(2, TimeUnit.MINUTES);
- }
- catch (Throwable t)
- {
- throw t;
+ runner.initAndStartAll().get(4, TimeUnit.MINUTES);
}
finally
{
@@ -83,7 +79,7 @@ public abstract class ModelTestBase extends IntegrationTestBase
abstract Configuration.ModelConfiguration modelConfiguration();
- protected Visitor validator(Run run)
+ protected SingleValidator validator(Run run)
{
return new SingleValidator(100, run , modelConfiguration());
}
@@ -101,17 +97,13 @@ public abstract class ModelTestBase extends IntegrationTestBase
beforeEach();
run.sut.schemaChange(run.schemaSpec.compile().cql());
System.out.println(run.schemaSpec.compile().cql());
- OpSelectors.MonotonicClock clock = run.clock;
- Visitor validator = validator(run);
Visitor visitor = new LoggingVisitor(run, MutatingRowVisitor::new);
for (int i = 0; i < 20000; i++)
- {
- long lts = clock.nextLts();
- visitor.visit(lts);
- }
+ visitor.visit();
+ SingleValidator validator = validator(run);
validator.visit(0);
if (!corrupt.apply(run))
diff --git a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
index 92498dd..13ae548 100644
--- a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
@@ -41,9 +41,9 @@ import harry.corruptor.ShowValueCorruptor;
import harry.ddl.SchemaGenerators;
import harry.visitors.MutatingVisitor;
import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Visitor;
import harry.operations.Query;
import harry.operations.QueryGenerator;
+import harry.visitors.Visitor;
import static harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
@@ -117,10 +117,7 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
QueryResponseCorruptor corruptor = this.corruptorFactory.create(run);
for (int i = 0; i < CYCLES; i++)
- {
- long lts = clock.nextLts();
- visitor.visit(lts);
- }
+ visitor.visit();
while (true)
{
diff --git a/harry-integration/test/harry/model/QuerySelectorTest.java b/harry-integration/test/harry/model/QuerySelectorTest.java
index 6d8dc5c..099adf1 100644
--- a/harry-integration/test/harry/model/QuerySelectorTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorTest.java
@@ -33,9 +33,9 @@ import harry.model.sut.SystemUnderTest;
import harry.operations.CompiledStatement;
import harry.visitors.MutatingVisitor;
import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Visitor;
import harry.operations.Query;
import harry.operations.QueryGenerator;
+import harry.visitors.Visitor;
import static harry.generators.DataGenerators.NIL_DESCR;
@@ -70,15 +70,11 @@ public class QuerySelectorTest extends IntegrationTestBase
Run run = config.createRun();
run.sut.schemaChange(run.schemaSpec.compile().cql());
- OpSelectors.MonotonicClock clock = run.clock;
Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
for (int i = 0; i < CYCLES; i++)
- {
- long lts = clock.nextLts();
- visitor.visit(lts);
- }
+ visitor.visit();
QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
@@ -145,14 +141,10 @@ public class QuerySelectorTest extends IntegrationTestBase
.build();
Run run = config.createRun();
run.sut.schemaChange(run.schemaSpec.compile().cql());
- OpSelectors.MonotonicClock clock = run.clock;
Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
for (int i = 0; i < CYCLES; i++)
- {
- long lts = clock.nextLts();
- visitor.visit(lts);
- }
+ visitor.visit();
QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
Model model = new QuiescentChecker(run);
diff --git a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
index 945dce2..97a554a 100644
--- a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
+++ b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
@@ -18,9 +18,19 @@
package harry.model;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
import org.junit.Test;
import harry.core.Configuration;
+import harry.core.Configuration.AllPartitionsValidatorConfiguration;
+import harry.core.Configuration.ConcurrentRunnerConfig;
+import harry.core.Configuration.LoggingVisitorConfiguration;
+import harry.core.Configuration.RecentPartitionsValidatorConfiguration;
+import harry.core.Configuration.SequentialRunnerConfig;
+import harry.core.Configuration.SingleVisitRunnerConfig;
import harry.core.Run;
import harry.corruptor.AddExtraRowCorruptor;
import harry.corruptor.ChangeValueCorruptor;
@@ -29,14 +39,14 @@ import harry.corruptor.HideValueCorruptor;
import harry.corruptor.QueryResponseCorruptor;
import harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
import harry.ddl.SchemaSpec;
-import harry.visitors.Visitor;
import harry.operations.Query;
+import harry.runner.StagedRunner.StagedRunnerConfig;
import harry.visitors.SingleValidator;
public class QuiescentCheckerIntegrationTest extends ModelTestBase
{
@Override
- protected Visitor validator(Run run)
+ protected SingleValidator validator(Run run)
{
return new SingleValidator(100, run, modelConfiguration());
}
@@ -56,7 +66,30 @@ public class QuiescentCheckerIntegrationTest extends ModelTestBase
@Test
public void normalConditionIntegrationTest() throws Throwable
{
- negativeIntegrationTest(modelConfiguration()::make);
+ Model.ModelFactory factory = modelConfiguration();
+
+ SequentialRunnerConfig sequential =
+ new SequentialRunnerConfig(Arrays.asList(new LoggingVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration()),
+ new RecentPartitionsValidatorConfiguration(10, 10, 1, factory::make),
+ new AllPartitionsValidatorConfiguration(10, 10, factory::make)),
+ 1, TimeUnit.MINUTES);
+ negativeIntegrationTest(sequential);
+ }
+
+ @Test
+ public void normalConditionStagedIntegrationTest() throws Throwable
+ {
+ Model.ModelFactory factory = modelConfiguration();
+
+ ConcurrentRunnerConfig concurrent =
+ new ConcurrentRunnerConfig(4, Collections.singletonList(new LoggingVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration())),
+ 30, TimeUnit.SECONDS);
+ SingleVisitRunnerConfig sequential =
+ new SingleVisitRunnerConfig(Collections.singletonList(new RecentPartitionsValidatorConfiguration(1024, 0, 1, factory::make)));
+
+ StagedRunnerConfig staged = new StagedRunnerConfig(Arrays.asList(concurrent, sequential), 2, TimeUnit.MINUTES);
+
+ negativeIntegrationTest(staged);
}
@Test
diff --git a/harry-integration/test/resources/single_partition_test.yml b/harry-integration/test/resources/single_partition_test.yml
index 8ec4c4e..3f2017f 100644
--- a/harry-integration/test/resources/single_partition_test.yml
+++ b/harry-integration/test/resources/single_partition_test.yml
@@ -12,9 +12,6 @@ clock:
offset:
offset: 1000
-run_time: 10
-run_time_unit: "MINUTES"
-
system_under_test:
println: {}
@@ -49,6 +46,8 @@ data_tracker:
runner:
sequential:
+ run_time: 10
+ run_time_unit: "MINUTES"
visitors: []
metric_reporter:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org