You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2021/03/25 09:51:30 UTC

[cassandra-harry] branch CASSANDRA-16262 created (now 8238908)

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a change to branch CASSANDRA-16262
in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git.


      at 8238908  Numerious minor improvements while preparing for fuzz-testing 4.0 in earnest:

This branch includes the following new commits:

     new 8238908  Numerious minor improvements while preparing for fuzz-testing 4.0 in earnest:

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra-harry] 01/01: Numerious minor improvements while preparing for fuzz-testing 4.0 in earnest:

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch CASSANDRA-16262
in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git

commit 8238908162122dbad3eeb7857e1141f0b4205b95
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Wed Mar 24 16:11:57 2021 +0100

    Numerious minor improvements while preparing for fuzz-testing 4.0 in earnest:
    
      * Refactor Run to make it an entrypoint
      * Separate Partition visitors from Row visitors
      * Make it possible to effortlessly check local states
      * Introduce CLs
      * More clear distinction between the components allowing to implement visitors (such as repairing validator)
      * Implement fault injecting partition visitor
      * Extract DataTracker
      * Minor bug fixes
    
    Patch by Alex Petrov for CASSANDRA-16262.
---
 harry-core/src/harry/core/Configuration.java       | 384 +++++++++++++--------
 .../sut/NoOpSut.java => core/MetricReporter.java}  |  37 +-
 harry-core/src/harry/core/Run.java                 |  64 ++--
 .../src/harry/corruptor/AddExtraRowCorruptor.java  |   4 +-
 .../harry/corruptor/QueryResponseCorruptor.java    |   2 +-
 harry-core/src/harry/corruptor/RowCorruptor.java   |   2 +-
 harry-core/src/harry/ddl/SchemaGenerators.java     |  66 +++-
 harry-core/src/harry/ddl/SchemaSpec.java           |   6 +
 harry-core/src/harry/generators/PCGFastPure.java   |   2 +-
 harry-core/src/harry/model/DoNothingModel.java     |   6 +-
 harry-core/src/harry/model/ExhaustiveChecker.java  |  83 ++---
 harry-core/src/harry/model/Model.java              |  23 +-
 harry-core/src/harry/model/OpSelectors.java        |  24 +-
 harry-core/src/harry/model/QuiescentChecker.java   |  54 +--
 harry-core/src/harry/model/SelectHelper.java       |   2 +-
 .../harry/model/StatelessVisibleRowsChecker.java   |  36 +-
 harry-core/src/harry/model/VisibleRowsChecker.java | 113 +++---
 harry-core/src/harry/model/sut/PrintlnSut.java     |   9 +-
 .../src/harry/model/sut/SystemUnderTest.java       |  41 ++-
 harry-core/src/harry/reconciler/Reconciler.java    |  43 ++-
 .../src/harry/runner/AbstractPartitionVisitor.java |   3 +-
 .../src/harry/runner/AllPartitionsValidator.java   | 106 ++++++
 .../src/harry/runner/DataTracker.java              |  33 +-
 .../src/harry/runner/DefaultDataTracker.java       | 140 ++++++++
 .../runner/DefaultPartitionVisitorFactory.java     | 198 -----------
 harry-core/src/harry/runner/HarryRunner.java       |  36 +-
 .../src/harry/runner/LoggingPartitionVisitor.java  | 100 ++++++
 .../src/harry/runner/MutatingPartitionVisitor.java | 133 +++++++
 ...aultRowVisitor.java => MutatingRowVisitor.java} |  39 ++-
 harry-core/src/harry/runner/PartitionVisitor.java  |  11 +-
 harry-core/src/harry/runner/QueryGenerator.java    | 364 +++++++++++++++++++
 .../src/harry/runner/RecentPartitionValidator.java |  84 +++++
 harry-core/src/harry/runner/RowVisitor.java        |  13 +-
 harry-core/src/harry/runner/Runner.java            | 162 ++++-----
 .../src/harry/runner/SinglePartitionValidator.java |  56 +++
 harry-core/src/harry/runner/Validator.java         | 119 -------
 harry-core/src/harry/util/Ranges.java              |  25 +-
 .../test/harry/generators/RandomGeneratorTest.java |  32 +-
 .../test/harry/generators/SurjectionsTest.java     |   4 +-
 harry-core/test/harry/model/OpSelectorsTest.java   |  96 +++---
 harry-core/test/harry/operations/RelationTest.java | 215 ++++++------
 .../model/sut/external/ExternalClusterSut.java     |  24 +-
 .../harry/runner/external/HarryRunnerExternal.java |   8 +-
 .../src/harry/model/sut}/ExternalClusterSut.java   |  78 ++---
 .../src/harry/model/sut/InJvmSut.java              | 111 +++++-
 .../runner/FaultInjectingPartitionVisitor.java     |  94 +++++
 .../src/harry/runner/QueryingNoOpChecker.java      |  65 ++++
 .../harry/runner/RepairingLocalStateValidator.java | 137 ++++++++
 harry-integration/src/harry/runner/Reproduce.java  |   4 +-
 .../model/ExhaustiveCheckerIntegrationTest.java    | 211 +++++------
 .../harry/model/ExhaustiveCheckerUnitTest.java     | 100 +++---
 .../test/harry/model/IntegrationTestBase.java      |  28 +-
 harry-integration/test/harry/model/ModelTest.java  |  28 +-
 .../test/harry/model/ModelTestBase.java            | 102 ++++++
 .../harry/model/QuerySelectorNegativeTest.java     |  43 ++-
 .../test/harry/model/QuerySelectorTest.java        |  46 +--
 .../model/QuiescentCheckerIntegrationTest.java     | 111 +++---
 .../test/harry/op/RowVisitorTest.java              |  51 +--
 pom.xml                                            |   4 +-
 59 files changed, 2812 insertions(+), 1403 deletions(-)

diff --git a/harry-core/src/harry/core/Configuration.java b/harry-core/src/harry/core/Configuration.java
index a3c287f..27ee176 100644
--- a/harry-core/src/harry/core/Configuration.java
+++ b/harry-core/src/harry/core/Configuration.java
@@ -27,7 +27,6 @@ import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.Supplier;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -46,14 +45,16 @@ import harry.model.OpSelectors;
 import harry.model.QuiescentChecker;
 import harry.model.clock.ApproximateMonotonicClock;
 import harry.model.sut.SystemUnderTest;
-import harry.runner.DefaultPartitionVisitorFactory;
-import harry.runner.DefaultRowVisitor;
+import harry.runner.AllPartitionsValidator;
+import harry.runner.DataTracker;
+import harry.runner.DefaultDataTracker;
+import harry.runner.LoggingPartitionVisitor;
+import harry.runner.MutatingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
 import harry.runner.PartitionVisitor;
-import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.RecentPartitionValidator;
 import harry.runner.RowVisitor;
 import harry.runner.Runner;
-import harry.runner.Validator;
 import harry.util.BitSet;
 
 public class Configuration
@@ -70,13 +71,22 @@ public class Configuration
         mapper.registerSubtypes(Configuration.DebugApproximateMonotonicClockConfiguration.class);
         mapper.registerSubtypes(Configuration.ApproximateMonotonicClockConfiguration.class);
         mapper.registerSubtypes(Configuration.ConcurrentRunnerConfig.class);
+        mapper.registerSubtypes(Configuration.SequentialRunnerConfig.class);
+        mapper.registerSubtypes(Configuration.DefaultDataTrackerConfiguration.class);
 
         mapper.registerSubtypes(Configuration.ExhaustiveCheckerConfig.class);
+        mapper.registerSubtypes(Configuration.QuiescentCheckerConfig.class);
         mapper.registerSubtypes(Configuration.DefaultCDSelectorConfiguration.class);
         mapper.registerSubtypes(Configuration.DefaultPDSelectorConfiguration.class);
         mapper.registerSubtypes(Configuration.ConstantDistributionConfig.class);
         mapper.registerSubtypes(DefaultSchemaProviderConfiguration.class);
-        mapper.registerSubtypes(DefaultRowVisitorConfiguration.class);
+        mapper.registerSubtypes(MutatingRowVisitorConfiguration.class);
+
+        mapper.registerSubtypes(MutatingPartitionVisitorConfiguation.class);
+        mapper.registerSubtypes(LoggingPartitionVisitorConfiguration.class);
+        mapper.registerSubtypes(AllPartitionsValidatorConfiguration.class);
+        mapper.registerSubtypes(RecentPartitionsValidatorConfiguration.class);
+        mapper.registerSubtypes(FixedSchemaProviderConfiguration.class);
     }
 
     public final long seed;
@@ -86,12 +96,11 @@ public class Configuration
     public final boolean create_schema;
     public final boolean truncate_table;
 
+    public final MetricReporterConfiguration metric_reporter;
     public final ClockConfiguration clock;
-    public final RunnerConfiguration runner;
     public final SutConfiguration system_under_test;
-    public final ModelConfiguration model;
-    public final RowVisitorConfiguration row_visitor;
-
+    public final DataTrackerConfiguration data_tracker;
+    public final RunnerConfiguration runner;
     public final PDSelectorConfiguration partition_descriptor_selector;
     public final CDSelectorConfiguration clustering_descriptor_selector;
 
@@ -104,11 +113,11 @@ public class Configuration
                          @JsonProperty("drop_schema") boolean drop_schema,
                          @JsonProperty("create_schema") boolean create_schema,
                          @JsonProperty("truncate_schema") boolean truncate_table,
+                         @JsonProperty("metric_reporter") MetricReporterConfiguration metric_reporter,
                          @JsonProperty("clock") ClockConfiguration clock,
                          @JsonProperty("runner") RunnerConfiguration runner,
                          @JsonProperty("system_under_test") SutConfiguration system_under_test,
-                         @JsonProperty("model") ModelConfiguration model,
-                         @JsonProperty("row_visitor") RowVisitorConfiguration row_visitor,
+                         @JsonProperty("data_tracker") DataTrackerConfiguration data_tracker,
                          @JsonProperty("partition_descriptor_selector") PDSelectorConfiguration partition_descriptor_selector,
                          @JsonProperty("clustering_descriptor_selector") CDSelectorConfiguration clustering_descriptor_selector,
                          @JsonProperty(value = "run_time", defaultValue = "2") long run_time,
@@ -119,15 +128,15 @@ public class Configuration
         this.drop_schema = drop_schema;
         this.create_schema = create_schema;
         this.truncate_table = truncate_table;
+        this.metric_reporter = metric_reporter;
         this.clock = clock;
-        this.runner = runner;
         this.system_under_test = system_under_test;
-        this.model = model;
-        this.row_visitor = row_visitor;
+        this.data_tracker = data_tracker;
         this.partition_descriptor_selector = partition_descriptor_selector;
         this.clustering_descriptor_selector = clustering_descriptor_selector;
         this.run_time = run_time;
         this.run_time_unit = run_time_unit;
+        this.runner = runner;
     }
 
     public static void registerSubtypes(Class<?>... classes)
@@ -178,6 +187,13 @@ public class Configuration
 
     public static void validate(Configuration config)
     {
+        Objects.requireNonNull(config.schema_provider, "Schema provider should not be null");
+        Objects.requireNonNull(config.metric_reporter, "Metric reporter should not be null");
+        Objects.requireNonNull(config.clock, "Clock should not be null");
+        Objects.requireNonNull(config.system_under_test, "System under test should not be null");
+        Objects.requireNonNull(config.partition_descriptor_selector, "Partition descriptor selector should not be null");
+        Objects.requireNonNull(config.clustering_descriptor_selector, "Clustering descriptor selector should not be null");
+
         // TODO: validation
         //assert historySize * clockEpochTimeUnit.toMillis(clockEpoch) > runTimePeriod.toMillis(runTime) : "History size is too small for this run";
     }
@@ -194,53 +210,39 @@ public class Configuration
 
     public static Run createRun(Configuration snapshot)
     {
+        validate(snapshot);
+
         long seed = snapshot.seed;
 
+        DataTracker tracker = snapshot.data_tracker == null ? new DefaultDataTrackerConfiguration().make() : snapshot.data_tracker.make();
         OpSelectors.Rng rng = new OpSelectors.PCGFast(seed);
 
         OpSelectors.MonotonicClock clock = snapshot.clock.make();
 
-        // TODO: parsing schema
+        MetricReporter metricReporter = snapshot.metric_reporter.make();
+        // TODO: parse schema
         SchemaSpec schemaSpec = snapshot.schema_provider.make(seed);
+        schemaSpec.validate();
 
         OpSelectors.PdSelector pdSelector = snapshot.partition_descriptor_selector.make(rng);
         OpSelectors.DescriptorSelector descriptorSelector = snapshot.clustering_descriptor_selector.make(rng, schemaSpec);
 
         SystemUnderTest sut = snapshot.system_under_test.make();
-        QuerySelector querySelector = new QuerySelector(schemaSpec,
-                                                        pdSelector,
-                                                        descriptorSelector,
-                                                        Surjections.pick(Query.QueryKind.CLUSTERING_SLICE,
-                                                                         Query.QueryKind.CLUSTERING_RANGE),
-                                                        rng);
-        Model model = snapshot.model.create(schemaSpec, pdSelector, descriptorSelector, clock, querySelector, sut);
-        Validator validator = new Validator(model, schemaSpec, clock, pdSelector, descriptorSelector, rng);
-
-        RowVisitor rowVisitor;
-        if (snapshot.row_visitor != null)
-            rowVisitor = snapshot.row_visitor.make(schemaSpec, clock, descriptorSelector, querySelector);
-        else
-            rowVisitor = new DefaultRowVisitor(schemaSpec, clock, descriptorSelector, querySelector);
-
-        // TODO: make this one configurable, too?
-        Supplier<PartitionVisitor> visitorFactory = new DefaultPartitionVisitorFactory(model, sut, pdSelector, descriptorSelector, schemaSpec, rowVisitor);
+
         return new Run(rng,
                        clock,
                        pdSelector,
                        descriptorSelector,
                        schemaSpec,
-                       model,
+                       tracker,
                        sut,
-                       validator,
-                       rowVisitor,
-                       visitorFactory,
-                       snapshot);
+                       metricReporter);
     }
 
-    public static Runner createRunner(Configuration snapshot)
+    public static Runner createRunner(Configuration config)
     {
-        Run run = createRun(snapshot);
-        return snapshot.runner.make(run);
+        Run run = createRun(config);
+        return config.runner.make(run, config);
     }
 
     public static class ConfigurationBuilder
@@ -253,11 +255,10 @@ public class Configuration
         boolean truncate_table;
 
         ClockConfiguration clock;
+        MetricReporterConfiguration metric_reporter = new NoOpMetricReporterConfiguration();
+        DataTrackerConfiguration data_tracker = new DefaultDataTrackerConfiguration();
         RunnerConfiguration runner;
         SutConfiguration system_under_test;
-        ModelConfiguration model;
-        RowVisitorConfiguration row_visitor = new DefaultRowVisitorConfiguration();
-
         PDSelectorConfiguration partition_descriptor_selector = new Configuration.DefaultPDSelectorConfiguration(10, 100);
         CDSelectorConfiguration clustering_descriptor_selector; // TODO: sensible default value
 
@@ -283,13 +284,19 @@ public class Configuration
             return this;
         }
 
+
+        public ConfigurationBuilder setDataTracker(DataTrackerConfiguration tracker)
+        {
+            this.data_tracker = tracker;
+            return this;
+        }
+
         public ConfigurationBuilder setClock(ClockConfiguration clock)
         {
             this.clock = clock;
             return this;
         }
 
-
         public ConfigurationBuilder setSUT(SutConfiguration system_under_test)
         {
             this.system_under_test = system_under_test;
@@ -314,12 +321,6 @@ public class Configuration
             return this;
         }
 
-        public ConfigurationBuilder setModel(ModelConfiguration model)
-        {
-            this.model = model;
-            return this;
-        }
-
         public ConfigurationBuilder setRunner(RunnerConfiguration runner)
         {
             this.runner = runner;
@@ -345,28 +346,28 @@ public class Configuration
             return setClusteringDescriptorSelector(builder.build());
         }
 
-        public ConfigurationBuilder setRowVisitor(RowVisitorConfiguration row_visitor)
+        public ConfigurationBuilder setMetricReporter(MetricReporterConfiguration metric_reporter)
         {
-            this.row_visitor = row_visitor;
+            this.metric_reporter = metric_reporter;
             return this;
         }
 
         public Configuration build()
         {
             return new Configuration(seed,
-                                     Objects.requireNonNull(schema_provider, "Schema provider should not be null"),
+                                     schema_provider,
                                      drop_schema,
                                      create_schema,
                                      truncate_table,
+                                     metric_reporter,
+                                     clock,
 
-                                     Objects.requireNonNull(clock, "Clock should not be null"),
                                      runner,
-                                     Objects.requireNonNull(system_under_test, "System under test should not be null"),
-                                     Objects.requireNonNull(model, "Model should not be null"),
-                                     Objects.requireNonNull(row_visitor, "Row visitor should not be null"),
+                                     system_under_test,
+                                     data_tracker,
 
-                                     Objects.requireNonNull(partition_descriptor_selector, "Partition descriptor selector should not be null"),
-                                     Objects.requireNonNull(clustering_descriptor_selector, "Clustering descriptor selector should not be null"),
+                                     partition_descriptor_selector,
+                                     clustering_descriptor_selector,
 
                                      run_time,
                                      run_time_unit);
@@ -385,8 +386,6 @@ public class Configuration
         builder.clock = clock;
         builder.runner = runner;
         builder.system_under_test = system_under_test;
-        builder.model = model;
-        builder.row_visitor = row_visitor;
 
         builder.partition_descriptor_selector = partition_descriptor_selector;
         builder.clustering_descriptor_selector = clustering_descriptor_selector;
@@ -396,6 +395,40 @@ public class Configuration
         return builder;
     }
 
+
+    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
+    public interface DataTrackerConfiguration extends DataTracker.DataTrackerFactory
+    {
+
+    }
+
+    @JsonTypeName("default")
+    public static class DefaultDataTrackerConfiguration implements DataTrackerConfiguration
+    {
+        public final long max_seen_lts;
+        public final long max_complete_lts;
+
+        public DefaultDataTrackerConfiguration()
+        {
+            this(-1, -1);
+        }
+
+        @JsonCreator
+        public DefaultDataTrackerConfiguration(@JsonProperty(value = "max_seen_lts", defaultValue = "-1") long max_seen_lts,
+                                               @JsonProperty(value = "max_complete_lts", defaultValue = "-1") long max_complete_lts)
+        {
+            this.max_seen_lts = max_seen_lts;
+            this.max_complete_lts = max_complete_lts;
+        }
+
+        public DataTracker make()
+        {
+            DefaultDataTracker defaultDataTracker = new DefaultDataTracker();
+            defaultDataTracker.forceLts(max_seen_lts, max_complete_lts);
+            return defaultDataTracker;
+        }
+    }
+
     @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
     public interface ClockConfiguration extends OpSelectors.MonotonicClockFactory
     {
@@ -475,46 +508,39 @@ public class Configuration
     @JsonTypeName("concurrent")
     public static class ConcurrentRunnerConfig implements RunnerConfiguration
     {
-        public final int writer_threads;
-        public final int round_robin_validator_threads;
-        public final int recent_partition_validator_threads;
+        public final int concurrency;
+        public final List<PartitionVisitorConfiguration> partition_visitor_factories;
 
         @JsonCreator
-        public ConcurrentRunnerConfig(@JsonProperty(value = "writer_threads", defaultValue = "2") int writer_threads,
-                                      @JsonProperty(value = "round_robin_validator_threads", defaultValue = "2") int round_robin_validator_threads,
-                                      @JsonProperty(value = "recent_partition_validator_threads", defaultValue = "2") int recent_partition_validator_threads)
+        public ConcurrentRunnerConfig(@JsonProperty(value = "concurrency", defaultValue = "2") int concurrency,
+                                      @JsonProperty(value = "partition_visitors") List<PartitionVisitorConfiguration> partitionVisitors)
         {
-            this.writer_threads = writer_threads;
-            this.round_robin_validator_threads = round_robin_validator_threads;
-            this.recent_partition_validator_threads = recent_partition_validator_threads;
+            this.concurrency = concurrency;
+            this.partition_visitor_factories = partitionVisitors;
         }
 
-        public Runner make(Run run)
+        @Override
+        public Runner make(Run run, Configuration config)
         {
-            return new Runner.ConcurrentRunner(run, writer_threads, round_robin_validator_threads, recent_partition_validator_threads);
+            return new Runner.ConcurrentRunner(run, config, concurrency, partition_visitor_factories);
         }
     }
 
     @JsonTypeName("sequential")
     public static class SequentialRunnerConfig implements RunnerConfiguration
     {
-        private final int round_robin_validator_threads;
-        private final int check_recent_after;
-        private final int check_all_after;
+        public final List<PartitionVisitorConfiguration> partition_visitor_factories;
 
         @JsonCreator
-        public SequentialRunnerConfig(@JsonProperty(value = "round_robin_validator_threads", defaultValue = "2") int round_robin_validator_threads,
-                                      @JsonProperty(value = "check_recent_after", defaultValue = "100") int check_recent_after,
-                                      @JsonProperty(value = "check_all_after", defaultValue = "5000") int check_all_after)
+        public SequentialRunnerConfig(@JsonProperty(value = "partition_visitors") List<PartitionVisitorConfiguration> partitionVisitors)
         {
-            this.round_robin_validator_threads = round_robin_validator_threads;
-            this.check_recent_after = check_recent_after;
-            this.check_all_after = check_all_after;
+            this.partition_visitor_factories = partitionVisitors;
         }
 
-        public Runner make(Run run)
+        @Override
+        public Runner make(Run run, Configuration config)
         {
-            return new Runner.SequentialRunner(run, round_robin_validator_threads, check_recent_after, check_all_after);
+            return new Runner.SequentialRunner(run, config, partition_visitor_factories);
         }
     }
 
@@ -531,64 +557,29 @@ public class Configuration
     @JsonTypeName("exhaustive_checker")
     public static class ExhaustiveCheckerConfig implements ModelConfiguration
     {
-        public final long max_seen_lts;
-        public final long max_complete_lts;
-
+        @JsonCreator
         public ExhaustiveCheckerConfig()
         {
-            this(-1, -1);
-        }
 
-        @JsonCreator
-        public ExhaustiveCheckerConfig(@JsonProperty(value = "max_seen_lts", defaultValue = "-1") long max_seen_lts,
-                                       @JsonProperty(value = "max_complete_lts", defaultValue = "-1") long max_complete_lts)
-        {
-            this.max_seen_lts = max_seen_lts;
-            this.max_complete_lts = max_complete_lts;
         }
 
-        public Model create(SchemaSpec schema, OpSelectors.PdSelector pdSelector, OpSelectors.DescriptorSelector descriptorSelector, OpSelectors.MonotonicClock clock, QuerySelector querySelector, SystemUnderTest sut)
+        public Model make(Run run)
         {
-            ExhaustiveChecker exhaustiveChecker = new ExhaustiveChecker(schema,
-                                                                        pdSelector,
-                                                                        descriptorSelector,
-                                                                        clock,
-                                                                        querySelector,
-                                                                        sut);
-            exhaustiveChecker.forceLts(max_seen_lts, max_complete_lts);
-            return exhaustiveChecker;
+            return new ExhaustiveChecker(run);
         }
     }
 
     @JsonTypeName("quiescent_checker")
     public static class QuiescentCheckerConfig implements ModelConfiguration
     {
-        public final long max_seen_lts;
-        public final long max_complete_lts;
-
-        public QuiescentCheckerConfig()
-        {
-            this(-1, -1);
-        }
-
         @JsonCreator
-        public QuiescentCheckerConfig(@JsonProperty(value = "max_seen_lts", defaultValue = "-1") long max_seen_lts,
-                                      @JsonProperty(value = "max_complete_lts", defaultValue = "-1") long max_complete_lts)
+        public QuiescentCheckerConfig()
         {
-            this.max_seen_lts = max_seen_lts;
-            this.max_complete_lts = max_complete_lts;
         }
 
-        public Model create(SchemaSpec schema, OpSelectors.PdSelector pdSelector, OpSelectors.DescriptorSelector descriptorSelector, OpSelectors.MonotonicClock clock, QuerySelector querySelector, SystemUnderTest sut)
+        public Model make(Run run)
         {
-            QuiescentChecker exhaustiveChecker = new QuiescentChecker(schema,
-                                                                      pdSelector,
-                                                                      descriptorSelector,
-                                                                      clock,
-                                                                      querySelector,
-                                                                      sut);
-            exhaustiveChecker.forceLts(max_seen_lts, max_complete_lts);
-            return exhaustiveChecker;
+            return new QuiescentChecker(run);
         }
     }
 
@@ -875,23 +866,108 @@ public class Configuration
         }
     }
 
+
+    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
+    public interface PartitionVisitorConfiguration extends PartitionVisitor.PartitionVisitorFactory
+    {
+    }
+
+
+    @JsonTypeName("mutating")
+    public static class MutatingPartitionVisitorConfiguation implements PartitionVisitorConfiguration
+    {
+        protected final RowVisitorConfiguration row_visitor;
+
+        @JsonCreator
+        public MutatingPartitionVisitorConfiguation(@JsonProperty("row_visitor") RowVisitorConfiguration row_visitor)
+        {
+            this.row_visitor = row_visitor;
+        }
+
+        @Override
+        public PartitionVisitor make(Run run)
+        {
+            return new MutatingPartitionVisitor(run, row_visitor);
+        }
+    }
+
+    @JsonTypeName("logging")
+    public static class LoggingPartitionVisitorConfiguration implements PartitionVisitorConfiguration
+    {
+        protected final RowVisitorConfiguration row_visitor;
+
+        @JsonCreator
+        public LoggingPartitionVisitorConfiguration(@JsonProperty("row_visitor") RowVisitorConfiguration row_visitor)
+        {
+            this.row_visitor = row_visitor;
+        }
+
+        @Override
+        public PartitionVisitor make(Run run)
+        {
+            return new LoggingPartitionVisitor(run, row_visitor);
+        }
+    }
+
+    @JsonTypeName("validate_all_partitions")
+    public static class AllPartitionsValidatorConfiguration implements Configuration.PartitionVisitorConfiguration
+    {
+        private final int concurrency;
+        private final int trigger_after;
+        private final Configuration.ModelConfiguration modelConfiguration;
+
+        @JsonCreator
+        public AllPartitionsValidatorConfiguration(@JsonProperty("concurrency") int concurrency,
+                                                   @JsonProperty("trigger_after") int trigger_after,
+                                                   @JsonProperty("model") Configuration.ModelConfiguration model)
+        {
+            this.concurrency = concurrency;
+            this.trigger_after = trigger_after;
+            this.modelConfiguration = model;
+        }
+
+        public PartitionVisitor make(Run run)
+        {
+            return new AllPartitionsValidator(concurrency, trigger_after, run, modelConfiguration);
+        }
+    }
+
+    @JsonTypeName("validate_recent_partitions")
+    public static class RecentPartitionsValidatorConfiguration implements Configuration.PartitionVisitorConfiguration
+    {
+        private final int partition_count;
+        private final int trigger_after;
+        private final Configuration.ModelConfiguration modelConfiguration;
+
+        @JsonCreator
+        public RecentPartitionsValidatorConfiguration(@JsonProperty("partition_count") int partition_count,
+                                                      @JsonProperty("trigger_after") int trigger_after,
+                                                      @JsonProperty("model") Configuration.ModelConfiguration model)
+        {
+            this.partition_count = partition_count;
+            this.trigger_after = trigger_after;
+            this.modelConfiguration = model;
+        }
+
+        @Override
+        public PartitionVisitor make(Run run)
+        {
+            return new RecentPartitionValidator(partition_count, trigger_after, run, modelConfiguration);
+        }
+    }
+
     @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
     public interface RowVisitorConfiguration extends RowVisitor.RowVisitorFactory
     {
     }
 
-    @JsonTypeName("default")
-    public static class DefaultRowVisitorConfiguration implements RowVisitorConfiguration
+    @JsonTypeName("mutating")
+    public static class MutatingRowVisitorConfiguration implements RowVisitorConfiguration
     {
-        public RowVisitor make(SchemaSpec schema,
-                               OpSelectors.MonotonicClock clock,
-                               OpSelectors.DescriptorSelector descriptorSelector,
-                               QuerySelector querySelector)
+        @Override
+        public RowVisitor make(Run run)
         {
-            return new DefaultRowVisitor(schema,
-                                         clock,
-                                         descriptorSelector,
-                                         querySelector);
+            return new MutatingRowVisitor(run);
         }
     }
 
@@ -910,5 +986,41 @@ public class Configuration
         }
     }
 
+    @JsonTypeName("fixed")
+    public static class FixedSchemaProviderConfiguration implements SchemaProviderConfiguration
+    {
+        private final SchemaSpec schemaSpec;
+
+        @JsonCreator
+        public FixedSchemaProviderConfiguration(@JsonProperty("keyspace") String keyspace,
+                                                @JsonProperty("table") String table,
+                                                @JsonProperty("partition_keys") Map<String, String> pks,
+                                                @JsonProperty("clustering_keys") Map<String, String> cks,
+                                                @JsonProperty("regular_columns") Map<String, String> regulars)
+        {
+            this.schemaSpec = SchemaGenerators.parse(keyspace, table,
+                                                     pks, cks, regulars);
+        }
+
+        public SchemaSpec make(long seed)
+        {
+            return schemaSpec;
+        }
+    }
+
+    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
+    public interface MetricReporterConfiguration extends MetricReporter.MetricReporterFactory
+    {
+    }
+
+    @JsonTypeName("default")
+    public static class NoOpMetricReporterConfiguration implements MetricReporterConfiguration
+    {
+        public MetricReporter make()
+        {
+            return MetricReporter.NO_OP;
+        }
+    }
+
     // TODO: schema provider by DDL
 }
diff --git a/harry-core/src/harry/model/sut/NoOpSut.java b/harry-core/src/harry/core/MetricReporter.java
similarity index 58%
rename from harry-core/src/harry/model/sut/NoOpSut.java
rename to harry-core/src/harry/core/MetricReporter.java
index c09490f..b576ec3 100644
--- a/harry-core/src/harry/model/sut/NoOpSut.java
+++ b/harry-core/src/harry/core/MetricReporter.java
@@ -16,29 +16,34 @@
  *  limitations under the License.
  */
 
-package harry.model.sut;
+package harry.core;
 
-import java.util.concurrent.CompletableFuture;
-
-public class NoOpSut implements SystemUnderTest
+public interface MetricReporter
 {
-    public boolean isShutdown()
-    {
-        return false;
-    }
+    void columnDelete();
+    void rowDelete();
+    void insert();
+    void rangeDelete();
 
-    public void shutdown()
-    {
-    }
+    void validatePartition();
+    void validateRandomQuery();
 
-    public Object[][] execute(String statement, Object... bindings)
+    interface MetricReporterFactory
     {
-        return new Object[0][];
+        MetricReporter make();
     }
 
-    public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+    MetricReporter NO_OP = new NoOpMetricReporter();
+
+    class NoOpMetricReporter implements MetricReporter
     {
-        return CompletableFuture.supplyAsync(() -> execute(statement, bindings),
-                                             Runnable::run);
+        private NoOpMetricReporter() {}
+
+        public void columnDelete(){}
+        public void rowDelete(){}
+        public void insert(){}
+        public void rangeDelete(){}
+        public void validatePartition(){}
+        public void validateRandomQuery(){}
     }
 }
diff --git a/harry-core/src/harry/core/Run.java b/harry-core/src/harry/core/Run.java
index fdff08e..d31e7ad 100644
--- a/harry-core/src/harry/core/Run.java
+++ b/harry-core/src/harry/core/Run.java
@@ -18,56 +18,62 @@
 
 package harry.core;
 
-import java.util.function.Supplier;
-
 import harry.ddl.SchemaSpec;
-import harry.model.Model;
 import harry.model.OpSelectors;
 import harry.model.sut.SystemUnderTest;
-import harry.runner.PartitionVisitor;
-import harry.runner.RowVisitor;
-import harry.runner.Validator;
+import harry.runner.DataTracker;
+import harry.runner.QueryGenerator;
 
 public class Run
 {
     public final OpSelectors.Rng rng;
     public final OpSelectors.MonotonicClock clock;
     public final OpSelectors.PdSelector pdSelector;
-
     public final OpSelectors.DescriptorSelector descriptorSelector;
+    public final QueryGenerator rangeSelector;
 
     public final SchemaSpec schemaSpec;
-    public final Model model;
+    public final DataTracker tracker;
     public final SystemUnderTest sut;
-    public final Validator validator;
-    public final RowVisitor rowVisitor;
-    public final Supplier<PartitionVisitor> visitorFactory;
 
-    public final Configuration snapshot;
+    public final MetricReporter metricReporter;
+
+
+    public Run(OpSelectors.Rng rng,
+               OpSelectors.MonotonicClock clock,
+               OpSelectors.PdSelector pdSelector,
+               OpSelectors.DescriptorSelector descriptorSelector,
 
-    Run(OpSelectors.Rng rng,
-        OpSelectors.MonotonicClock clock,
-        OpSelectors.PdSelector pdSelector,
-        OpSelectors.DescriptorSelector descriptorSelector,
+               SchemaSpec schemaSpec,
+               DataTracker tracker,
+               SystemUnderTest sut,
+               MetricReporter metricReporter)
+    {
+        this(rng, clock, pdSelector, descriptorSelector,
+             new QueryGenerator(schemaSpec, pdSelector, descriptorSelector, rng),
+             schemaSpec, tracker, sut, metricReporter);
+    }
 
-        SchemaSpec schemaSpec,
-        Model model,
-        SystemUnderTest sut,
-        Validator validator,
-        RowVisitor rowVisitor,
-        Supplier<PartitionVisitor> visitorFactory,
-        Configuration snapshot)
+    private Run(OpSelectors.Rng rng,
+                OpSelectors.MonotonicClock clock,
+                OpSelectors.PdSelector pdSelector,
+                OpSelectors.DescriptorSelector descriptorSelector,
+                QueryGenerator rangeSelector,
+
+                SchemaSpec schemaSpec,
+                DataTracker tracker,
+                SystemUnderTest sut,
+                MetricReporter metricReporter)
     {
+
         this.rng = rng;
         this.clock = clock;
         this.pdSelector = pdSelector;
         this.descriptorSelector = descriptorSelector;
+        this.rangeSelector = rangeSelector;
         this.schemaSpec = schemaSpec;
-        this.model = model;
+        this.tracker = tracker;
         this.sut = sut;
-        this.validator = validator;
-        this.rowVisitor = rowVisitor;
-        this.visitorFactory = visitorFactory;
-        this.snapshot = snapshot;
+        this.metricReporter = metricReporter;
     }
-}
+}
\ No newline at end of file
diff --git a/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java b/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java
index 9a73867..f2a8b28 100644
--- a/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java
+++ b/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java
@@ -53,7 +53,7 @@ public class AddExtraRowCorruptor implements QueryResponseCorruptor
     {
         Set<Long> cds = new HashSet<>();
         long maxLts = 0;
-        for (Object[] obj : sut.execute(query.toSelectStatement()))
+        for (Object[] obj : sut.execute(query.toSelectStatement(), SystemUnderTest.ConsistencyLevel.ALL))
         {
             ResultSetRow row = SelectHelper.resultSetToRow(schema, clock, obj);
             // TODO: extract CD cheaper
@@ -84,7 +84,7 @@ public class AddExtraRowCorruptor implements QueryResponseCorruptor
         // written value and tombstone are resolved in favour of tombstone, so we're
         // just going to take the next lts.
         logger.info("Corrupting the resultset by writing a row with cd {}", cd);
-        sut.execute(WriteHelper.inflateInsert(schema, query.pd, cd, vds, clock.rts(maxLts) + 1));
+        sut.execute(WriteHelper.inflateInsert(schema, query.pd, cd, vds, clock.rts(maxLts) + 1), SystemUnderTest.ConsistencyLevel.ALL);
         return true;
     }
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/corruptor/QueryResponseCorruptor.java b/harry-core/src/harry/corruptor/QueryResponseCorruptor.java
index aefce0f..301c50b 100644
--- a/harry-core/src/harry/corruptor/QueryResponseCorruptor.java
+++ b/harry-core/src/harry/corruptor/QueryResponseCorruptor.java
@@ -52,7 +52,7 @@ public interface QueryResponseCorruptor
         {
             List<ResultSetRow> result = new ArrayList<>();
             CompiledStatement statement = query.toSelectStatement();
-            for (Object[] obj : sut.execute(statement.cql(), statement.bindings()))
+            for (Object[] obj : sut.execute(statement.cql(), SystemUnderTest.ConsistencyLevel.ALL, statement.bindings()))
                 result.add(SelectHelper.resultSetToRow(schema, clock, obj));
 
             // TODO: technically, we can do this just depends on corruption strategy
diff --git a/harry-core/src/harry/corruptor/RowCorruptor.java b/harry-core/src/harry/corruptor/RowCorruptor.java
index 85261b2..fbd5fa9 100644
--- a/harry-core/src/harry/corruptor/RowCorruptor.java
+++ b/harry-core/src/harry/corruptor/RowCorruptor.java
@@ -36,7 +36,7 @@ public interface RowCorruptor
         if (canCorrupt(row))
         {
             CompiledStatement statement = corrupt(row);
-            sut.execute(statement.cql(), statement.bindings());
+            sut.execute(statement.cql(), SystemUnderTest.ConsistencyLevel.ALL, statement.bindings());
             return true;
         }
         return false;
diff --git a/harry-core/src/harry/ddl/SchemaGenerators.java b/harry-core/src/harry/ddl/SchemaGenerators.java
index aecc189..53ab6f5 100644
--- a/harry-core/src/harry/ddl/SchemaGenerators.java
+++ b/harry-core/src/harry/ddl/SchemaGenerators.java
@@ -18,15 +18,19 @@
 
 package harry.ddl;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import harry.generators.Generator;
 import harry.generators.Surjections;
 
@@ -40,10 +44,12 @@ public class SchemaGenerators
     }
 
     public static final Collection<ColumnSpec.DataType<?>> clusteringKeyTypes;
+    public static final Map<String, ColumnSpec.DataType<?>> nameToTypeMap;
     public static final Collection<ColumnSpec.DataType<?>> columnTypes;
 
     static
     {
+
         ImmutableList.Builder<ColumnSpec.DataType<?>> builder = ImmutableList.builder();
         builder.add(ColumnSpec.int8Type,
                     ColumnSpec.int16Type,
@@ -51,17 +57,27 @@ public class SchemaGenerators
                     ColumnSpec.int64Type,
 // TODO re-enable boolean type; add it to ByteBufferUtil in Cassandra for that
 //                    ColumnSpec.booleanType,
-                    ColumnSpec.floatType,
-                    ColumnSpec.doubleType,
                     ColumnSpec.asciiType);
         columnTypes = builder.build();
         builder = ImmutableList.builder();
         builder.addAll(columnTypes);
+
+        ImmutableMap.Builder<String, ColumnSpec.DataType<?>> mapBuilder = ImmutableMap.builder();
+
         for (ColumnSpec.DataType<?> columnType : columnTypes)
         {
-            builder.add(ColumnSpec.ReversedType.getInstance(columnType));
+            ColumnSpec.DataType<?> reversedType = ColumnSpec.ReversedType.getInstance(columnType);
+            builder.add(reversedType);
+
+            mapBuilder.put(columnType.toString(), columnType);
+            mapBuilder.put(String.format("desc(%s)", columnType.toString()), columnType);
         }
+
+        builder.add(ColumnSpec.floatType);
+        builder.add(ColumnSpec.doubleType);
+
         clusteringKeyTypes = builder.build();
+        nameToTypeMap = mapBuilder.build();
     }
 
     @SuppressWarnings("unchecked")
@@ -281,7 +297,7 @@ public class SchemaGenerators
     public static Surjections.Surjection<SchemaSpec> defaultSchemaSpecGen(String ks, String table)
     {
         return new SchemaGenerators.Builder(ks, () -> table)
-               .partitionKeySpec(1, 4,
+               .partitionKeySpec(2, 4,
 //                                                                             ColumnSpec.int8Type,
 //                                                                             ColumnSpec.int16Type,
                                  ColumnSpec.int32Type,
@@ -289,7 +305,7 @@ public class SchemaGenerators
 //                                                                             ColumnSpec.floatType,
 //                                                                             ColumnSpec.doubleType,
                                  ColumnSpec.asciiType(4, 10))
-               .clusteringKeySpec(1, 4,
+               .clusteringKeySpec(2, 4,
 //                                                                              ColumnSpec.int8Type,
 //                                                                              ColumnSpec.int16Type,
                                   ColumnSpec.int32Type,
@@ -366,6 +382,7 @@ public class SchemaGenerators
     longAndStringSpecWithReversedBothBuilder,
     withAllFeaturesEnabled
     };
+
     // Create schema generators that would produce tables starting with just a few features, progressing to use more
     public static Supplier<SchemaSpec> progression(int switchAfter)
     {
@@ -375,10 +392,11 @@ public class SchemaGenerators
 
         return new Supplier<SchemaSpec>()
         {
-            private final AtomicInteger counter = new AtomicInteger();
+            private int counter = 0;
             public SchemaSpec get()
             {
-                int idx = (counter.getAndIncrement() / switchAfter) % generators.length;
+                int idx = (counter / switchAfter) % generators.length;
+                counter++;
                 SchemaSpec spec = generators[idx].get();
                 int tries = 100;
                 while ((spec.ckGenerator.byteSize() != Long.BYTES || spec.pkGenerator.byteSize() != Long.BYTES) && tries > 0)
@@ -388,6 +406,8 @@ public class SchemaGenerators
                     tries--;
                 }
 
+                spec.validate();
+
                 assert tries > 0 : String.format("Max number of tries exceeded on generator %d, can't generate a needed schema", idx);
                 return spec;
             }
@@ -396,12 +416,34 @@ public class SchemaGenerators
         };
     }
 
-    public static int DEFAULT_SWITCH_AFTER = 5;
-    public static int GENERATORS_COUNT = PROGRESSIVE_GENERATORS.length;
-    public static int DEFAULT_RUNS = DEFAULT_SWITCH_AFTER * PROGRESSIVE_GENERATORS.length;
+    public static List<ColumnSpec<?>> toColumns(Map<String, String> config, ColumnSpec.Kind kind, boolean allowReverse)
+    {
+        List<ColumnSpec<?>> columns = new ArrayList<>(config.size());
+
+        for (Map.Entry<String, String> e : config.entrySet())
+        {
+            ColumnSpec.DataType<?> type = nameToTypeMap.get(e.getValue());
+            assert type != null : "Can't parse the type";
+            assert allowReverse || !type.isReversed() : String.format("%s columns aren't allowed to be reversed");
+            columns.add(new ColumnSpec<>(e.getKey(), type, kind));
+        }
 
-    public static Supplier<SchemaSpec> progression()
+        return columns;
+    }
+
+    public static SchemaSpec parse(String keyspace,
+                                   String table,
+                                   Map<String, String> pks,
+                                   Map<String, String> cks,
+                                   Map<String, String> regulars)
     {
-        return progression(DEFAULT_SWITCH_AFTER); // would generate 30 tables before wrapping around
+        return new SchemaSpec(keyspace, table,
+                              toColumns(pks, ColumnSpec.Kind.PARTITION_KEY, false),
+                              toColumns(cks, ColumnSpec.Kind.CLUSTERING, false),
+                              toColumns(regulars, ColumnSpec.Kind.REGULAR, false));
     }
+
+    public static int DEFAULT_SWITCH_AFTER = Integer.getInteger("harry.test.progression.switch-after", 5);
+    public static int GENERATORS_COUNT = PROGRESSIVE_GENERATORS.length;
+    public static int DEFAULT_RUNS = DEFAULT_SWITCH_AFTER * GENERATORS_COUNT;
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/ddl/SchemaSpec.java b/harry-core/src/harry/ddl/SchemaSpec.java
index 336bd67..11b7009 100644
--- a/harry-core/src/harry/ddl/SchemaSpec.java
+++ b/harry-core/src/harry/ddl/SchemaSpec.java
@@ -96,6 +96,12 @@ public class SchemaSpec
         this.ALL_COLUMNS_BITSET = BitSet.allSet(regularColumns.size());
     }
 
+    public void validate()
+    {
+        assert pkGenerator.byteSize() == Long.BYTES : partitionKeys.toString();
+        assert ckGenerator.byteSize() == Long.BYTES : clusteringKeys.toString();
+    }
+
     public static interface AddRelationCallback
     {
         public void accept(ColumnSpec spec, Relation.RelationKind kind, Object value);
diff --git a/harry-core/src/harry/generators/PCGFastPure.java b/harry-core/src/harry/generators/PCGFastPure.java
index 48bfae3..9e438dc 100644
--- a/harry-core/src/harry/generators/PCGFastPure.java
+++ b/harry-core/src/harry/generators/PCGFastPure.java
@@ -105,7 +105,7 @@ public class PCGFastPure
             curMult *= curMult;
         }
 
-        return distance - 1;
+        return distance;
     }
 
     public static long shuffle(long state)
diff --git a/harry-core/src/harry/model/DoNothingModel.java b/harry-core/src/harry/model/DoNothingModel.java
index c21392d..35b5f86 100644
--- a/harry-core/src/harry/model/DoNothingModel.java
+++ b/harry-core/src/harry/model/DoNothingModel.java
@@ -23,11 +23,7 @@ import harry.runner.Query;
 
 public class DoNothingModel implements Model
 {
-    public void recordEvent(long lts, boolean quorumAchieved)
-    {
-    }
-
-    public void validatePartitionState(long verificationLts, Query query)
+    public void validate(Query query)
     {
     }
 
diff --git a/harry-core/src/harry/model/ExhaustiveChecker.java b/harry-core/src/harry/model/ExhaustiveChecker.java
index 935da45..b98ef06 100644
--- a/harry-core/src/harry/model/ExhaustiveChecker.java
+++ b/harry-core/src/harry/model/ExhaustiveChecker.java
@@ -30,20 +30,21 @@ import java.util.TreeMap;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import harry.core.Configuration;
+import harry.core.Run;
 import harry.data.ResultSetRow;
 import harry.ddl.SchemaSpec;
 import harry.model.sut.SystemUnderTest;
 import harry.runner.AbstractPartitionVisitor;
+import harry.runner.DataTracker;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 import harry.util.BitSet;
 import harry.util.Ranges;
 
@@ -61,42 +62,26 @@ public class ExhaustiveChecker implements Model
     protected final OpSelectors.PdSelector pdSelector;
     protected final OpSelectors.MonotonicClock clock;
     protected final SystemUnderTest sut;
-    protected final QuerySelector querySelector;
+    protected final QueryGenerator rangeSelector;
 
     protected final DataTracker tracker;
 
     private final SchemaSpec schema;
 
-    public ExhaustiveChecker(SchemaSpec schema,
-                             OpSelectors.PdSelector pdSelector,
-                             OpSelectors.DescriptorSelector descriptorSelector,
-                             OpSelectors.MonotonicClock clock,
-                             QuerySelector querySelector,
-                             SystemUnderTest sut)
+    public ExhaustiveChecker(Run run)
     {
-        this.descriptorSelector = descriptorSelector;
-        this.pdSelector = pdSelector;
-        this.tracker = new DataTracker();
-        this.schema = schema;
-        this.sut = sut;
-        this.clock = clock;
-        this.querySelector = querySelector;
+        this.descriptorSelector = run.descriptorSelector;
+        this.pdSelector = run.pdSelector;
+        this.tracker = run.tracker;
+        this.schema = run.schemaSpec;
+        this.sut = run.sut;
+        this.clock = run.clock;
+        this.rangeSelector = run.rangeSelector;
     }
 
-    public void recordEvent(long lts, boolean quorumAchieved)
+    public void validate(Query query)
     {
-        tracker.recordEvent(lts, quorumAchieved);
-    }
-
-    public DataTracker tracker()
-    {
-        return tracker;
-    }
-
-    public void validatePartitionState(long validationLts, Query query)
-    {
-        validatePartitionState(validationLts,
-                               query,
+        validatePartitionState(query,
                                () -> {
                                    while (!Thread.currentThread().isInterrupted())
                                    {
@@ -114,18 +99,18 @@ public class ExhaustiveChecker implements Model
                                });
     }
 
-    void validatePartitionState(long validationLts, Query query, Supplier<List<ResultSetRow>> rowsSupplier)
+    void validatePartitionState(Query query, Supplier<List<ResultSetRow>> rowsSupplier)
     {
         // Before we execute SELECT, we know what was the lts of operation that is guaranteed to be visible
-        long visibleLtsBound = tracker.maxCompleteLts();
+        long visibleLtsBound = tracker.maxConsecutiveFinished();
 
         // TODO: when we implement a reorder buffer through a bitmap, we can just grab a bitmap _before_,
         //       and know that a combination of `consecutive` + `bitmap` gives us _all possible guaranteed-to-be-seen_ values
         List<ResultSetRow> rows = rowsSupplier.get();
 
         // by the time SELECT done, we grab max "possible" lts
-        long inFlightLtsBound = tracker.maxSeenLts();
-        PartitionState partitionState = inflatePartitionState(validationLts, inFlightLtsBound, query);
+        long maxSeenLts = tracker.maxStarted();
+        PartitionState partitionState = inflatePartitionState(maxSeenLts, query);
         NavigableMap<Long, List<Operation>> operations = partitionState.operations;
         LongComparator cmp = FORWARD_COMPARATOR;
         if (query.reverse)
@@ -137,7 +122,7 @@ public class ExhaustiveChecker implements Model
         if (!rows.isEmpty() && operations.isEmpty())
         {
             throw new ValidationException(String.format("Returned rows are not empty, but there were no records in the event log.\nRows: %s\nMax seen LTS: %s\nQuery: %s",
-                                                        rows, inFlightLtsBound, query));
+                                                        rows, maxSeenLts, query));
         }
 
         PeekingIterator<ResultSetRow> rowIter = Iterators.peekingIterator(rows.iterator());
@@ -156,7 +141,7 @@ public class ExhaustiveChecker implements Model
                 if (rowIter.hasNext() && rowIter.peek().cd == cd)
                 {
                     ResultSetRow row = rowIter.next();
-                    RowValidationState validationState = new RowValidationState(row, visibleLtsBound, inFlightLtsBound, partitionState.rangeTombstones);
+                    RowValidationState validationState = new RowValidationState(row, visibleLtsBound, maxSeenLts, partitionState.rangeTombstones);
 
                     // TODO: We only need to go for as long as we explain every column. In fact, we make state _less_ precise by allowing
                     // to continue moving back in time. So far this hasn't proven to be a source of any issues, but we should fix that.
@@ -230,14 +215,11 @@ public class ExhaustiveChecker implements Model
         catch (Throwable t)
         {
             throw new ValidationException(String.format("Caught exception while validating the resultset %s." +
-                                                        "\nchecker.tracker().forceLts(%dL, %dL)" +
-                                                        "\nrun.validator.validatePartition(%dL)" +
                                                         "\nRow Iter Peek: %s" +
                                                         "\nValidated no rows:\n%s" +
                                                         "\nValidated rows:\n%s" +
                                                         "\nRows:\n%s",
                                                         query,
-                                                        inFlightLtsBound, visibleLtsBound, validationLts,
                                                         rowIter.hasNext() ? rowIter.peek() : null,
                                                         validatedNoRows,
                                                         validatedRows.stream().map(Object::toString).collect(Collectors.joining(",\n")),
@@ -320,14 +302,10 @@ public class ExhaustiveChecker implements Model
         }
     }
 
-    public PartitionState inflatePartitionState(long validationLts, long maxLts, Query query)
+    public PartitionState inflatePartitionState(long maxLts, Query query)
     {
-        long currentLts = pdSelector.maxLts(validationLts);
-        long pd = pdSelector.pd(currentLts, schema);
+        long currentLts = pdSelector.maxLtsFor(query.pd);
 
-        if (pd != pdSelector.pd(validationLts, schema))
-            throw new ValidationException("Partition descriptor %d doesn't match partition descriptor %d for LTS %d",
-                                          pd, pdSelector.pd(validationLts, schema), validationLts);
         NavigableMap<Long, List<Operation>> operations = new TreeMap<>();
         List<Ranges.Range> ranges = new ArrayList<>();
 
@@ -338,7 +316,11 @@ public class ExhaustiveChecker implements Model
                 OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
                 if (opType == OpSelectors.OperationKind.DELETE_RANGE)
                 {
-                    ranges.add(maybeWrap(lts, opId, querySelector.inflate(lts, opId).toRange(lts)));
+                    ranges.add(maybeWrap(lts, opId, rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE).toRange(lts)));
+                }
+                else if (opType == OpSelectors.OperationKind.DELETE_SLICE)
+                {
+                    ranges.add(maybeWrap(lts, opId, rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE).toRange(lts)));
                 }
                 else if (query.match(cd)) // skip descriptors that are out of range
                 {
@@ -389,11 +371,6 @@ public class ExhaustiveChecker implements Model
         }
     }
 
-    public Configuration.ExhaustiveCheckerConfig toConfig()
-    {
-        return new Configuration.ExhaustiveCheckerConfig(tracker.maxSeenLts(), tracker.maxCompleteLts());
-    }
-
     public String toString()
     {
         return "ExhaustiveChecker{" + tracker.toString() + '}';
@@ -557,12 +534,6 @@ public class ExhaustiveChecker implements Model
         }
     }
 
-    @VisibleForTesting
-    public void forceLts(long maxSeen, long maxComplete)
-    {
-        tracker.forceLts(maxSeen, maxComplete);
-    }
-
     public static class Operation implements Comparable<Operation>
     {
         public final long pd;
diff --git a/harry-core/src/harry/model/Model.java b/harry-core/src/harry/model/Model.java
index 04fe6f1..3168c8a 100644
--- a/harry-core/src/harry/model/Model.java
+++ b/harry-core/src/harry/model/Model.java
@@ -18,39 +18,22 @@
 
 package harry.model;
 
-import harry.core.Configuration;
-import harry.ddl.SchemaSpec;
-import harry.model.sut.SystemUnderTest;
+import harry.core.Run;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
 
 public interface Model
 {
     long NO_TIMESTAMP = Long.MIN_VALUE;
 
-    void recordEvent(long lts, boolean quorumAchieved);
-
-    void validatePartitionState(long verificationLts, Query query);
-
-    Configuration.ModelConfiguration toConfig();
+    void validate(Query query);
 
     interface ModelFactory
     {
-        Model create(SchemaSpec schema,
-                     OpSelectors.PdSelector pdSelector,
-                     OpSelectors.DescriptorSelector descriptorSelector,
-                     OpSelectors.MonotonicClock clock,
-                     QuerySelector querySelector,
-                     SystemUnderTest sut);
+        Model make(Run run);
     }
 
     class ValidationException extends RuntimeException
     {
-        public ValidationException()
-        {
-            super();
-        }
-
         public ValidationException(String message)
         {
             super(message);
diff --git a/harry-core/src/harry/model/OpSelectors.java b/harry-core/src/harry/model/OpSelectors.java
index ad8e046..7d7ad6d 100644
--- a/harry-core/src/harry/model/OpSelectors.java
+++ b/harry-core/src/harry/model/OpSelectors.java
@@ -117,8 +117,7 @@ public interface OpSelectors
 
         public abstract long prevLts(long lts);
 
-        public abstract long maxLts(long lts);
-
+        public abstract long maxLtsFor(long pd);
         public abstract long minLtsAt(long position);
 
         public abstract long minLtsFor(long pd);
@@ -281,8 +280,8 @@ public interface OpSelectors
 
         public long minLtsFor(long pd)
         {
-            long sequenceNumber = rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID);
-            return minLtsAt(sequenceNumber);
+            long position = rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID);
+            return minLtsAt(position);
         }
 
         public long positionFor(long lts)
@@ -291,6 +290,11 @@ public interface OpSelectors
             return windowStart + lts % windowSize;
         }
 
+        public long positionForPd(long pd)
+        {
+            return rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID);
+        }
+
         public long nextLts(long lts)
         {
             long slideCount = lts / switchAfter;
@@ -326,11 +330,9 @@ public interface OpSelectors
             return slideCount * switchAfter - windowSize + positionInCycle;
         }
 
-        public long maxLts(long lts)
+        public long maxLtsFor(long pd)
         {
-            long windowStart = lts / switchAfter;
-            long position = windowStart + lts % windowSize;
-
+            long position = rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID);
             return position * switchAfter + (slideAfterRepeats - 1) * windowSize;
         }
 
@@ -607,8 +609,7 @@ public interface OpSelectors
 
         public long vd(long pd, long cd, long lts, long opId, int col)
         {
-            // change randomNumber / sequenceNumber to prev/Next
-            return rng.randomNumber(opId + 1, pd ^ cd ^ lts ^ col);
+            return rng.randomNumber(opId, pd ^ cd ^ lts ^ col);
         }
 
         public long modificationId(long pd, long cd, long lts, long vd, int col)
@@ -622,6 +623,7 @@ public interface OpSelectors
         WRITE,
         DELETE_ROW,
         DELETE_COLUMN,
-        DELETE_RANGE
+        DELETE_RANGE,
+        DELETE_SLICE
     }
 }
diff --git a/harry-core/src/harry/model/QuiescentChecker.java b/harry-core/src/harry/model/QuiescentChecker.java
index f7b38a3..ffd50b3 100644
--- a/harry-core/src/harry/model/QuiescentChecker.java
+++ b/harry-core/src/harry/model/QuiescentChecker.java
@@ -22,54 +22,49 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
+import java.util.function.Supplier;
 
 import harry.core.Configuration;
+import harry.core.Run;
 import harry.data.ResultSetRow;
 import harry.ddl.SchemaSpec;
 import harry.model.sut.SystemUnderTest;
 import harry.reconciler.Reconciler;
+import harry.runner.DataTracker;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 
 public class QuiescentChecker implements Model
 {
-    private final OpSelectors.MonotonicClock clock;
-
-    private final DataTracker tracker;
-    private final SystemUnderTest sut;
-    private final Reconciler reconciler;
+    protected final OpSelectors.MonotonicClock clock;
 
-    public QuiescentChecker(SchemaSpec schema,
-                            OpSelectors.PdSelector pdSelector,
-                            OpSelectors.DescriptorSelector descriptorSelector,
-                            OpSelectors.MonotonicClock clock,
-                            QuerySelector querySelector,
+    protected final DataTracker tracker;
+    protected final SystemUnderTest sut;
+    protected final Reconciler reconciler;
 
-                            SystemUnderTest sut)
+    public QuiescentChecker(Run run)
     {
-        this.clock = clock;
-        this.sut = sut;
+        this.clock = run.clock;
+        this.sut = run.sut;
 
-        this.reconciler = new Reconciler(schema, pdSelector, descriptorSelector, querySelector);
-        this.tracker = new DataTracker();
+        this.reconciler = new Reconciler(run.schemaSpec, run.pdSelector, run.descriptorSelector, run.rangeSelector);
+        this.tracker = run.tracker;
     }
 
-    public void recordEvent(long lts, boolean quorumAchieved)
+    public void validate(Query query)
     {
-        tracker.recordEvent(lts, quorumAchieved);
+        validate(() -> SelectHelper.execute(sut, clock, query), query);
     }
 
-    public void validatePartitionState(long verificationLts, Query query)
+    protected void validate(Supplier<List<ResultSetRow>> rowsSupplier, Query query)
     {
-        long maxCompeteLts = tracker.maxCompleteLts();
-        long maxSeenLts = tracker.maxSeenLts();
+        long maxCompeteLts = tracker.maxConsecutiveFinished();
+        long maxSeenLts = tracker.maxStarted();
 
         assert maxCompeteLts == maxSeenLts : "Runner hasn't settled down yet. " +
                                              "Quiescent model can't be reliably used in such cases.";
 
-        List<ResultSetRow> actualRows = SelectHelper.execute(sut, clock, query);
+        List<ResultSetRow> actualRows = rowsSupplier.get();
         Iterator<ResultSetRow> actual = actualRows.iterator();
         Collection<Reconciler.RowState> expectedRows = reconciler.inflatePartitionState(query.pd, maxSeenLts, query).rows(query.reverse);
         Iterator<Reconciler.RowState> expected = expectedRows.iterator();
@@ -104,15 +99,4 @@ public class QuiescentChecker implements Model
                                           actualRows);
         }
     }
-
-    @VisibleForTesting
-    public void forceLts(long maxSeen, long maxComplete)
-    {
-        tracker.forceLts(maxSeen, maxComplete);
-    }
-
-    public Configuration.ModelConfiguration toConfig()
-    {
-        return new Configuration.QuiescentCheckerConfig(tracker.maxSeenLts(), tracker.maxCompleteLts());
-    }
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/model/SelectHelper.java b/harry-core/src/harry/model/SelectHelper.java
index 8cf9d80..2b50339 100644
--- a/harry-core/src/harry/model/SelectHelper.java
+++ b/harry-core/src/harry/model/SelectHelper.java
@@ -117,7 +117,7 @@ public class SelectHelper
     public static List<ResultSetRow> execute(SystemUnderTest sut, OpSelectors.MonotonicClock clock, Query query)
     {
         CompiledStatement compiled = query.toSelectStatement();
-        Object[][] objects = sut.execute(compiled.cql(), compiled.bindings());
+        Object[][] objects = sut.execute(compiled.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, compiled.bindings());
         List<ResultSetRow> result = new ArrayList<>();
         for (Object[] obj : objects)
             result.add(resultSetToRow(query.schemaSpec, clock, obj));
diff --git a/harry-core/src/harry/model/StatelessVisibleRowsChecker.java b/harry-core/src/harry/model/StatelessVisibleRowsChecker.java
index 01f0381..8573b3b 100644
--- a/harry-core/src/harry/model/StatelessVisibleRowsChecker.java
+++ b/harry-core/src/harry/model/StatelessVisibleRowsChecker.java
@@ -22,11 +22,12 @@ import java.util.List;
 import java.util.function.Supplier;
 
 import harry.core.Configuration;
+import harry.core.Run;
 import harry.data.ResultSetRow;
 import harry.ddl.SchemaSpec;
 import harry.model.sut.SystemUnderTest;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 
 import static harry.model.VisibleRowsChecker.descendingIterator;
 
@@ -43,29 +44,18 @@ public class StatelessVisibleRowsChecker implements Model
 
     protected final SchemaSpec schema;
 
-    public StatelessVisibleRowsChecker(SchemaSpec schema,
-                                       OpSelectors.PdSelector pdSelector,
-                                       OpSelectors.DescriptorSelector descriptorSelector,
-                                       OpSelectors.MonotonicClock clock,
-                                       QuerySelector querySelector,
-                                       SystemUnderTest sut)
+    public StatelessVisibleRowsChecker(Run run)
     {
-        this.pdSelector = pdSelector;
-        this.descriptorSelector = descriptorSelector;
-        this.schema = schema;
-        this.clock = clock;
-        this.sut = sut;
+        this.pdSelector = run.pdSelector;
+        this.descriptorSelector = run.descriptorSelector;
+        this.schema = run.schemaSpec;
+        this.clock = run.clock;
+        this.sut = run.sut;
     }
 
-    public void recordEvent(long lts, boolean quorumAchieved)
+    public void validate(Query query)
     {
-        //no-op
-    }
-
-    public void validatePartitionState(long validationLts, Query query)
-    {
-        validatePartitionState(validationLts,
-                               query,
+        validatePartitionState(query,
                                () -> SelectHelper.execute(sut, clock, query));
     }
 
@@ -74,17 +64,17 @@ public class StatelessVisibleRowsChecker implements Model
         throw new RuntimeException("not implemented");
     }
 
-    void validatePartitionState(long validationLts, Query query, Supplier<List<ResultSetRow>> rowsSupplier)
+    void validatePartitionState(Query query, Supplier<List<ResultSetRow>> rowsSupplier)
     {
         // we ignore Query here, since our criteria for checking in this model is presence of the row in the resultset
-        long pd = pdSelector.pd(validationLts, schema);
+        long pd = query.pd;
 
         List<ResultSetRow> rows = rowsSupplier.get();
 
         for (ResultSetRow row : rows)
         {
             VisibleRowsChecker.LongIterator rowLtsIter = descendingIterator(row.lts);
-            VisibleRowsChecker.LongIterator modelLtsIter = descendingIterator(pdSelector, validationLts);
+            VisibleRowsChecker.LongIterator modelLtsIter = descendingIterator(pdSelector, pd);
 
             outer:
             while (rowLtsIter.hasNext())
diff --git a/harry-core/src/harry/model/VisibleRowsChecker.java b/harry-core/src/harry/model/VisibleRowsChecker.java
index ddda8b8..902bf07 100644
--- a/harry-core/src/harry/model/VisibleRowsChecker.java
+++ b/harry-core/src/harry/model/VisibleRowsChecker.java
@@ -24,18 +24,18 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 
 import harry.core.Configuration;
+import harry.core.Run;
 import harry.data.ResultSetRow;
 import harry.ddl.SchemaSpec;
 import harry.model.sut.SystemUnderTest;
+import harry.runner.DefaultDataTracker;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
 
 /**
  * A simple model to check whether or not the rows reported as visible by the database are reflected in
@@ -43,54 +43,86 @@ import harry.runner.QuerySelector;
  */
 public class VisibleRowsChecker implements Model
 {
-    protected final Map<Long, TreeMap<Long, Event>> eventLog;
     protected final OpSelectors.DescriptorSelector descriptorSelector;
     protected final OpSelectors.PdSelector pdSelector;
     protected final OpSelectors.MonotonicClock clock;
+    protected final LoggingDataTracker tracker;
     protected final SystemUnderTest sut;
-    protected final AtomicLong maxLts;
     protected final SchemaSpec schema;
 
-    public VisibleRowsChecker(SchemaSpec schema,
-                              OpSelectors.PdSelector pdSelector,
-                              OpSelectors.DescriptorSelector descriptorSelector,
-                              OpSelectors.MonotonicClock clock,
-                              QuerySelector querySelector,
-                              SystemUnderTest sut)
+    public VisibleRowsChecker(Run run)
     {
-        this.pdSelector = pdSelector;
-        this.descriptorSelector = descriptorSelector;
-        this.eventLog = new HashMap<>();
-        this.maxLts = new AtomicLong();
-        this.schema = schema;
-        this.clock = clock;
-        this.sut = sut;
+        assert run.tracker instanceof LoggingDataTracker : "Visible rows checker requires a logging data tracker to run";
+        this.tracker = (LoggingDataTracker) run.tracker;
+        this.tracker.pdSelector = run.pdSelector;
+        this.pdSelector = run.pdSelector;
+        this.descriptorSelector = run.descriptorSelector;
+        this.schema = run.schemaSpec;
+        this.clock = run.clock;
+        this.sut = run.sut;
     }
 
-
-    public synchronized void recordEvent(long lts, boolean quorumAchieved)
+    public static class LoggingDataTracker extends DefaultDataTracker
     {
-        maxLts.updateAndGet((old) -> Math.max(old, lts));
-        long pd = pdSelector.pd(lts);
+        protected final Map<Long, TreeMap<Long, Event>> eventLog = new HashMap<>();
+        private OpSelectors.PdSelector pdSelector;
 
-        // TODO: This is definitely not optimal, but we probably use a better, potentially off-heap sorted structure for that anyways
-        TreeMap<Long, Event> events = eventLog.get(pd);
-        if (events == null)
+        public LoggingDataTracker()
         {
-            events = new TreeMap<>();
-            eventLog.put(pd, events);
         }
 
-        Event event = events.get(lts);
-        assert event == null || !event.quorumAchieved : "Operation should be partially visible before it is fully visible";
-        events.put(lts, new Event(lts, quorumAchieved));
-    }
+        public synchronized void started(long lts)
+        {
+            super.started(lts);
+            recordEvent(lts, false);
+        }
+
+        public synchronized void finished(long lts)
+        {
+            super.finished(lts);
+            recordEvent(lts, true);
+        }
+
+        public synchronized TreeMap<Long, Event> events(long pd)
+        {
+            TreeMap<Long, Event> log = eventLog.get(pd);
+            if (log == null)
+                return null;
+
+            return (TreeMap<Long, Event>) log.clone();
+        }
+
+        public long maxStarted()
+        {
+            return super.maxStarted();
+        }
 
+        public long maxConsecutiveFinished()
+        {
+            return super.maxConsecutiveFinished();
+        }
 
-    public void validatePartitionState(long validationLts, Query query)
+        public void recordEvent(long lts, boolean finished)
+        {
+            long pd = pdSelector.pd(lts);
+
+            // TODO: This is definitely not optimal, but we probably use a better, potentially off-heap sorted structure for that anyways
+            TreeMap<Long, Event> events = eventLog.get(pd);
+            if (events == null)
+            {
+                events = new TreeMap<>();
+                eventLog.put(pd, events);
+            }
+
+            Event event = events.get(lts);
+            assert event == null || !event.quorumAchieved : "Operation should be partially visible before it is fully visible";
+            events.put(lts, new Event(lts, finished));
+        }
+    }
+
+    public void validate(Query query)
     {
-        validatePartitionState(validationLts,
-                               query,
+        validatePartitionState(query,
                                () -> SelectHelper.execute(sut, clock, query));
     }
 
@@ -99,22 +131,23 @@ public class VisibleRowsChecker implements Model
         throw new RuntimeException("not implemented");
     }
 
-    synchronized void validatePartitionState(long validationLts, Query query, Supplier<List<ResultSetRow>> rowsSupplier)
+    synchronized void validatePartitionState(Query query, Supplier<List<ResultSetRow>> rowsSupplier)
     {
         // TODO: Right now, we ignore Query here!
-        long pd = pdSelector.pd(validationLts, schema);
+        long pd = query.pd;
         List<ResultSetRow> rows = rowsSupplier.get();
-        TreeMap<Long, Event> events = eventLog.get(pd);
+        TreeMap<Long, Event> events = tracker.events(pd);
+
         if (!rows.isEmpty() && (events == null || events.isEmpty()))
         {
             throw new ValidationException(String.format("Returned rows are not empty, but there were no records in the event log.\nRows: %s\nSeen pds: %s",
-                                                        rows, eventLog.keySet()));
+                                                        rows, tracker.eventLog.keySet()));
         }
 
         for (ResultSetRow row : rows)
         {
             LongIterator rowLtsIter = descendingIterator(row.lts);
-            PeekingIterator<Event> modelLtsIter = Iterators.peekingIterator(events.subMap(0L, true, maxLts.get(), true)
+            PeekingIterator<Event> modelLtsIter = Iterators.peekingIterator(events.subMap(0L, true, tracker.maxStarted(), true)
                                                                                   .descendingMap()
                                                                                   .values()
                                                                                   .iterator());
@@ -158,11 +191,11 @@ public class VisibleRowsChecker implements Model
     }
 
 
-    public static LongIterator descendingIterator(OpSelectors.PdSelector pdSelector, long verificationLts)
+    public static LongIterator descendingIterator(OpSelectors.PdSelector pdSelector, long pd)
     {
         return new VisibleRowsChecker.LongIterator()
         {
-            long next = pdSelector.maxLts(verificationLts);
+            long next = pdSelector.maxLtsFor(pd);
 
             public long nextLong()
             {
diff --git a/harry-core/src/harry/model/sut/PrintlnSut.java b/harry-core/src/harry/model/sut/PrintlnSut.java
index 463d780..ad14b0b 100644
--- a/harry-core/src/harry/model/sut/PrintlnSut.java
+++ b/harry-core/src/harry/model/sut/PrintlnSut.java
@@ -32,17 +32,18 @@ public class PrintlnSut implements SystemUnderTest
     {
     }
 
-    public Object[][] execute(String statement, Object... bindings)
+    public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
     {
-        System.out.println(String.format("%s | %s",
+        System.out.println(String.format("%s | %s | %s",
                                          statement,
+                                         cl,
                                          Arrays.toString(bindings)));
         return new Object[0][];
     }
 
-    public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+    public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
     {
-        return CompletableFuture.supplyAsync(() -> execute(statement, bindings),
+        return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings),
                                              Runnable::run);
     }
 }
diff --git a/harry-core/src/harry/model/sut/SystemUnderTest.java b/harry-core/src/harry/model/sut/SystemUnderTest.java
index 9e5e0a9..ec0a48e 100644
--- a/harry-core/src/harry/model/sut/SystemUnderTest.java
+++ b/harry-core/src/harry/model/sut/SystemUnderTest.java
@@ -35,20 +35,51 @@ public interface SystemUnderTest
 
     default void schemaChange(String statement)
     {
-        execute(statement, new Object[]{});
+        execute(statement, ConsistencyLevel.ALL, new Object[]{});
     }
 
-    default Object[][] execute(CompiledStatement statement)
+    default Object[][] execute(CompiledStatement statement, ConsistencyLevel cl)
     {
-        return execute(statement.cql(), statement.bindings());
+        return execute(statement.cql(), cl, statement.bindings());
     }
 
-    Object[][] execute(String statement, Object... bindings);
+    Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings);
 
-    CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings);
+    CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings);
 
     interface SystemUnderTestFactory
     {
         SystemUnderTest create();
     }
+
+    enum ConsistencyLevel {
+        ALL, QUORUM, NODE_LOCAL
+    }
+
+    public static final SystemUnderTest NO_OP = new NoOpSut();
+
+    public class NoOpSut implements SystemUnderTest
+    {
+        private NoOpSut() {}
+        public boolean isShutdown()
+        {
+            return false;
+        }
+
+        public void shutdown()
+        {
+        }
+
+        public Object[][] execute(String statement, ConsistencyLevel cl,  Object... bindings)
+        {
+            return new Object[0][];
+        }
+
+        public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
+        {
+            return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings),
+                                                 Runnable::run);
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/reconciler/Reconciler.java b/harry-core/src/harry/reconciler/Reconciler.java
index 50b03c6..cab8fb8 100644
--- a/harry-core/src/harry/reconciler/Reconciler.java
+++ b/harry-core/src/harry/reconciler/Reconciler.java
@@ -24,14 +24,16 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.TreeMap;
 
 import harry.ddl.SchemaSpec;
+import harry.model.ExhaustiveChecker;
 import harry.model.OpSelectors;
 import harry.runner.AbstractPartitionVisitor;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 import harry.util.BitSet;
 import harry.util.Ranges;
 
@@ -49,36 +51,42 @@ public class Reconciler
 {
     private final OpSelectors.DescriptorSelector descriptorSelector;
     private final OpSelectors.PdSelector pdSelector;
-    private final QuerySelector querySelector;
+    private final QueryGenerator rangeSelector;
     private final SchemaSpec schema;
 
     public Reconciler(SchemaSpec schema,
                       OpSelectors.PdSelector pdSelector,
                       OpSelectors.DescriptorSelector descriptorSelector,
-                      QuerySelector querySelector)
+                      QueryGenerator rangeSelector)
     {
         this.descriptorSelector = descriptorSelector;
         this.pdSelector = pdSelector;
         this.schema = schema;
-        this.querySelector = querySelector;
+        this.rangeSelector = rangeSelector;
     }
 
     public PartitionState inflatePartitionState(final long pd, long maxLts, Query query)
     {
         List<Ranges.Range> ranges = new ArrayList<>();
+
         // TODO: we should think of a single-pass algorithm that would allow us to inflate all deletes and range deletes for a partition
         PartitionVisitor partitionVisitor = new AbstractPartitionVisitor(pdSelector, descriptorSelector, schema)
         {
             public void operation(long lts, long pd, long cd, long m, long opId)
             {
-                if (!query.match(cd))
-                    return;
-
                 OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
                 if (opType == OpSelectors.OperationKind.DELETE_RANGE)
-                    ranges.add(querySelector.inflate(lts, opId).toRange(lts));
+                {
+                    ranges.add(rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE).toRange(lts));
+                }
+                else if (opType == OpSelectors.OperationKind.DELETE_SLICE)
+                {
+                    ranges.add(rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE).toRange(lts));
+                }
                 else if (opType == OpSelectors.OperationKind.DELETE_ROW)
+                {
                     ranges.add(new Ranges.Range(cd, cd, true, true, lts));
+                }
             }
         };
 
@@ -90,21 +98,27 @@ public class Reconciler
             currentLts = pdSelector.nextLts(currentLts);
         }
 
-        // We have to do two passes to avoid inflating deleted items
-        Ranges rts = new Ranges(ranges);
-
         PartitionState partitionState = new PartitionState();
         partitionVisitor = new AbstractPartitionVisitor(pdSelector, descriptorSelector, schema)
         {
             public void operation(long lts, long pd, long cd, long m, long opId)
             {
+                if (!query.match(cd))
+                    return;
+
                 OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
 
-                if (opType == OpSelectors.OperationKind.DELETE_ROW || opType == OpSelectors.OperationKind.DELETE_RANGE)
+                if (opType == OpSelectors.OperationKind.DELETE_ROW
+                    || opType == OpSelectors.OperationKind.DELETE_RANGE
+                    || opType == OpSelectors.OperationKind.DELETE_SLICE)
                     return;
 
-                if (!query.match(cd) || rts.isShadowed(cd, lts))
-                    return;
+                // TODO: avoid linear scan
+                for (Ranges.Range range : ranges)
+                {
+                    if (range.timestamp >= lts && range.contains(cd))
+                        return;
+                }
 
                 if (opType == OpSelectors.OperationKind.WRITE)
                 {
@@ -167,7 +181,6 @@ public class Reconciler
                     }
                 }
 
-
                 state = new RowState(cd, vdsCopy, ltss);
                 rows.put(cd, state);
             }
diff --git a/harry-core/src/harry/runner/AbstractPartitionVisitor.java b/harry-core/src/harry/runner/AbstractPartitionVisitor.java
index aefa79f..a3ebd33 100644
--- a/harry-core/src/harry/runner/AbstractPartitionVisitor.java
+++ b/harry-core/src/harry/runner/AbstractPartitionVisitor.java
@@ -91,13 +91,14 @@ public abstract class AbstractPartitionVisitor implements PartitionVisitor
 
     protected void operation(long lts, long pd, long cd, long m, long opId)
     {
+
     }
 
     protected void afterBatch(long lts, long pd, long m)
     {
     }
 
-    public void shutdown()
+    public void shutdown() throws InterruptedException
     {
     }
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/AllPartitionsValidator.java b/harry-core/src/harry/runner/AllPartitionsValidator.java
new file mode 100644
index 0000000..674ab5e
--- /dev/null
+++ b/harry-core/src/harry/runner/AllPartitionsValidator.java
@@ -0,0 +1,106 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.MetricReporter;
+import harry.core.Run;
+import harry.ddl.SchemaSpec;
+import harry.model.Model;
+import harry.model.OpSelectors;
+
+// This might be something that potentially grows into the validator described in the design doc;
+// right now it's just a helper/container class
+public class AllPartitionsValidator implements PartitionVisitor
+{
+    private static final Logger logger = LoggerFactory.getLogger(AllPartitionsValidator.class);
+
+    protected final Model model;
+    protected final SchemaSpec schema;
+
+    protected final OpSelectors.MonotonicClock clock;
+    protected final OpSelectors.PdSelector pdSelector;
+    protected final MetricReporter metricReporter;
+    protected final ExecutorService executor;
+    protected final int concurrency;
+    protected final int triggerAfter;
+
+    public AllPartitionsValidator(int concurrency,
+                                  int triggerAfter,
+                                  Run run,
+                                  Model.ModelFactory modelFactory)
+    {
+        this.triggerAfter = triggerAfter;
+        this.metricReporter = run.metricReporter;
+        this.model = modelFactory.make(run);
+        this.schema = run.schemaSpec;
+        this.clock = run.clock;
+        this.pdSelector = run.pdSelector;
+        this.concurrency = concurrency;
+        this.executor = Executors.newFixedThreadPool(concurrency);
+    }
+
+    protected CompletableFuture<Void> validateAllPartitions(ExecutorService executor, int parallelism)
+    {
+        long maxLts = clock.maxLts() - 1;
+        long maxPos = pdSelector.positionFor(maxLts);
+        AtomicLong counter = new AtomicLong();
+        CompletableFuture[] futures = new CompletableFuture[parallelism];
+        for (int i = 0; i < parallelism; i++)
+        {
+            futures[i] = CompletableFuture.supplyAsync(() -> {
+                long pos;
+                while ((pos = counter.getAndIncrement()) < maxPos && !executor.isShutdown() && !Thread.interrupted())
+                {
+                    if (pos > 0 && pos % 1000 == 0)
+                        logger.debug(String.format("Validated %d out of %d partitions", pos, maxPos));
+                    long visitLts = pdSelector.minLtsAt(pos);
+
+                    metricReporter.validatePartition();
+                    for (boolean reverse : new boolean[]{ true, false })
+                    {
+                        model.validate(Query.selectPartition(schema, pdSelector.pd(visitLts, schema), reverse));
+                    }
+                }
+                return null;
+            }, executor);
+        }
+        return CompletableFuture.allOf(futures);
+    }
+
+    public void visitPartition(long lts)
+    {
+        if (lts % triggerAfter == 0)
+            validateAllPartitions(executor, concurrency);
+    }
+
+    public void shutdown() throws InterruptedException
+    {
+        executor.shutdown();
+        executor.awaitTermination(60, TimeUnit.SECONDS);
+    }
+}
diff --git a/harry-integration/src/harry/runner/HarryRunnerJvm.java b/harry-core/src/harry/runner/DataTracker.java
similarity index 55%
rename from harry-integration/src/harry/runner/HarryRunnerJvm.java
rename to harry-core/src/harry/runner/DataTracker.java
index 38bf264..d5825fb 100644
--- a/harry-integration/src/harry/runner/HarryRunnerJvm.java
+++ b/harry-core/src/harry/runner/DataTracker.java
@@ -19,21 +19,34 @@
 package harry.runner;
 
 import harry.core.Configuration;
-import harry.model.sut.InJvmSut;
 
-import java.io.File;
+public interface DataTracker
+{
+    void started(long lts);
+    void finished(long lts);
 
-public class HarryRunnerJvm extends org.apache.cassandra.distributed.test.TestBaseImpl implements HarryRunner {
+    long maxStarted();
+    long maxConsecutiveFinished();
 
-    public static void main(String[] args) throws Throwable {
-        InJvmSut.registerSubtypes();
+    public Configuration.DataTrackerConfiguration toConfig();
 
-        HarryRunnerJvm runner = new HarryRunnerJvm();
-        File configFile = runner.loadConfig(args);
-
-        Configuration configuration = Configuration.fromFile(configFile);
-        runner.run(configuration);
+    interface DataTrackerFactory {
+        DataTracker make();
     }
 
+    public static DataTracker NO_OP = new NoOpDataTracker();
+    class NoOpDataTracker implements DataTracker
+    {
+        private NoOpDataTracker() {}
+
+        public void started(long lts) {}
+        public void finished(long lts) {}
+        public long maxStarted() { return 0; }
+        public long maxConsecutiveFinished() { return 0; }
 
+        public Configuration.DataTrackerConfiguration toConfig()
+        {
+            return null;
+        }
+    }
 }
diff --git a/harry-core/src/harry/runner/DefaultDataTracker.java b/harry-core/src/harry/runner/DefaultDataTracker.java
new file mode 100644
index 0000000..1b55482
--- /dev/null
+++ b/harry-core/src/harry/runner/DefaultDataTracker.java
@@ -0,0 +1,140 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.Configuration;
+
+public class DefaultDataTracker implements DataTracker
+{
+    private static final Logger logger = LoggerFactory.getLogger(DefaultDataTracker.class);
+
+    private final AtomicLong maxSeenLts;
+    // TODO: This is a trivial implementation that can be significantly improved upon
+    // for example, we could use a bitmap that records `1`s for all lts that are after
+    // the consective, and "collapse" the bitmap state into the long as soon as we see
+    // consecutive `1` on the left side.
+    private final AtomicLong maxCompleteLts;
+    private final PriorityBlockingQueue<Long> reorderBuffer;
+
+    public DefaultDataTracker()
+    {
+        this.maxSeenLts = new AtomicLong(-1);
+        this.maxCompleteLts = new AtomicLong(-1);
+        this.reorderBuffer = new PriorityBlockingQueue<>(100);
+    }
+
+    // TODO: there's also some room for improvement in terms of concurrency
+    // TODO: remove pd?
+    public void started(long lts)
+    {
+        recordEvent(lts, false);
+    }
+
+    public void finished(long lts)
+    {
+        recordEvent(lts, true);
+    }
+
+    private void recordEvent(long lts, boolean finished)
+    {
+        // all seen LTS are allowed to be "in-flight"
+        maxSeenLts.getAndUpdate((old) -> Math.max(lts, old));
+
+        if (!finished)
+            return;
+
+        long maxAchievedConsecutive = drainReorderQueue();
+
+        if (maxAchievedConsecutive + 1 == lts)
+            maxCompleteLts.compareAndSet(maxAchievedConsecutive, lts);
+        else
+            reorderBuffer.offer(lts);
+    }
+
+    public long drainReorderQueue()
+    {
+        final long expected = maxCompleteLts.get();
+        long maxAchievedConsecutive = expected;
+        if (reorderBuffer.isEmpty())
+            return maxAchievedConsecutive;
+
+        boolean catchingUp = false;
+
+        Long smallest = reorderBuffer.poll();
+        while (smallest != null && smallest == maxAchievedConsecutive + 1)
+        {
+            maxAchievedConsecutive++;
+            catchingUp = true;
+            smallest = reorderBuffer.poll();
+        }
+
+        // put back
+        if (smallest != null)
+            reorderBuffer.offer(smallest);
+
+        if (catchingUp)
+            maxCompleteLts.compareAndSet(expected, maxAchievedConsecutive);
+
+        int bufferSize = reorderBuffer.size();
+        if (bufferSize > 100)
+            logger.warn("Reorder buffer size has grown up to " + reorderBuffer.size());
+        return maxAchievedConsecutive;
+    }
+
+    public long maxStarted()
+    {
+        return maxSeenLts.get();
+    }
+
+    public long maxConsecutiveFinished()
+    {
+        return maxCompleteLts.get();
+    }
+
+    public Configuration.DataTrackerConfiguration toConfig()
+    {
+        return new Configuration.DefaultDataTrackerConfiguration(maxSeenLts.get(), maxCompleteLts.get());
+    }
+
+    @VisibleForTesting
+    public void forceLts(long maxSeen, long maxComplete)
+    {
+        this.maxSeenLts.set(maxSeen);
+        this.maxCompleteLts.set(maxComplete);
+    }
+
+    public String toString()
+    {
+        List<Long> buf = new ArrayList<>(reorderBuffer);
+        return "DataTracker{" +
+               "maxSeenLts=" + maxSeenLts +
+               ", maxCompleteLts=" + maxCompleteLts +
+               ", reorderBuffer=" + buf +
+               '}';
+    }
+}
diff --git a/harry-core/src/harry/runner/DefaultPartitionVisitorFactory.java b/harry-core/src/harry/runner/DefaultPartitionVisitorFactory.java
deleted file mode 100644
index 89873fd..0000000
--- a/harry-core/src/harry/runner/DefaultPartitionVisitorFactory.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package harry.runner;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import harry.ddl.SchemaSpec;
-import harry.model.Model;
-import harry.model.OpSelectors;
-import harry.model.sut.SystemUnderTest;
-import harry.operations.CompiledStatement;
-
-public class DefaultPartitionVisitorFactory implements Supplier<PartitionVisitor>
-{
-    private static final Logger logger = LoggerFactory.getLogger(DefaultPartitionVisitorFactory.class);
-
-    private final Model model;
-    private final SystemUnderTest sut;
-    private final RowVisitor rowVisitor;
-    private final BufferedWriter operationLog;
-
-    // TODO: shutdown properly
-    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
-    private final Supplier<DefaultPartitionVisitor> factory;
-
-    public DefaultPartitionVisitorFactory(Model model,
-                                          SystemUnderTest sut,
-                                          OpSelectors.PdSelector pdSelector,
-                                          OpSelectors.DescriptorSelector descriptorSelector,
-                                          SchemaSpec schema,
-                                          RowVisitor rowVisitor)
-    {
-        this.model = model;
-        this.sut = sut;
-        this.rowVisitor = rowVisitor;
-        this.factory = () -> new DefaultPartitionVisitor(pdSelector, descriptorSelector, schema);
-
-        File f = new File("operation.log");
-        try
-        {
-            operationLog = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
-        }
-        catch (FileNotFoundException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public PartitionVisitor get()
-    {
-        return factory.get();
-    }
-
-    private class DefaultPartitionVisitor extends AbstractPartitionVisitor
-    {
-        private List<String> statements = new ArrayList<>();
-        private List<Object> bindings = new ArrayList<>();
-        private List<CompletableFuture> futures = new ArrayList<>();
-
-        public DefaultPartitionVisitor(OpSelectors.PdSelector pdSelector,
-                                       OpSelectors.DescriptorSelector descriptorSelector,
-                                       SchemaSpec schema)
-        {
-            super(pdSelector, descriptorSelector, schema);
-        }
-
-        public void beforeLts(long lts, long pd)
-        {
-            model.recordEvent(lts, false);
-        }
-
-        public void afterLts(long lts, long pd)
-        {
-            for (CompletableFuture future : futures)
-            {
-                try
-                {
-                    // TODO: currently, Cassandra keeps timing out; we definitely need to investigate that, but we need to focus on other things first
-                    future.get();
-                }
-                catch (Throwable t)
-                {
-                    throw new Model.ValidationException("Couldn't repeat operations within timeout bounds.", t);
-                }
-            }
-
-            log("LTS: %d. Pd %d. Finished\n", lts, pd);
-            model.recordEvent(lts, true);
-        }
-
-        public void beforeBatch(long lts, long pd, long m)
-        {
-            statements = new ArrayList<>();
-            bindings = new ArrayList<>();
-        }
-
-        public void operation(long lts, long pd, long cd, long m, long opId)
-        {
-            OpSelectors.OperationKind op = descriptorSelector.operationType(pd, lts, opId);
-            CompiledStatement statement = rowVisitor.visitRow(op, lts, pd, cd, opId);
-
-            log(String.format("LTS: %d. Pd %d. Cd %d. M %d. OpId: %d Statement %s\n",
-                              lts, pd, cd, m, opId, statement));
-
-            statements.add(statement.cql());
-            bindings.addAll(Arrays.asList(statement.bindings()));
-        }
-
-        public void afterBatch(long lts, long pd, long m)
-        {
-            String query = String.join(" ", statements);
-
-            if (statements.size() > 1)
-                query = String.format("BEGIN UNLOGGED BATCH\n%s\nAPPLY BATCH;", query);
-
-            Object[] bindingsArray = new Object[bindings.size()];
-            bindings.toArray(bindingsArray);
-
-            CompletableFuture<Object[][]> future = new CompletableFuture<>();
-            executeAsyncWithRetries(future, new CompiledStatement(query, bindingsArray));
-            futures.add(future);
-
-            statements = null;
-            bindings = null;
-        }
-    }
-
-    void executeAsyncWithRetries(CompletableFuture<Object[][]> future, CompiledStatement statement)
-    {
-        if (sut.isShutdown())
-            throw new IllegalStateException("System under test is shut down");
-
-        sut.executeAsync(statement.cql(), statement.bindings())
-           .whenComplete((res, t) -> {
-               if (t != null)
-                   executor.schedule(() -> executeAsyncWithRetries(future, statement), 1, TimeUnit.SECONDS);
-               else
-                   future.complete(res);
-           });
-    }
-
-    private void log(String format, Object... objects)
-    {
-        try
-        {
-            operationLog.write(String.format(format, objects));
-        }
-        catch (IOException e)
-        {
-            // ignore
-        }
-    }
-
-    public void shutdown()
-    {
-        executor.shutdown();
-        try
-        {
-            executor.awaitTermination(30, TimeUnit.SECONDS);
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-}
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/HarryRunner.java b/harry-core/src/harry/runner/HarryRunner.java
index 56e2350..77dcc8b 100644
--- a/harry-core/src/harry/runner/HarryRunner.java
+++ b/harry-core/src/harry/runner/HarryRunner.java
@@ -33,24 +33,40 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-public interface HarryRunner
+public abstract class HarryRunner
 {
+    public static final Logger logger = LoggerFactory.getLogger(HarryRunner.class);
 
-    Logger logger = LoggerFactory.getLogger(HarryRunner.class);
+    protected CompletableFuture progress;
+    protected ScheduledThreadPoolExecutor executor;
+    public abstract void beforeRun(Runner runner);
+    public void afterRun(Runner runner, Object result)
+    {
+        executor.shutdown();
+        try
+        {
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            // ignore
+        }
+    }
 
-    default void run(Configuration configuration) throws Throwable
+    public void run(Configuration config) throws Throwable
     {
         System.setProperty("cassandra.disable_tcactive_openssl", "true");
         System.setProperty("relocated.shaded.io.netty.transport.noNative", "true");
         System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
 
-        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+        executor = new ScheduledThreadPoolExecutor(1);
         executor.setRemoveOnCancelPolicy(true);
 
-        Runner runner = configuration.createRunner();
+        Runner runner = config.createRunner();
         Run run = runner.getRun();
 
-        CompletableFuture progress = runner.initAndStartAll();
+        progress = runner.initAndStartAll();
+        beforeRun(runner);
 
         // Uncomment this if you want to have fun!
         // scheduleCorruption(run, executor);
@@ -63,7 +79,7 @@ public interface HarryRunner
                 if (b != null)
                     return b;
                 return a;
-            }).get(run.snapshot.run_time_unit.toSeconds(run.snapshot.run_time) + 30,
+            }).get(config.run_time_unit.toSeconds(config.run_time) + 30,
                    TimeUnit.SECONDS);
             if (result instanceof Throwable)
                 logger.error("Execution failed", result);
@@ -77,6 +93,8 @@ public interface HarryRunner
         }
         finally
         {
+            afterRun(runner, result);
+
             logger.info("Shutting down executor..");
             tryRun(() -> {
                 executor.shutdownNow();
@@ -99,7 +117,7 @@ public interface HarryRunner
         }
     }
 
-    default void tryRun(ThrowingRunnable runnable)
+    public void tryRun(ThrowingRunnable runnable)
     {
         try
         {
@@ -117,7 +135,7 @@ public interface HarryRunner
      * @return Configuration YAML file.
      * @throws Exception If file is not found or cannot be read.
      */
-    default File loadConfig(String[] args) throws Exception {
+    public File loadConfig(String[] args) throws Exception {
         if (args == null || args.length == 0) {
             throw new Exception("Harry config YAML not provided.");
         }
diff --git a/harry-core/src/harry/runner/LoggingPartitionVisitor.java b/harry-core/src/harry/runner/LoggingPartitionVisitor.java
new file mode 100644
index 0000000..c3a77af
--- /dev/null
+++ b/harry-core/src/harry/runner/LoggingPartitionVisitor.java
@@ -0,0 +1,100 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import harry.core.Run;
+import harry.operations.CompiledStatement;
+
+public class LoggingPartitionVisitor extends MutatingPartitionVisitor
+{
+    private final BufferedWriter operationLog;
+
+    public LoggingPartitionVisitor(Run run, RowVisitor.RowVisitorFactory rowVisitorFactory)
+    {
+        super(run, rowVisitorFactory);
+
+        File f = new File("operation.log");
+        try
+        {
+            operationLog = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void afterLts(long lts, long pd)
+    {
+        super.afterLts(lts, pd);
+        log("LTS: %d. Pd %d. Finished\n", lts, pd);
+    }
+
+    @Override
+    protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId)
+    {
+        CompiledStatement statement = super.operationInternal(lts, pd, cd, m, opId);
+
+        log(String.format("LTS: %d. Pd %d. Cd %d. M %d. OpId: %d Statement %s\n",
+                          lts, pd, cd, m, opId, statement));
+
+        return statement;
+    }
+
+    public static String bindingsToString(Object... bindings)
+    {
+        StringBuilder sb = new StringBuilder();
+        boolean isFirst = true;
+        for (Object binding : bindings)
+        {
+            if (isFirst)
+                isFirst = false;
+            else
+                sb.append(",");
+
+            if (binding instanceof String)
+                sb.append("\"").append(binding).append("\"");
+            else if (binding instanceof Long)
+                sb.append(binding).append("L");
+            else
+                sb.append(binding);
+        }
+        return sb.toString();
+    }
+
+    private void log(String format, Object... objects)
+    {
+        try
+        {
+            operationLog.write(String.format(format, objects));
+            operationLog.flush();
+        }
+        catch (IOException e)
+        {
+            // ignore
+        }
+    }
+}
diff --git a/harry-core/src/harry/runner/MutatingPartitionVisitor.java b/harry-core/src/harry/runner/MutatingPartitionVisitor.java
new file mode 100644
index 0000000..a818091
--- /dev/null
+++ b/harry-core/src/harry/runner/MutatingPartitionVisitor.java
@@ -0,0 +1,133 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import harry.core.Run;
+import harry.model.Model;
+import harry.model.OpSelectors;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+
+public class MutatingPartitionVisitor extends AbstractPartitionVisitor
+{
+    private final List<String> statements = new ArrayList<>();
+    private final List<Object> bindings = new ArrayList<>();
+
+    private final List<CompletableFuture<?>> futures = new ArrayList<>();
+
+    protected final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
+    protected final DataTracker tracker;
+    protected final SystemUnderTest sut;
+    protected final RowVisitor rowVisitor;
+
+    public MutatingPartitionVisitor(Run run, RowVisitor.RowVisitorFactory rowVisitorFactory)
+    {
+        super(run.pdSelector, run.descriptorSelector, run.schemaSpec);
+        this.tracker = run.tracker;
+        this.sut = run.sut;
+        this.rowVisitor = rowVisitorFactory.make(run);
+    }
+
+    public void beforeLts(long lts, long pd)
+    {
+        tracker.started(lts);
+    }
+
+    public void afterLts(long lts, long pd)
+    {
+        for (CompletableFuture<?> future : futures)
+        {
+            try
+            {
+                future.get();
+            }
+            catch (Throwable t)
+            {
+                throw new Model.ValidationException("Couldn't repeat operations within timeout bounds.", t);
+            }
+        }
+        futures.clear();
+        tracker.finished(lts);
+    }
+
+    public void beforeBatch(long lts, long pd, long m)
+    {
+        statements.clear();
+        bindings.clear();
+    }
+
+    protected void operation(long lts, long pd, long cd, long m, long opId)
+    {
+        CompiledStatement statement = operationInternal(lts, pd, cd, m, opId);
+        statements.add(statement.cql());
+        for (Object binding : statement.bindings())
+            bindings.add(binding);
+    }
+
+    protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId)
+    {
+        OpSelectors.OperationKind op = descriptorSelector.operationType(pd, lts, opId);
+        return rowVisitor.visitRow(op, lts, pd, cd, opId);
+    }
+
+    public void afterBatch(long lts, long pd, long m)
+    {
+        String query = String.join(" ", statements);
+
+        if (statements.size() > 1)
+            query = String.format("BEGIN UNLOGGED BATCH\n%s\nAPPLY BATCH;", query);
+
+        Object[] bindingsArray = new Object[bindings.size()];
+        bindings.toArray(bindingsArray);
+
+        CompletableFuture<Object[][]> future = new CompletableFuture<>();
+        executeAsyncWithRetries(future, new CompiledStatement(query, bindingsArray));
+        futures.add(future);
+
+        statements.clear();
+        bindings.clear();
+    }
+
+    void executeAsyncWithRetries(CompletableFuture<Object[][]> future, CompiledStatement statement)
+    {
+        if (sut.isShutdown())
+            throw new IllegalStateException("System under test is shut down");
+
+        sut.executeAsync(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings())
+           .whenComplete((res, t) -> {
+               if (t != null)
+                   executor.schedule(() -> executeAsyncWithRetries(future, statement), 1, TimeUnit.SECONDS);
+               else
+                   future.complete(res);
+           });
+    }
+
+    public void shutdown() throws InterruptedException
+    {
+        executor.shutdown();
+        executor.awaitTermination(30, TimeUnit.SECONDS);
+    }
+}
diff --git a/harry-core/src/harry/runner/DefaultRowVisitor.java b/harry-core/src/harry/runner/MutatingRowVisitor.java
similarity index 61%
rename from harry-core/src/harry/runner/DefaultRowVisitor.java
rename to harry-core/src/harry/runner/MutatingRowVisitor.java
index 1968192..01b5acd 100644
--- a/harry-core/src/harry/runner/DefaultRowVisitor.java
+++ b/harry-core/src/harry/runner/MutatingRowVisitor.java
@@ -18,6 +18,8 @@
 
 package harry.runner;
 
+import harry.core.MetricReporter;
+import harry.core.Run;
 import harry.ddl.SchemaSpec;
 import harry.model.OpSelectors;
 import harry.operations.CompiledStatement;
@@ -25,32 +27,33 @@ import harry.operations.DeleteHelper;
 import harry.operations.WriteHelper;
 import harry.util.BitSet;
 
-public class DefaultRowVisitor implements RowVisitor
+public class MutatingRowVisitor implements RowVisitor
 {
-    private final SchemaSpec schema;
-    private final OpSelectors.MonotonicClock clock;
-    private final OpSelectors.DescriptorSelector descriptorSelector;
-    private final QuerySelector querySelector;
+    protected final SchemaSpec schema;
+    protected final OpSelectors.MonotonicClock clock;
+    protected final OpSelectors.DescriptorSelector descriptorSelector;
+    protected final QueryGenerator rangeSelector;
+    protected final MetricReporter metricReporter;
 
-    public DefaultRowVisitor(SchemaSpec schema,
-                             OpSelectors.MonotonicClock clock,
-                             OpSelectors.DescriptorSelector descriptorSelector,
-                             QuerySelector querySelector)
+    public MutatingRowVisitor(Run run)
     {
-        this.schema = schema;
-        this.clock = clock;
-        this.descriptorSelector = descriptorSelector;
-        this.querySelector = querySelector;
+        this.metricReporter = run.metricReporter;
+        this.schema = run.schemaSpec;
+        this.clock = run.clock;
+        this.descriptorSelector = run.descriptorSelector;
+        this.rangeSelector = run.rangeSelector;
     }
 
     public CompiledStatement write(long lts, long pd, long cd, long opId)
     {
+        metricReporter.insert();
         long[] vds = descriptorSelector.vds(pd, cd, lts, opId, schema);
         return WriteHelper.inflateInsert(schema, pd, cd, vds, clock.rts(lts));
     }
 
     public CompiledStatement deleteColumn(long lts, long pd, long cd, long opId)
     {
+        metricReporter.columnDelete();
         BitSet mask = descriptorSelector.columnMask(pd, lts, opId);
         return DeleteHelper.deleteColumn(schema, pd, cd, mask, clock.rts(lts));
     }
@@ -58,11 +61,19 @@ public class DefaultRowVisitor implements RowVisitor
 
     public CompiledStatement deleteRow(long lts, long pd, long cd, long opId)
     {
+        metricReporter.rowDelete();
         return DeleteHelper.deleteRow(schema, pd, cd, clock.rts(lts));
     }
 
     public CompiledStatement deleteRange(long lts, long pd, long opId)
     {
-        return querySelector.inflate(lts, opId).toDeleteStatement(clock.rts(lts));
+        metricReporter.rangeDelete();
+        return rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE).toDeleteStatement(clock.rts(lts));
+    }
+
+    public CompiledStatement deleteSlice(long lts, long pd, long opId)
+    {
+        metricReporter.rangeDelete();
+        return rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE).toDeleteStatement(clock.rts(lts));
     }
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/PartitionVisitor.java b/harry-core/src/harry/runner/PartitionVisitor.java
index ef861fd..85e3b26 100644
--- a/harry-core/src/harry/runner/PartitionVisitor.java
+++ b/harry-core/src/harry/runner/PartitionVisitor.java
@@ -18,9 +18,14 @@
 
 package harry.runner;
 
+import harry.core.Run;
+
 public interface PartitionVisitor
 {
     void visitPartition(long lts);
-
-    void shutdown();
-}
+    public void shutdown() throws InterruptedException;
+    public interface PartitionVisitorFactory
+    {
+        public PartitionVisitor make(Run run);
+    }
+}
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/QueryGenerator.java b/harry-core/src/harry/runner/QueryGenerator.java
new file mode 100644
index 0000000..a50c450
--- /dev/null
+++ b/harry-core/src/harry/runner/QueryGenerator.java
@@ -0,0 +1,364 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.LongSupplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.Run;
+import harry.ddl.ColumnSpec;
+import harry.ddl.SchemaSpec;
+import harry.generators.DataGenerators;
+import harry.generators.RngUtils;
+import harry.generators.Surjections;
+import harry.model.OpSelectors;
+import harry.operations.Relation;
+
+// TODO: there's a lot of potential to reduce an amount of garbage here.
+// TODO: refactor. Currently, this class is a base for both SELECT and DELETE statements. In retrospect,
+//       a better way to do the same thing would've been to just inflate bounds, be able to inflate
+//       any type of query from the bounds, and leave things like "reverse" up to the last mile / implementation.
+public class QueryGenerator
+{
+    private static final Logger logger = LoggerFactory.getLogger(QueryGenerator.class);
+    private static final long GT_STREAM = 0b1;
+    private static final long E_STREAM = 0b10;
+
+    private final OpSelectors.Rng rng;
+    private final OpSelectors.PdSelector pdSelector;
+    private final OpSelectors.DescriptorSelector descriptorSelector;
+    private final SchemaSpec schema;
+
+    public QueryGenerator(Run run)
+    {
+        this(run.schemaSpec, run.pdSelector, run.descriptorSelector, run.rng);
+    }
+
+    // TODO: remove constructor
+    public QueryGenerator(SchemaSpec schema,
+                          OpSelectors.PdSelector pdSelector,
+                          OpSelectors.DescriptorSelector descriptorSelector,
+                          OpSelectors.Rng rng)
+    {
+        this.pdSelector = pdSelector;
+        this.descriptorSelector = descriptorSelector;
+        this.schema = schema;
+        this.rng = rng;
+    }
+
+    public static class TypedQueryGenerator
+    {
+        private final OpSelectors.Rng rng;
+        private final QueryGenerator queryGenerator;
+        private final Surjections.Surjection<Query.QueryKind> queryKindGen;
+
+        public TypedQueryGenerator(Run run)
+        {
+            this(run.rng, new QueryGenerator(run));
+        }
+
+        public TypedQueryGenerator(OpSelectors.Rng rng,
+                                   QueryGenerator queryGenerator)
+        {
+            this(rng, Surjections.enumValues(Query.QueryKind.class), queryGenerator);
+        }
+
+        public TypedQueryGenerator(OpSelectors.Rng rng,
+                                   Surjections.Surjection<Query.QueryKind> queryKindGen,
+                                   QueryGenerator queryGenerator)
+        {
+            this.rng = rng;
+            this.queryGenerator = queryGenerator;
+            this.queryKindGen = queryKindGen;
+        }
+
+        // Queries are inflated from LTS, which identifies the partition, and i, a modifier for the query to
+        // be able to generate different queries for the same lts.
+        public Query inflate(long lts, long modifier)
+        {
+            long descriptor = rng.next(modifier, lts);
+            Query.QueryKind queryKind = queryKindGen.inflate(descriptor);
+            return queryGenerator.inflate(lts, modifier, queryKind);
+        }
+    }
+
+    public Query inflate(long lts, long modifier, Query.QueryKind queryKind)
+    {
+        long pd = pdSelector.pd(lts, schema);
+        long descriptor = rng.next(modifier, lts);
+        boolean reverse = descriptor % 2 == 0;
+        switch (queryKind)
+        {
+            case SINGLE_PARTITION:
+                return new Query.SinglePartitionQuery(queryKind,
+                                                      pd,
+                                                      reverse,
+                                                      Collections.emptyList(),
+                                                      schema);
+            case SINGLE_CLUSTERING:
+            {
+                long cd = descriptorSelector.randomCd(pd, descriptor, schema);
+                return new Query.SingleClusteringQuery(queryKind,
+                                                       pd,
+                                                       cd,
+                                                       reverse,
+                                                       Relation.eqRelations(schema.ckGenerator.slice(cd), schema.clusteringKeys),
+                                                       schema);
+            }
+            case CLUSTERING_SLICE:
+            {
+                List<Relation> relations = new ArrayList<>();
+                long cd = descriptorSelector.randomCd(pd, descriptor, schema);
+                boolean isGt = RngUtils.asBoolean(rng.next(descriptor, GT_STREAM));
+                // TODO: make generation of EQ configurable; turn it off and on
+                boolean isEquals = RngUtils.asBoolean(rng.next(descriptor, E_STREAM));
+
+                long[] sliced = schema.ckGenerator.slice(cd);
+                long min;
+                long max;
+                int nonEqFrom = RngUtils.asInt(descriptor, 0, sliced.length - 1);
+
+                long[] minBound = new long[sliced.length];
+                long[] maxBound = new long[sliced.length];
+
+                // Algorithm that determines boundaries for a clustering slice.
+                //
+                // Basic principles are not hard but there are a few edge cases. I haven't figured out how to simplify
+                // those, so there might be some room for improvement. In short, what we want to achieve is:
+                //
+                // 1. Every part that is restricted with an EQ relation goes into the bound verbatim.
+                // 2. Every part that is restricted with a non-EQ relation (LT, GT, LTE, GTE) is taken into the bound
+                //    if it is required to satisfy the relationship. For example, in `ck1 = 0 AND ck2 < 5`, ck2 will go
+                //    to the _max_ boundary, and minimum value will go to the _min_ boundary, since we can select every
+                //    descriptor that is prefixed with ck1.
+                // 3. Every other part (e.g., ones that are not explicitly mentioned in the query) has to be restricted
+                //    according to equality. For example, in `ck1 = 0 AND ck2 < 5`, ck3 that is present in schema but not
+                //    mentioned in query, makes sure that any value between [0, min_value, min_value] and [0, 5, min_value]
+                //    is matched.
+                //
+                // One edge case is a query on the first clustering key: `ck1 < 5`. In this case, we have to fixup the lower
+                // value to the minimum possible value. We could really just do Long.MIN_VALUE, but in case we forget to
+                // adjust entropy elsewhere, it'll be caught correctly here.
+                for (int i = 0; i < sliced.length; i++)
+                {
+                    long v = sliced[i];
+                    DataGenerators.KeyGenerator gen = schema.ckGenerator;
+                    ColumnSpec column = schema.clusteringKeys.get(i);
+                    int idx = i;
+                    LongSupplier maxSupplier = () -> gen.maxValue(idx);
+                    LongSupplier minSupplier = () -> gen.minValue(idx);
+
+                    if (i < nonEqFrom)
+                    {
+                        relations.add(Relation.eqRelation(schema.clusteringKeys.get(i), v));
+                        minBound[i] = v;
+                        maxBound[i] = v;
+                    }
+                    else if (i == nonEqFrom)
+                    {
+                        relations.add(Relation.relation(relationKind(isGt, isEquals), schema.clusteringKeys.get(i), v));
+
+                        if (column.isReversed())
+                        {
+                            minBound[i] = isGt ? minSupplier.getAsLong() : v;
+                            maxBound[i] = isGt ? v : maxSupplier.getAsLong();
+                        }
+                        else
+                        {
+                            minBound[i] = isGt ? v : minSupplier.getAsLong();
+                            maxBound[i] = isGt ? maxSupplier.getAsLong() : v;
+                        }
+                    }
+                    else
+                    {
+                        if (isEquals)
+                        {
+                            minBound[i] = minSupplier.getAsLong();
+                            maxBound[i] = maxSupplier.getAsLong();
+                        }
+                        else if (i > 0 && schema.clusteringKeys.get(i - 1).isReversed())
+                            maxBound[i] = minBound[i] = isGt ? minSupplier.getAsLong() : maxSupplier.getAsLong();
+                        else
+                            maxBound[i] = minBound[i] = isGt ? maxSupplier.getAsLong() : minSupplier.getAsLong();
+                    }
+                }
+
+                if (schema.clusteringKeys.get(nonEqFrom).isReversed())
+                    isGt = !isGt;
+
+                min = schema.ckGenerator.stitch(minBound);
+                max = schema.ckGenerator.stitch(maxBound);
+
+                if (nonEqFrom == 0)
+                {
+                    min = isGt ? min : schema.ckGenerator.minValue();
+                    max = !isGt ? max : schema.ckGenerator.maxValue();
+                }
+
+                // if we're about to create an "impossible" query, just bump the modifier and re-generate
+                if (min == max && !isEquals)
+                    return inflate(lts, modifier + 1, queryKind);
+
+                return new Query.ClusteringSliceQuery(Query.QueryKind.CLUSTERING_SLICE,
+                                                      pd,
+                                                      min,
+                                                      max,
+                                                      relationKind(true, isGt ? isEquals : true),
+                                                      relationKind(false, !isGt ? isEquals : true),
+                                                      reverse,
+                                                      relations,
+                                                      schema);
+            }
+            case CLUSTERING_RANGE:
+            {
+                List<Relation> relations = new ArrayList<>();
+                long cd1 = descriptorSelector.randomCd(pd, descriptor, schema);
+                boolean isMinEq = RngUtils.asBoolean(descriptor);
+                long cd2 = descriptorSelector.randomCd(pd, rng.next(descriptor, lts), schema);
+
+                boolean isMaxEq = RngUtils.asBoolean(rng.next(descriptor, lts));
+
+                long[] minBound = schema.ckGenerator.slice(cd1);
+                long[] maxBound = schema.ckGenerator.slice(cd2);
+
+                int lock = RngUtils.asInt(descriptor, 0, schema.clusteringKeys.size() - 1);
+
+                // Logic here is similar to how clustering slices are implemented, except for both lower and upper bound
+                // get their values from sliced value in (1) and (2) cases:
+                //
+                // 1. Every part that is restricted with an EQ relation, takes its value from the min bound.
+                //    TODO: this can actually be improved, since in case of hierarchical clustering generation we can
+                //          pick out of the keys that are already locked. That said, we'll exercise more cases the way
+                //          it is implemented right now.
+                // 2. Every part that is restricted with a non-EQ relation is taken into the bound, if it is used in
+                //    the query. For example in, `ck1 = 0 AND ck2 > 2 AND ck2 < 5`, ck2 values 2 and 5 will be placed,
+                //    correspondingly, to the min and max bound.
+                // 3. Every other part has to be restricted according to equality. Similar to clustering slice, we have
+                //    to decide whether we use a min or the max value for the bound. Foe example `ck1 = 0 AND ck2 > 2 AND ck2 <= 5`,
+                //    assuming we have ck3 that is present in schema but not mentioned in the query, we'll have bounds
+                //    created as follows: [0, 2, max_value] and [0, 5, max_value]. Idea here is that since ck2 = 2 is excluded,
+                //    we also disallow all ck3 values for [0, 2] prefix. Similarly, since ck2 = 5 is included, we allow every
+                //    ck3 value with a prefix of [0, 5].
+                for (int i = 0; i < schema.clusteringKeys.size(); i++)
+                {
+                    ColumnSpec<?> col = schema.clusteringKeys.get(i);
+                    if (i < lock)
+                    {
+                        relations.add(Relation.eqRelation(col, minBound[i]));
+                        maxBound[i] = minBound[i];
+                    }
+                    else if (i == lock)
+                    {
+                        long minLocked = Math.min(minBound[lock], maxBound[lock]);
+                        long maxLocked = Math.max(minBound[lock], maxBound[lock]);
+
+                        relations.add(Relation.relation(relationKind(true, isMinEq), col, minLocked));
+                        minBound[i] = col.isReversed() ? maxLocked : minLocked;
+                        relations.add(Relation.relation(relationKind(false, isMaxEq), col, maxLocked));
+                        maxBound[i] = col.isReversed() ? minLocked : maxLocked;
+                    }
+                    else
+                    {
+//                        if (i > 0 && schema.clusteringKeys.get(i - 1).isReversed())
+//                        {
+//                            minBound[i] = isMinEq ? schema.ckGenerator.maxValue(i) : schema.ckGenerator.minValue(i);
+//                            maxBound[i] = isMaxEq ? schema.ckGenerator.minValue(i) : schema.ckGenerator.maxValue(i);
+//                        }
+//                        else
+                        {
+                            minBound[i] = isMinEq ? schema.ckGenerator.minValue(i) : schema.ckGenerator.maxValue(i);
+                            maxBound[i] = isMaxEq ? schema.ckGenerator.maxValue(i) : schema.ckGenerator.minValue(i);
+                        }
+                    }
+                }
+
+                long stitchedMin = schema.ckGenerator.stitch(minBound);
+                long stitchedMax = schema.ckGenerator.stitch(maxBound);
+
+//                if (stitchedMin > stitchedMax)
+//                {
+//                    long[] tmp = minBound;
+//                    minBound = maxBound;
+//                    maxBound = tmp;
+//                    stitchedMin = schema.ckGenerator.stitch(minBound);
+//                    stitchedMax = schema.ckGenerator.stitch(maxBound);
+//                }
+//
+//                for (int i = 0; i <= lock; i++)
+//                {
+//                    ColumnSpec<?> col = schema.clusteringKeys.get(i);
+//                    if (i < lock)
+//                    {
+//                        relations.add(Relation.eqRelation(col, minBound[i]));
+//                    }
+//                    else
+//                    {
+//                        relations.add(Relation.relation(relationKind(true, isMinEq), col, minBound[lock]));
+//                        relations.add(Relation.relation(relationKind(false, isMaxEq), col, maxBound[lock]));
+//                    }
+//                }
+
+                // if we're about to create an "impossible" query, just bump the modifier and re-generate
+                // TODO: so this isn't considered "normal" that we do it this way, but I'd rather fix it with
+                //       a refactoring that's mentioned below
+                if (stitchedMin == stitchedMax)
+                {
+//                    if (modifier > 10)
+//                    {
+//                        logger.error(String.format("Unsuccessfully tried to generate query for %s%s;%s%s %s %d times. Schema: %s",
+//                                                   isMinEq ? "[" : "(", stitchedMin,
+//                                                   stitchedMax, isMaxEq ? "]" : ")",
+//                                                   queryKind, modifier, schema.compile().cql()),
+//                                     new RuntimeException());
+//                    }
+                    return inflate(lts, modifier + 1, queryKind);
+                }
+
+                // TODO: one of the ways to get rid of garbage here, and potentially even simplify the code is to
+                //       simply return bounds here. After bounds are created, we slice them and generate query right
+                //       from the bounds. In this case, we can even say that things like -inf/+inf are special values,
+                //       and use them as placeholdrs. Also, it'll be easier to manipulate relations.
+                return new Query.ClusteringRangeQuery(Query.QueryKind.CLUSTERING_RANGE,
+                                                      pd,
+                                                      stitchedMin,
+                                                      stitchedMax,
+                                                      relationKind(true, isMinEq),
+                                                      relationKind(false, isMaxEq),
+                                                      reverse,
+                                                      relations,
+                                                      schema);
+            }
+            default:
+                throw new IllegalArgumentException("Shouldn't happen");
+        }
+    }
+
+    public static Relation.RelationKind relationKind(boolean isGt, boolean isEquals)
+    {
+        if (isGt)
+            return isEquals ? Relation.RelationKind.GTE : Relation.RelationKind.GT;
+        else
+            return isEquals ? Relation.RelationKind.LTE : Relation.RelationKind.LT;
+    }
+}
diff --git a/harry-core/src/harry/runner/RecentPartitionValidator.java b/harry-core/src/harry/runner/RecentPartitionValidator.java
new file mode 100644
index 0000000..82d4bda
--- /dev/null
+++ b/harry-core/src/harry/runner/RecentPartitionValidator.java
@@ -0,0 +1,84 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import harry.core.MetricReporter;
+import harry.core.Run;
+import harry.generators.Surjections;
+import harry.model.Model;
+import harry.model.OpSelectors;
+
+public class RecentPartitionValidator implements PartitionVisitor
+{
+    private final Model model;
+
+    private final OpSelectors.MonotonicClock clock;
+    private final OpSelectors.PdSelector pdSelector;
+    private final QueryGenerator.TypedQueryGenerator querySelector;
+    private final MetricReporter metricReporter;
+    private final int partitionCount;
+    private final int triggerAfter;
+
+    public RecentPartitionValidator(int partitionCount,
+                                    int triggerAfter,
+                                    Run run,
+                                    Model.ModelFactory modelFactory)
+    {
+        this.partitionCount = partitionCount;
+        this.triggerAfter = triggerAfter;
+        this.metricReporter = run.metricReporter;
+        this.clock = run.clock;
+        this.pdSelector = run.pdSelector;
+
+        this.querySelector = new QueryGenerator.TypedQueryGenerator(run.rng,
+                                                                    // TODO: make query kind configurable
+                                                                    Surjections.enumValues(Query.QueryKind.class),
+                                                                    run.rangeSelector);
+        this.model = modelFactory.make(run);
+    }
+
+    // TODO: expose metric, how many times validated recent partitions
+    private void validateRecentPartitions(int partitionCount)
+    {
+        long maxLts = clock.maxLts();
+        long pos = pdSelector.positionFor(maxLts);
+
+        int maxPartitions = partitionCount;
+        while (pos > 0 && maxPartitions > 0 && !Thread.currentThread().isInterrupted())
+        {
+            long visitLts = pdSelector.minLtsAt(pos);
+
+            metricReporter.validateRandomQuery();
+            model.validate(querySelector.inflate(visitLts, 0));
+
+            pos--;
+            maxPartitions--;
+        }
+    }
+
+    public void visitPartition(long lts)
+    {
+        if (lts % triggerAfter == 0)
+            validateRecentPartitions(partitionCount);
+    }
+
+    public void shutdown() throws InterruptedException
+    {
+    }
+}
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/RowVisitor.java b/harry-core/src/harry/runner/RowVisitor.java
index 93674f9..b24babb 100644
--- a/harry-core/src/harry/runner/RowVisitor.java
+++ b/harry-core/src/harry/runner/RowVisitor.java
@@ -18,7 +18,7 @@
 
 package harry.runner;
 
-import harry.ddl.SchemaSpec;
+import harry.core.Run;
 import harry.model.OpSelectors;
 import harry.operations.CompiledStatement;
 
@@ -26,10 +26,7 @@ public interface RowVisitor
 {
     interface RowVisitorFactory
     {
-        RowVisitor make(SchemaSpec schema,
-                        OpSelectors.MonotonicClock clock,
-                        OpSelectors.DescriptorSelector descriptorSelector,
-                        QuerySelector querySelector);
+        RowVisitor make(Run run);
     }
 
     default CompiledStatement visitRow(OpSelectors.OperationKind op, long lts, long pd, long cd, long opId)
@@ -46,6 +43,8 @@ public interface RowVisitor
                 return deleteColumn(lts, pd, cd, opId);
             case DELETE_RANGE:
                 return deleteRange(lts, pd, opId);
+            case DELETE_SLICE:
+                return deleteSlice(lts, pd, opId);
             default:
                 throw new IllegalStateException();
         }
@@ -58,4 +57,8 @@ public interface RowVisitor
     CompiledStatement deleteRow(long lts, long pd, long cd, long opId);
 
     CompiledStatement deleteRange(long lts, long pd, long opId);
+
+    CompiledStatement deleteSlice(long lts, long pd, long opId);
+
+
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/Runner.java b/harry-core/src/harry/runner/Runner.java
index 7f1696e..a96dd69 100644
--- a/harry-core/src/harry/runner/Runner.java
+++ b/harry-core/src/harry/runner/Runner.java
@@ -23,11 +23,10 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -43,8 +42,6 @@ import harry.model.OpSelectors;
 
 public abstract class Runner
 {
-    public static final Object[] EMPTY_BINDINGS = {};
-
     private static final Logger logger = LoggerFactory.getLogger(Runner.class);
 
     protected final Run run;
@@ -53,10 +50,10 @@ public abstract class Runner
     //  since we have multiple concurrent checkers running
     protected final CopyOnWriteArrayList<Throwable> errors;
 
-    public Runner(Run run)
+    public Runner(Run run, Configuration config)
     {
         this.run = run;
-        this.config = run.snapshot;
+        this.config = config;
         this.errors = new CopyOnWriteArrayList<>();
     }
 
@@ -70,12 +67,11 @@ public abstract class Runner
         if (config.create_schema)
         {
             // TODO: make RF configurable or make keyspace DDL configurable
-            run.sut.execute("CREATE KEYSPACE " + run.schemaSpec.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            run.sut.schemaChange("CREATE KEYSPACE " + run.schemaSpec.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
 
-            run.sut.execute(String.format("DROP TABLE IF EXISTS %s.%s;",
+            run.sut.schemaChange(String.format("DROP TABLE IF EXISTS %s.%s;",
                                           run.schemaSpec.keyspace,
-                                          run.schemaSpec.table),
-                            EMPTY_BINDINGS);
+                                          run.schemaSpec.table));
             String schema = run.schemaSpec.compile().cql();
             logger.info("Creating table: " + schema);
             run.sut.schemaChange(schema);
@@ -83,10 +79,9 @@ public abstract class Runner
 
         if (config.truncate_table)
         {
-            run.sut.execute(String.format("truncate %s.%s;",
-                                          run.schemaSpec.keyspace,
-                                          run.schemaSpec.table),
-                            EMPTY_BINDINGS);
+            run.sut.schemaChange(String.format("truncate %s.%s;",
+                                               run.schemaSpec.keyspace,
+                                               run.schemaSpec.table));
         }
     }
 
@@ -112,7 +107,7 @@ public abstract class Runner
     protected void maybeReportErrors()
     {
         if (!errors.isEmpty())
-            dumpStateToFile(run, errors);
+            dumpStateToFile(run, config, errors);
     }
 
     public abstract CompletableFuture initAndStartAll() throws InterruptedException;
@@ -140,20 +135,21 @@ public abstract class Runner
     {
         private final ScheduledExecutorService executor;
         private final ScheduledExecutorService shutdownExceutor;
-        private final int checkRecentAfter;
-        private final int checkAllAfter;
+        private final List<PartitionVisitor> partitionVisitors;
+        private final Configuration config;
 
         public SequentialRunner(Run run,
-                                int roundRobinValidatorThreads,
-                                int checkRecentAfter,
-                                int checkAllAfter)
+                                Configuration config,
+                                List<? extends PartitionVisitor.PartitionVisitorFactory> partitionVisitorFactories)
         {
-            super(run);
+            super(run, config);
 
-            this.executor = Executors.newScheduledThreadPool(roundRobinValidatorThreads + 1);
+            this.executor = Executors.newSingleThreadScheduledExecutor();
             this.shutdownExceutor = Executors.newSingleThreadScheduledExecutor();
-            this.checkAllAfter = checkAllAfter;
-            this.checkRecentAfter = checkRecentAfter;
+            this.config = config;
+            this.partitionVisitors = new ArrayList<>();
+            for (PartitionVisitor.PartitionVisitorFactory factory : partitionVisitorFactories)
+                partitionVisitors.add(factory.make(run));
         }
 
         public CompletableFuture initAndStartAll()
@@ -166,12 +162,12 @@ public abstract class Runner
                 logger.info("Completed");
                 // TODO: wait for the last full validation?
                 future.complete(null);
-            }, run.snapshot.run_time, run.snapshot.run_time_unit);
+            }, config.run_time, config.run_time_unit);
 
             executor.submit(reportThrowable(() -> {
                                                 try
                                                 {
-                                                    run(run.visitorFactory.get(), run.clock,
+                                                    run(partitionVisitors, run.clock,
                                                         () -> Thread.currentThread().isInterrupted() || future.isDone());
                                                 }
                                                 catch (Throwable t)
@@ -184,19 +180,15 @@ public abstract class Runner
             return future;
         }
 
-        void run(PartitionVisitor visitor,
+        void run(List<PartitionVisitor> visitors,
                  OpSelectors.MonotonicClock clock,
-                 BooleanSupplier exitCondition) throws ExecutionException, InterruptedException
+                 BooleanSupplier exitCondition)
         {
             while (!exitCondition.getAsBoolean())
             {
                 long lts = clock.nextLts();
-                visitor.visitPartition(lts);
-                if (lts % checkRecentAfter == 0)
-                    run.validator.validateRecentPartitions(100);
-
-                if (lts % checkAllAfter == 0)
-                    run.validator.validateAllPartitions(executor, exitCondition, 10).get();
+                for (PartitionVisitor partitionVisitor : visitors)
+                    partitionVisitor.visitPartition(lts);
             }
         }
 
@@ -215,105 +207,89 @@ public abstract class Runner
 
     public static interface RunnerFactory
     {
-        public Runner make(Run run);
+        public Runner make(Run run, Configuration config);
     }
 
     // TODO: this requires some significant improvement
     public static class ConcurrentRunner extends Runner
     {
         private final ScheduledExecutorService executor;
-        private final ScheduledExecutorService shutdownExceutor;
+        private final ScheduledExecutorService shutdownExecutor;
+        private final List<? extends PartitionVisitor.PartitionVisitorFactory> partitionVisitorFactories;
+        private final List<PartitionVisitor> allVisitors;
 
-        private final int writerThreads;
-        private final int roundRobinValidators;
-        private final int recentPartitionValidators;
+        private final int concurrency;
         private final long runTime;
         private final TimeUnit runTimeUnit;
 
         public ConcurrentRunner(Run run,
-                                int writerThreads,
-                                int roundRobinValidators,
-                                int recentPartitionValidators)
+                                Configuration config,
+                                int concurrency,
+                                List<? extends PartitionVisitor.PartitionVisitorFactory> partitionVisitorFactories)
         {
-            super(run);
-            this.writerThreads = writerThreads;
-            this.roundRobinValidators = roundRobinValidators;
-            this.recentPartitionValidators = recentPartitionValidators;
-            this.runTime = run.snapshot.run_time;
-            this.runTimeUnit = run.snapshot.run_time_unit;
-            this.executor = Executors.newScheduledThreadPool(this.writerThreads + this.roundRobinValidators + this.recentPartitionValidators);
-            this.shutdownExceutor = Executors.newSingleThreadScheduledExecutor();
+            super(run, config);
+            this.concurrency = concurrency;
+            this.runTime = config.run_time;
+            this.runTimeUnit = config.run_time_unit;
+            // TODO: configure concurrency
+            this.executor = Executors.newScheduledThreadPool(concurrency);
+            this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor();
+            this.partitionVisitorFactories = partitionVisitorFactories;
+            this.allVisitors = new CopyOnWriteArrayList<>();
         }
 
-        public CompletableFuture initAndStartAll() throws InterruptedException
+        public CompletableFuture initAndStartAll()
         {
             init();
             CompletableFuture future = new CompletableFuture();
             future.whenComplete((a, b) -> maybeReportErrors());
 
-            shutdownExceutor.schedule(() -> {
+            shutdownExecutor.schedule(() -> {
                 logger.info("Completed");
                 // TODO: wait for the last full validation?
                 future.complete(null);
             }, runTime, runTimeUnit);
 
             BooleanSupplier exitCondition = () -> Thread.currentThread().isInterrupted() || future.isDone();
-            for (int i = 0; i < writerThreads; i++)
+            for (int i = 0; i < concurrency; i++)
             {
-                executor.submit(reportThrowable(() -> run(run.visitorFactory.get(), run.clock, exitCondition),
+                List<PartitionVisitor> partitionVisitors = new ArrayList<>();
+                executor.submit(reportThrowable(() -> {
+
+                                                    for (PartitionVisitor.PartitionVisitorFactory factory : partitionVisitorFactories)
+                                                    {
+                                                        partitionVisitors.add(factory.make(run));
+                                                    }
+                                                    allVisitors.addAll(partitionVisitors);
+                                                    run(partitionVisitors, run.clock, exitCondition);
+                                                },
                                                 future));
-            }
-
-            scheduleValidateAllPartitions(run.validator, executor, future, roundRobinValidators);
 
-            // N threads to validate recently written partitions
-            for (int i = 0; i < recentPartitionValidators; i++)
-            {
-                executor.scheduleWithFixedDelay(reportThrowable(() -> {
-                    // TODO: make recent partitions configurable
-                    run.validator.validateRecentPartitions(100);
-                }, future), 1000, 1, TimeUnit.MILLISECONDS);
             }
 
             return future;
         }
 
-        void run(PartitionVisitor visitor,
+        void run(List<PartitionVisitor> visitors,
                  OpSelectors.MonotonicClock clock,
                  BooleanSupplier exitCondition)
         {
             while (!exitCondition.getAsBoolean())
             {
                 long lts = clock.nextLts();
-                visitor.visitPartition(lts);
+                for (PartitionVisitor visitor : visitors)
+                    visitor.visitPartition(lts);
             }
         }
 
-        void scheduleValidateAllPartitions(Validator validator, ExecutorService executor, CompletableFuture future, int roundRobinValidators)
-        {
-            validator.validateAllPartitions(executor, () -> Thread.currentThread().isInterrupted() || future.isDone(),
-                                            roundRobinValidators)
-                     .handle((v, err) -> {
-                         if (err != null)
-                         {
-                             errors.add(err);
-                             if (!future.isDone())
-                                 future.completeExceptionally(err);
-                         }
-
-                         if (Thread.currentThread().isInterrupted() || future.isDone())
-                             return null;
-
-                         scheduleValidateAllPartitions(validator, executor, future, roundRobinValidators);
-                         return null;
-                     });
-        }
-
         public void shutdown() throws InterruptedException
         {
             logger.info("Shutting down...");
-            shutdownExceutor.shutdownNow();
-            shutdownExceutor.awaitTermination(1, TimeUnit.MINUTES);
+            for (PartitionVisitor visitor : allVisitors)
+                visitor.shutdown();
+
+            shutdownExecutor.shutdownNow();
+            shutdownExecutor.awaitTermination(1, TimeUnit.MINUTES);
 
             executor.shutdownNow();
             executor.awaitTermination(1, TimeUnit.MINUTES);
@@ -346,7 +322,7 @@ public abstract class Runner
         bw.newLine();
     }
 
-    private static void dumpStateToFile(Run run, List<Throwable> t)
+    private static void dumpStateToFile(Run run, Configuration config, List<Throwable> t)
     {
         try
         {
@@ -361,14 +337,14 @@ public abstract class Runner
                 bw.flush();
             }
 
-            File config = new File("run.yaml");
-            Configuration.ConfigurationBuilder builder = run.snapshot.unbuild();
+            File file = new File("run.yaml");
+            Configuration.ConfigurationBuilder builder = config.unbuild();
 
             // overrride stateful components
             builder.setClock(run.clock.toConfig());
-            builder.setModel(run.model.toConfig());
+            builder.setDataTracker(run.tracker.toConfig());
 
-            try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(config))))
+            try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file))))
             {
                 bw.write(Configuration.toYamlString(builder.build()));
                 bw.flush();
diff --git a/harry-core/src/harry/runner/SinglePartitionValidator.java b/harry-core/src/harry/runner/SinglePartitionValidator.java
new file mode 100644
index 0000000..e9d4f27
--- /dev/null
+++ b/harry-core/src/harry/runner/SinglePartitionValidator.java
@@ -0,0 +1,56 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import harry.core.Run;
+import harry.model.Model;
+
+public class SinglePartitionValidator implements PartitionVisitor
+{
+    protected final int iterations;
+    protected final Model model;
+    protected final QueryGenerator queryGenerator;
+
+    public SinglePartitionValidator(int iterations,
+                                    Run run,
+                                    Model.ModelFactory modelFactory)
+    {
+        this.iterations = iterations;
+        this.model = modelFactory.make(run);
+        this.queryGenerator = new QueryGenerator(run);
+    }
+
+    public void shutdown() throws InterruptedException
+    {
+
+    }
+
+    public void visitPartition(long lts)
+    {
+        model.validate(queryGenerator.inflate(lts, 0, Query.QueryKind.SINGLE_PARTITION));
+
+        for (Query.QueryKind queryKind : new Query.QueryKind[]{ Query.QueryKind.CLUSTERING_RANGE, Query.QueryKind.CLUSTERING_SLICE, Query.QueryKind.SINGLE_CLUSTERING })
+        {
+            for (int i = 0; i < iterations; i++)
+            {
+                model.validate(queryGenerator.inflate(lts, i, queryKind));
+            }
+        }
+    }
+}
diff --git a/harry-core/src/harry/runner/Validator.java b/harry-core/src/harry/runner/Validator.java
deleted file mode 100644
index 4bd1d90..0000000
--- a/harry-core/src/harry/runner/Validator.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package harry.runner;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BooleanSupplier;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import harry.ddl.SchemaSpec;
-import harry.generators.Surjections;
-import harry.model.Model;
-import harry.model.OpSelectors;
-
-// This might be something that potentially grows into the validator described in the design doc;
-// right now it's just a helper/container class
-public class Validator
-{
-    private static final Logger logger = LoggerFactory.getLogger(Validator.class);
-
-    private final Model model;
-    private final SchemaSpec schema;
-
-    private final OpSelectors.MonotonicClock clock;
-    private final OpSelectors.PdSelector pdSelector;
-    private final QuerySelector querySelector;
-
-    public Validator(Model model,
-                     SchemaSpec schema,
-                     OpSelectors.MonotonicClock clock,
-                     OpSelectors.PdSelector pdSelector,
-                     OpSelectors.DescriptorSelector descriptorSelector,
-                     OpSelectors.Rng rng)
-    {
-        this.model = model;
-        this.schema = schema;
-        this.clock = clock;
-        this.pdSelector = pdSelector;
-        this.querySelector = new QuerySelector(schema,
-                                               pdSelector,
-                                               descriptorSelector,
-                                               Surjections.enumValues(Query.QueryKind.class),
-                                               rng);
-    }
-
-    // TODO: expose metric, how many times validated recent partitions
-    public void validateRecentPartitions(int partitionCount)
-    {
-        long maxLts = clock.maxLts();
-        long pos = pdSelector.positionFor(maxLts);
-
-        int maxPartitions = partitionCount;
-        while (pos > 0 && maxPartitions > 0 && !Thread.currentThread().isInterrupted())
-        {
-            long visitLts = pdSelector.minLtsAt(pos);
-
-            model.validatePartitionState(visitLts,
-                                         querySelector.inflate(visitLts, 0));
-
-            pos--;
-            maxPartitions--;
-        }
-    }
-
-    public CompletableFuture<Void> validateAllPartitions(ExecutorService executor, BooleanSupplier keepRunning, int parallelism)
-    {
-        long maxLts = clock.maxLts() - 1;
-        long maxPos = pdSelector.positionFor(maxLts);
-        AtomicLong counter = new AtomicLong();
-        CompletableFuture[] futures = new CompletableFuture[parallelism];
-        for (int i = 0; i < parallelism; i++)
-        {
-            futures[i] = CompletableFuture.supplyAsync(() -> {
-                long pos;
-                while ((pos = counter.getAndIncrement()) < maxPos && !executor.isShutdown() && keepRunning.getAsBoolean())
-                {
-                    if (pos > 0 && pos % 1000 == 0)
-                        logger.debug(String.format("Validated %d out of %d partitions", pos, maxPos));
-                    long visitLts = pdSelector.minLtsAt(pos);
-                    for (boolean reverse : new boolean[]{ true, false })
-                    {
-                        model.validatePartitionState(visitLts,
-                                                     Query.selectPartition(schema, pdSelector.pd(visitLts, schema), reverse));
-                    }
-                }
-                return null;
-            }, executor);
-        }
-        return CompletableFuture.allOf(futures);
-    }
-
-    public void validatePartition(long lts)
-    {
-        for (boolean reverse : new boolean[]{ true, false })
-        {
-            model.validatePartitionState(lts,
-                                         Query.selectPartition(schema, pdSelector.pd(lts, schema), reverse));
-        }
-    }
-}
diff --git a/harry-core/src/harry/util/Ranges.java b/harry-core/src/harry/util/Ranges.java
index a49b2bd..9da2af8 100644
--- a/harry-core/src/harry/util/Ranges.java
+++ b/harry-core/src/harry/util/Ranges.java
@@ -84,7 +84,8 @@ public class Ranges
 
         public Range(long minBound, long maxBound, boolean minInclusive, boolean maxInclusive, long timestamp)
         {
-            assert (minBound < maxBound) || ((minBound == maxBound) && minInclusive && maxInclusive) :
+            // (minBound < maxBound) ||
+            assert ((minBound != maxBound) || (minInclusive && maxInclusive)) :
             String.format("Min bound should be less than max bound, or both bounds have to be inclusive, but was: %s%d,%d%s",
                           minInclusive ? "[" : "(",
                           minBound, maxBound,
@@ -99,12 +100,24 @@ public class Ranges
 
         public boolean contains(long descriptor)
         {
-            if (minInclusive && descriptor == minBound)
-                return true;
-            if (maxInclusive && descriptor == maxBound)
-                return true;
+            if (minInclusive && maxInclusive)
+                return descriptor >= minBound && descriptor <= maxBound;
 
-            return (descriptor > minBound) && (descriptor < maxBound);
+            if (!minInclusive && !maxInclusive)
+                return descriptor > minBound && descriptor < maxBound;
+
+            if (!minInclusive && maxInclusive)
+                return descriptor > minBound && descriptor <= maxBound;
+
+            assert (minInclusive && !maxInclusive);
+            return descriptor >= minBound && descriptor < maxBound;
+
+//            if ((minInclusive && descriptor == minBound) && descriptor < maxBound)
+//                return true;
+//            if ((maxInclusive && descriptor == maxBound) && descriptor > minBound)
+//                return true;
+//
+//            return (descriptor > minBound) && (descriptor < maxBound);
         }
 
         public boolean contains(long descriptor, long ts)
diff --git a/harry-core/test/harry/generators/RandomGeneratorTest.java b/harry-core/test/harry/generators/RandomGeneratorTest.java
index c84fd9b..b34553d 100644
--- a/harry-core/test/harry/generators/RandomGeneratorTest.java
+++ b/harry-core/test/harry/generators/RandomGeneratorTest.java
@@ -31,13 +31,14 @@ import static junit.framework.TestCase.fail;
 
 public class RandomGeneratorTest
 {
+    private static int RUNS = 100000;
+
     @Test
     public void testShuffleUnshuffle()
     {
-        int iterations = 100000;
         Random rnd = new Random();
 
-        for (int i = 1; i < iterations; i++)
+        for (int i = 1; i < RUNS; i++)
         {
             long l = rnd.nextLong();
             Assert.assertEquals(l, PCGFastPure.unshuffle(PCGFastPure.shuffle(l)));
@@ -48,23 +49,38 @@ public class RandomGeneratorTest
     public void testImmutableRng()
     {
         int size = 5;
-        for (int stream = 1; stream < 1000000; stream++)
+        OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
+        for (int stream = 1; stream < RUNS; stream++)
         {
             long[] generated = new long[size];
-            OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
             for (int i = 0; i < size; i++)
                 generated[i] = rng.randomNumber(i, stream);
 
+            Assert.assertEquals(0, rng.sequenceNumber(generated[0], stream));
+            Assert.assertEquals(generated[1], rng.next(generated[0], stream));
+
             for (int i = 1; i < size; i++)
             {
                 Assert.assertEquals(generated[i], rng.next(generated[i - 1], stream));
                 Assert.assertEquals(generated[i - 1], rng.prev(generated[i], stream));
-                Assert.assertEquals(i - 1, rng.sequenceNumber(generated[i], stream));
+                Assert.assertEquals(i, rng.sequenceNumber(generated[i], stream));
             }
         }
     }
 
     @Test
+    public void testSequenceNumber()
+    {
+        int size = 5;
+        OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
+        for (int stream = 1; stream < RUNS; stream++)
+        {
+            for (int i = 0; i < size; i++)
+                Assert.assertEquals(i, rng.sequenceNumber(rng.randomNumber(i, stream), stream));
+        }
+    }
+
+    @Test
     public void seekTest()
     {
         // TODO: more examples; randomize
@@ -81,14 +97,14 @@ public class RandomGeneratorTest
         Assert.assertEquals(last, rand.next());
         Assert.assertEquals(first, rand.nextAt(0));
         Assert.assertEquals(last, rand.nextAt(10));
-        Assert.assertEquals(-11, rand.distance(first));
+        Assert.assertEquals(-10, rand.distance(first));
     }
 
     @Test
     public void shuffleUnshuffleTest()
     {
         Random rnd = new Random();
-        for (int i = 0; i < 100000; i++)
+        for (int i = 0; i < RUNS; i++)
         {
             long a = rnd.nextLong();
             Assert.assertEquals(a, PCGFastPure.unshuffle(PCGFastPure.shuffle(a)));
@@ -103,7 +119,7 @@ public class RandomGeneratorTest
         int a = 0;
         int b = 50;
         int[] cardinality = new int[b - a];
-        for (int i = 0; i < 100000; i++)
+        for (int i = 0; i < RUNS; i++)
         {
             int min = Math.min(a, b);
             int max = Math.max(a, b);
diff --git a/harry-core/test/harry/generators/SurjectionsTest.java b/harry-core/test/harry/generators/SurjectionsTest.java
index b927041..5cc64c7 100644
--- a/harry-core/test/harry/generators/SurjectionsTest.java
+++ b/harry-core/test/harry/generators/SurjectionsTest.java
@@ -31,6 +31,8 @@ import harry.generators.Surjections;
 
 public class SurjectionsTest
 {
+    private static int RUNS = 1000000;
+
     @Test
     public void weightedTest()
     {
@@ -42,7 +44,7 @@ public class SurjectionsTest
         Map<String, Integer> frequencies = new HashMap<>();
         RandomGenerator rng = new PcgRSUFast(System.currentTimeMillis(), 0);
 
-        for (int i = 0; i < 1000000; i++)
+        for (int i = 0; i < RUNS; i++)
         {
             String s = gen.inflate(rng.next());
             frequencies.compute(s, (s1, i1) -> {
diff --git a/harry-core/test/harry/model/OpSelectorsTest.java b/harry-core/test/harry/model/OpSelectorsTest.java
index 6c55a7f..d90fa5f 100644
--- a/harry-core/test/harry/model/OpSelectorsTest.java
+++ b/harry-core/test/harry/model/OpSelectorsTest.java
@@ -32,16 +32,18 @@ import java.util.function.Supplier;
 import org.junit.Assert;
 import org.junit.Test;
 
+import harry.core.MetricReporter;
+import harry.core.Run;
 import harry.ddl.ColumnSpec;
 import harry.ddl.SchemaGenerators;
 import harry.ddl.SchemaSpec;
-import harry.generators.PcgRSUFast;
-import harry.generators.RandomGenerator;
 import harry.generators.Surjections;
 import harry.generators.distribution.Distribution;
-import harry.model.sut.NoOpSut;
+import harry.model.clock.OffsetClock;
+import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
-import harry.runner.DefaultPartitionVisitorFactory;
+import harry.runner.DataTracker;
+import harry.runner.MutatingPartitionVisitor;
 import harry.runner.PartitionVisitor;
 import harry.runner.RowVisitor;
 import harry.util.BitSet;
@@ -50,6 +52,8 @@ import static harry.model.OpSelectors.DefaultDescriptorSelector.DEFAULT_OP_TYPE_
 
 public class OpSelectorsTest
 {
+    private static int RUNS = 10000;
+
     @Test
     public void testRowDataDescriptorSupplier()
     {
@@ -69,7 +73,7 @@ public class OpSelectorsTest
                                                                               100,
                                                                               100);
 
-        for (int lts = 0; lts < 100000; lts++)
+        for (int lts = 0; lts < RUNS; lts++)
         {
             long pd = pdSupplier.pd(lts);
             for (int m = 0; m < descriptorSelector.numberOfModifications(lts); m++)
@@ -94,9 +98,9 @@ public class OpSelectorsTest
     public void pdSelectorTest()
     {
         OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
-        int cycles = 1000;
+        int cycles = 10000;
 
-        for (int repeats = 2; repeats <= 100; repeats++)
+        for (int repeats = 2; repeats <= 1000; repeats++)
         {
             for (int windowSize = 2; windowSize <= 10; windowSize++)
             {
@@ -104,7 +108,9 @@ public class OpSelectorsTest
                 long[] pds = new long[cycles];
                 for (int i = 0; i < cycles; i++)
                 {
-                    pds[i] = pdSupplier.pd(i);
+                    long pd = pdSupplier.pd(i);
+                    pds[i] = pd;
+                    Assert.assertEquals(pdSupplier.positionFor(i), pdSupplier.positionForPd(pd));
                 }
 
                 Set<Long> noNext = new HashSet<>();
@@ -150,7 +156,8 @@ public class OpSelectorsTest
 
                 for (int i = 0; i < cycles; i++)
                 {
-                    long maxLts = pdSupplier.maxLts(i);
+                    long pd = pdSupplier.pd(i);
+                    long maxLts = pdSupplier.maxLtsFor(pd);
                     Assert.assertEquals(-1, pdSupplier.nextLts(maxLts));
                     Assert.assertEquals(pdSupplier.pd(i), pdSupplier.pd(maxLts));
                 }
@@ -161,8 +168,8 @@ public class OpSelectorsTest
     @Test
     public void ckSelectorTest()
     {
-        Supplier<SchemaSpec> gen = SchemaGenerators.progression(5);
-        for (int i = 0; i < 30; i++)
+        Supplier<SchemaSpec> gen = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
             ckSelectorTest(gen.get());
     }
 
@@ -191,37 +198,48 @@ public class OpSelectorsTest
             });
         };
 
-        PartitionVisitor partitionVisitor = new DefaultPartitionVisitorFactory(new DoNothingModel(),
-                                                                               new NoOpSut(),
-                                                                               pdSelector,
-                                                                               ckSelector,
-                                                                               schema,
-                                                                               new RowVisitor()
-                                                                        {
-                                                                            public CompiledStatement write(long lts, long pd, long cd, long m)
-                                                                            {
-                                                                                consumer.accept(pd, cd);
-                                                                                return compiledStatement;
-                                                                            }
+        Run run = new Run(rng,
+                new OffsetClock(0),
+                pdSelector,
+                ckSelector,
+                schema,
+                DataTracker.NO_OP,
+                SystemUnderTest.NO_OP,
+                MetricReporter.NO_OP);
+
+        PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run,
+                                                                         (r) -> new RowVisitor()
+                                                                         {
+                                                                             public CompiledStatement write(long lts, long pd, long cd, long m)
+                                                                             {
+                                                                                 consumer.accept(pd, cd);
+                                                                                 return compiledStatement;
+                                                                             }
+
+                                                                             public CompiledStatement deleteColumn(long lts, long pd, long cd, long m)
+                                                                             {
+                                                                                 consumer.accept(pd, cd);
+                                                                                 return compiledStatement;
+                                                                             }
 
-                                                                            public CompiledStatement deleteColumn(long lts, long pd, long cd, long m)
-                                                                            {
-                                                                                consumer.accept(pd, cd);
-                                                                                return compiledStatement;
-                                                                            }
+                                                                             public CompiledStatement deleteRow(long lts, long pd, long cd, long m)
+                                                                             {
+                                                                                 consumer.accept(pd, cd);
+                                                                                 return compiledStatement;
+                                                                             }
 
-                                                                            public CompiledStatement deleteRow(long lts, long pd, long cd, long m)
-                                                                            {
-                                                                                consumer.accept(pd, cd);
-                                                                                return compiledStatement;
-                                                                            }
+                                                                             public CompiledStatement deleteRange(long lts, long pd, long opId)
+                                                                             {
+                                                                                 // ignore
+                                                                                 return compiledStatement;
+                                                                             }
 
-                                                                            public CompiledStatement deleteRange(long lts, long pd, long opId)
-                                                                            {
-                                                                                // ignore
-                                                                                return compiledStatement;
-                                                                            }
-                                                                        }).get();
+                                                                             public CompiledStatement deleteSlice(long lts, long pd, long opId)
+                                                                             {
+                                                                                 // ignore
+                                                                                 return compiledStatement;
+                                                                             }
+                                                                         });
 
         for (int lts = 0; lts < 1000; lts++)
         {
diff --git a/harry-core/test/harry/operations/RelationTest.java b/harry-core/test/harry/operations/RelationTest.java
index 15ab02e..70ccd69 100644
--- a/harry-core/test/harry/operations/RelationTest.java
+++ b/harry-core/test/harry/operations/RelationTest.java
@@ -32,7 +32,7 @@ import harry.ddl.SchemaSpec;
 import harry.generators.DataGeneratorsTest;
 import harry.model.OpSelectors;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 import harry.util.BitSet;
 
 public class RelationTest
@@ -42,9 +42,9 @@ public class RelationTest
     @Test
     public void testKeyGenerators()
     {
-        for (int cnt = 1; cnt < 5; cnt++)
+        for (int size = 1; size < 5; size++)
         {
-            Iterator<ColumnSpec.DataType[]> iter = DataGeneratorsTest.permutations(cnt,
+            Iterator<ColumnSpec.DataType[]> iter = DataGeneratorsTest.permutations(size,
                                                                                    ColumnSpec.DataType.class,
                                                                                    ColumnSpec.int8Type,
                                                                                    ColumnSpec.asciiType,
@@ -62,7 +62,7 @@ public class RelationTest
                     spec.add(ColumnSpec.ck("r" + i, types[i], false));
 
                 SchemaSpec schemaSpec = new SchemaSpec("ks",
-                                                   "tbl",
+                                                       "tbl",
                                                        Collections.singletonList(ColumnSpec.pk("pk", ColumnSpec.int64Type)),
                                                        spec,
                                                        Collections.emptyList(),
@@ -86,116 +86,123 @@ public class RelationTest
                 Arrays.sort(cds);
 
                 OpSelectors.Rng rng = new OpSelectors.PCGFast(1L);
-                // TODO: replace with mocks?
-                QuerySelector querySelector = new QuerySelector(schemaSpec,
-                                                                new OpSelectors.PdSelector()
-                                                                {
-                                                                    protected long pd(long lts)
-                                                                    {
-                                                                        return lts;
-                                                                    }
-
-                                                                    public long nextLts(long lts)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long prevLts(long lts)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long maxLts(long lts)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long minLtsAt(long position)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long minLtsFor(long pd)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long positionFor(long lts)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-                                                                },
-                                                                new OpSelectors.DescriptorSelector()
-                                                                {
-                                                                    public int numberOfModifications(long lts)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public int opsPerModification(long lts)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public int maxPartitionSize()
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public boolean isCdVisitedBy(long pd, long lts, long cd)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    protected long cd(long pd, long lts, long opId)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long randomCd(long pd, long entropy)
-                                                                    {
-                                                                        return  Math.abs(rng.prev(entropy)) % cds.length;
-                                                                    }
-
-                                                                    protected long vd(long pd, long cd, long lts, long opId, int col)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public OpSelectors.OperationKind operationType(long pd, long lts, long opId)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public BitSet columnMask(long pd, long lts, long opId)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long rowId(long pd, long lts, long cd)
-                                                                    {
-                                                                        return 0;
-                                                                    }
-
-                                                                    public long modificationId(long pd, long cd, long lts, long vd, int col)
-                                                                    {
-                                                                        return 0;
-                                                                    }
-                                                                },
-                                                                rng);
-
 
+                // TODO: replace with mocks?
+                QueryGenerator querySelector = new QueryGenerator(schemaSpec,
+                                                                  new OpSelectors.PdSelector()
+                                                                  {
+                                                                      protected long pd(long lts)
+                                                                      {
+                                                                          return lts;
+                                                                      }
+
+                                                                      public long nextLts(long lts)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long prevLts(long lts)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long maxLtsFor(long pd)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long maxLts(long lts)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long minLtsAt(long position)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+
+                                                                      public long minLtsFor(long pd)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long positionFor(long lts)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+                                                                  },
+                                                                  new OpSelectors.DescriptorSelector()
+                                                                  {
+                                                                      public int numberOfModifications(long lts)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public int opsPerModification(long lts)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public int maxPartitionSize()
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public boolean isCdVisitedBy(long pd, long lts, long cd)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      protected long cd(long pd, long lts, long opId)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long randomCd(long pd, long entropy)
+                                                                      {
+                                                                          return Math.abs(rng.prev(entropy)) % cds.length;
+                                                                      }
+
+                                                                      protected long vd(long pd, long cd, long lts, long opId, int col)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public OpSelectors.OperationKind operationType(long pd, long lts, long opId)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public BitSet columnMask(long pd, long lts, long opId)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long rowId(long pd, long lts, long cd)
+                                                                      {
+                                                                          return 0;
+                                                                      }
+
+                                                                      public long modificationId(long pd, long cd, long lts, long vd, int col)
+                                                                      {
+                                                                          return 0;
+                                                                      }
+                                                                  },
+                                                                  rng);
+
+                QueryGenerator.TypedQueryGenerator gen = new QueryGenerator.TypedQueryGenerator(rng, querySelector);
 
                 try
                 {
                     for (int i = 0; i < RUNS; i++)
                     {
-                        Query query = querySelector.inflate(i, 0);
+                        Query query = gen.inflate(i, 0);
                         for (int j = 0; j < cds.length; j++)
                         {
                             long cd = schemaSpec.ckGenerator.adjustEntropyDomain(cds[i]);
                             // the only thing we care about here is that query
-                            Assert.assertEquals(String.format("Error caught quen running a query %s with cd %d",
+                            Assert.assertEquals(String.format("Error caught while running a query %s with cd %d",
                                                               query, cd),
                                                 Query.simpleMatch(query, cd),
                                                 query.match(cd));
diff --git a/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java b/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
index 2cf22e7..e471ede 100644
--- a/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
+++ b/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
@@ -58,7 +58,7 @@ public class ExternalClusterSut implements SystemUnderTest
     {
         // TODO: close Cluster and Session!
         return new ExternalClusterSut(Cluster.builder()
-                                             .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM))
+                                             .withQueryOptions(new QueryOptions().setConsistencyLevel(toDriverCl(ConsistencyLevel.QUORUM)))
                                              .addContactPoints(config.contactPoints)
                                              .withPort(config.port)
                                              .withCredentials(config.username, config.password)
@@ -86,14 +86,16 @@ public class ExternalClusterSut implements SystemUnderTest
     }
 
     // TODO: this is rather simplistic
-    public Object[][] execute(String statement, Object... bindings)
+    public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
     {
         int repeat = 10;
         while (true)
         {
             try
             {
-                return resultSetToObjectArray(session.execute(statement, bindings));
+                Statement st = new SimpleStatement(statement, bindings);
+                st.setConsistencyLevel(toDriverCl(cl));
+                return resultSetToObjectArray(session.execute(st));
             }
             catch (Throwable t)
             {
@@ -129,10 +131,12 @@ public class ExternalClusterSut implements SystemUnderTest
         return results;
     }
 
-    public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+    public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
     {
         CompletableFuture<Object[][]> future = new CompletableFuture<>();
-        Futures.addCallback(session.executeAsync(statement, bindings),
+        Statement st = new SimpleStatement(statement, bindings);
+        st.setConsistencyLevel(toDriverCl(cl));
+        Futures.addCallback(session.executeAsync(st),
                             new FutureCallback<ResultSet>()
                             {
                                 public void onSuccess(ResultSet rows)
@@ -176,4 +180,14 @@ public class ExternalClusterSut implements SystemUnderTest
             return ExternalClusterSut.create(this);
         }
     }
+
+    public static com.datastax.driver.core.ConsistencyLevel toDriverCl(SystemUnderTest.ConsistencyLevel cl)
+    {
+        switch (cl)
+        {
+            case ALL:    return com.datastax.driver.core.ConsistencyLevel.ALL;
+            case QUORUM: return com.datastax.driver.core.ConsistencyLevel.QUORUM;
+        }
+        throw new IllegalArgumentException("Don't know a CL: " + cl);
+    }
 }
\ No newline at end of file
diff --git a/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java b/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
index 5462568..702163e 100644
--- a/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
+++ b/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
@@ -21,10 +21,11 @@ package harry.runner.external;
 import harry.core.Configuration;
 import harry.model.sut.external.ExternalClusterSut;
 import harry.runner.HarryRunner;
+import harry.runner.Runner;
 
 import java.io.File;
 
-public class HarryRunnerExternal implements HarryRunner {
+public class HarryRunnerExternal extends HarryRunner {
 
     public static void main(String[] args) throws Throwable {
         ExternalClusterSut.registerSubtypes();
@@ -35,4 +36,9 @@ public class HarryRunnerExternal implements HarryRunner {
         Configuration configuration = Configuration.fromFile(configFile);
         runner.run(configuration);
     }
+
+    @Override
+    public void beforeRun(Runner runner) {
+
+    }
 }
diff --git a/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java b/harry-integration/src/harry/model/sut/ExternalClusterSut.java
similarity index 66%
copy from harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
copy to harry-integration/src/harry/model/sut/ExternalClusterSut.java
index 2cf22e7..aefe508 100644
--- a/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
+++ b/harry-integration/src/harry/model/sut/ExternalClusterSut.java
@@ -16,16 +16,7 @@
  *  limitations under the License.
  */
 
-package harry.model.sut.external;
-
-import com.datastax.driver.core.*;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import harry.core.Configuration;
-import harry.model.sut.SystemUnderTest;
+package harry.model.sut;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -33,13 +24,20 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+
 public class ExternalClusterSut implements SystemUnderTest
 {
-    public static void registerSubtypes()
-    {
-        Configuration.registerSubtypes(ExternalSutConfiguration.class);
-    }
-
     private final Session session;
     private final ExecutorService executor;
 
@@ -54,14 +52,12 @@ public class ExternalClusterSut implements SystemUnderTest
         this.executor = Executors.newFixedThreadPool(threads);
     }
 
-    public static ExternalClusterSut create(ExternalSutConfiguration config)
+    public static ExternalClusterSut create()
     {
         // TODO: close Cluster and Session!
         return new ExternalClusterSut(Cluster.builder()
-                                             .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM))
-                                             .addContactPoints(config.contactPoints)
-                                             .withPort(config.port)
-                                             .withCredentials(config.username, config.password)
+                                             .withQueryOptions(new QueryOptions().setConsistencyLevel(toDriverCl(ConsistencyLevel.QUORUM)))
+                                             .addContactPoints("127.0.0.1")
                                              .build()
                                              .connect());
     }
@@ -86,14 +82,16 @@ public class ExternalClusterSut implements SystemUnderTest
     }
 
     // TODO: this is rather simplistic
-    public Object[][] execute(String statement, Object... bindings)
+    public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
     {
         int repeat = 10;
         while (true)
         {
             try
             {
-                return resultSetToObjectArray(session.execute(statement, bindings));
+                Statement st = new SimpleStatement(statement, bindings);
+                st.setConsistencyLevel(toDriverCl(cl));
+                return resultSetToObjectArray(session.execute(st));
             }
             catch (Throwable t)
             {
@@ -107,8 +105,7 @@ public class ExternalClusterSut implements SystemUnderTest
         }
     }
 
-
-    private static Object[][] resultSetToObjectArray(ResultSet rs)
+    public static Object[][] resultSetToObjectArray(ResultSet rs)
     {
         List<Row> rows = rs.all();
         if (rows.size() == 0)
@@ -129,10 +126,12 @@ public class ExternalClusterSut implements SystemUnderTest
         return results;
     }
 
-    public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+    public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
     {
         CompletableFuture<Object[][]> future = new CompletableFuture<>();
-        Futures.addCallback(session.executeAsync(statement, bindings),
+        Statement st = new SimpleStatement(statement, bindings);
+        st.setConsistencyLevel(toDriverCl(cl));
+        Futures.addCallback(session.executeAsync(st),
                             new FutureCallback<ResultSet>()
                             {
                                 public void onSuccess(ResultSet rows)
@@ -150,30 +149,13 @@ public class ExternalClusterSut implements SystemUnderTest
         return future;
     }
 
-    @JsonTypeName("external")
-    public static class ExternalSutConfiguration implements Configuration.SutConfiguration
+    public static com.datastax.driver.core.ConsistencyLevel toDriverCl(SystemUnderTest.ConsistencyLevel cl)
     {
-
-        private final String contactPoints;
-        private final int port;
-        private final String username;
-        private final String password;
-
-        @JsonCreator
-        public ExternalSutConfiguration(@JsonProperty(value = "contact_points") String contactPoints,
-                                        @JsonProperty(value = "port") int port,
-                                        @JsonProperty(value = "username") String username,
-                                        @JsonProperty(value = "password") String password)
-        {
-            this.contactPoints = contactPoints;
-            this.port = port;
-            this.username = username;
-            this.password = password;
-        }
-
-        public SystemUnderTest make()
+        switch (cl)
         {
-            return ExternalClusterSut.create(this);
+            case ALL:    return com.datastax.driver.core.ConsistencyLevel.ALL;
+            case QUORUM: return com.datastax.driver.core.ConsistencyLevel.QUORUM;
         }
+        throw new IllegalArgumentException("Don't know a CL: " + cl);
     }
 }
\ No newline at end of file
diff --git a/harry-integration/src/harry/model/sut/InJvmSut.java b/harry-integration/src/harry/model/sut/InJvmSut.java
index d6b21a2..1786f60 100644
--- a/harry-integration/src/harry/model/sut/InJvmSut.java
+++ b/harry-integration/src/harry/model/sut/InJvmSut.java
@@ -35,11 +35,15 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import harry.core.Configuration;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.net.Verb;
 
 public class InJvmSut implements SystemUnderTest
 {
-    public static void registerSubtypes()
+    public static void init()
     {
         Configuration.registerSubtypes(InJvmSutConfiguration.class);
     }
@@ -51,6 +55,7 @@ public class InJvmSut implements SystemUnderTest
     public final Cluster cluster;
     private final AtomicLong cnt = new AtomicLong();
     private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
     public InJvmSut(Cluster cluster)
     {
         this(cluster, 10);
@@ -62,6 +67,11 @@ public class InJvmSut implements SystemUnderTest
         this.executor = Executors.newFixedThreadPool(threads);
     }
 
+    public Cluster cluster()
+    {
+        return cluster;
+    }
+
     public boolean isShutdown()
     {
         return isShutdown.get();
@@ -73,6 +83,7 @@ public class InJvmSut implements SystemUnderTest
 
         cluster.close();
         executor.shutdown();
+
         try
         {
             executor.awaitTermination(30, TimeUnit.SECONDS);
@@ -88,17 +99,67 @@ public class InJvmSut implements SystemUnderTest
         cluster.schemaChange(statement);
     }
 
-    public Object[][] execute(String statement, Object... bindings)
+    public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
+    {
+        return execute(statement, cl, (int) (cnt.getAndIncrement() % cluster.size() + 1), bindings);
+    }
+
+    public Object[][] execute(String statement, ConsistencyLevel cl, int coordinator, Object... bindings)
+    {
+        if (isShutdown.get())
+            throw new RuntimeException("Instance is shut down");
+
+        try
+        {
+            if (cl == ConsistencyLevel.NODE_LOCAL)
+            {
+                return cluster.get(coordinator)
+                              .executeInternal(statement, bindings);
+            }
+            else
+            {
+                return cluster
+                       // round-robin
+                       .coordinator(coordinator)
+                       .execute(statement, toApiCl(cl), bindings);
+            }
+        }
+        catch (Throwable t)
+        {
+            logger.error(String.format("Caught error while trying execute statement %s: %s", statement, t.getMessage()),
+                         t);
+            throw t;
+        }
+    }
+
+    // TODO: Ideally, we need to be able to induce a failure of a single specific message
+    public Object[][] executeWithWriteFailure(String statement, ConsistencyLevel cl, Object... bindings)
     {
         if (isShutdown.get())
             throw new RuntimeException("Instance is shut down");
 
         try
         {
-            return cluster
-                   // round-robin
-                   .coordinator((int) (cnt.getAndIncrement() % cluster.size() + 1))
-                   .execute(statement, ConsistencyLevel.QUORUM, bindings);
+            int coordinator = (int) (cnt.getAndIncrement() % cluster.size() + 1);
+            IMessageFilters filters = cluster.filters();
+
+            // Drop exactly one coordinated message
+            filters.verbs(Verb.MUTATION_REQ.id).from(coordinator).messagesMatching(new IMessageFilters.Matcher()
+            {
+                private final AtomicBoolean issued = new AtomicBoolean();
+                public boolean matches(int from, int to, IMessage message)
+                {
+                    if (from != coordinator || message.verb() != Verb.MUTATION_REQ.id)
+                        return false;
+
+                    return !issued.getAndSet(true);
+                }
+            }).drop().on();
+            Object[][] res = cluster
+                             .coordinator(coordinator)
+                             .execute(statement, toApiCl(cl), bindings);
+            filters.reset();
+            return res;
         }
         catch (Throwable t)
         {
@@ -108,9 +169,14 @@ public class InJvmSut implements SystemUnderTest
         }
     }
 
-    public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+    public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
     {
-        return CompletableFuture.supplyAsync(() -> execute(statement, bindings), executor);
+        return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings), executor);
+    }
+
+    public CompletableFuture<Object[][]> executeAsyncWithWriteFailure(String statement, ConsistencyLevel cl, Object... bindings)
+    {
+        return CompletableFuture.supplyAsync(() -> executeWithWriteFailure(statement, cl, bindings), executor);
     }
 
     @JsonTypeName("in_jvm")
@@ -132,17 +198,26 @@ public class InJvmSut implements SystemUnderTest
 
         public SystemUnderTest make()
         {
+            try
+            {
+                ICluster.setup();
+            }
+            catch (Throwable throwable)
+            {
+                throwable.printStackTrace();
+            }
+
             Cluster cluster;
             try
             {
                 cluster = Cluster.build().withConfig((cfg) -> {
                     // TODO: make this configurable
-                    cfg.set("row_cache_size_in_mb", 10L)
+                    cfg.with(Feature.NETWORK, Feature.GOSSIP)
+                       .set("row_cache_size_in_mb", 10L)
                        .set("index_summary_capacity_in_mb", 10L)
                        .set("counter_cache_size_in_mb", 10L)
                        .set("key_cache_size_in_mb", 10L)
                        .set("file_cache_size_in_mb", 10)
-                       .set("prepared_statements_cache_size_mb", 10L)
                        .set("memtable_heap_space_in_mb", 128)
                        .set("memtable_offheap_space_in_mb", 128)
                        .set("memtable_flush_writers", 1)
@@ -151,7 +226,8 @@ public class InJvmSut implements SystemUnderTest
                        .set("concurrent_writes", 5)
                        .set("compaction_throughput_mb_per_sec", 10)
                        .set("hinted_handoff_enabled", false);
-                }).withNodes(nodes)
+                })
+                                 .withNodes(nodes)
                                  .withRoot(new File(root)).createWithoutStarting();
             }
             catch (IOException e)
@@ -163,4 +239,15 @@ public class InJvmSut implements SystemUnderTest
             return new InJvmSut(cluster);
         }
     }
+
+    public static org.apache.cassandra.distributed.api.ConsistencyLevel toApiCl(ConsistencyLevel cl)
+    {
+        switch (cl)
+        {
+            case ALL:    return org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+            case QUORUM: return org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+            case NODE_LOCAL: return org.apache.cassandra.distributed.api.ConsistencyLevel.NODE_LOCAL;
+        }
+        throw new IllegalArgumentException("Don't know a CL: " + cl);
+    }
 }
\ No newline at end of file
diff --git a/harry-integration/src/harry/runner/FaultInjectingPartitionVisitor.java b/harry-integration/src/harry/runner/FaultInjectingPartitionVisitor.java
new file mode 100644
index 0000000..84e1a73
--- /dev/null
+++ b/harry-integration/src/harry/runner/FaultInjectingPartitionVisitor.java
@@ -0,0 +1,94 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.model.sut.InJvmSut;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+
+public class FaultInjectingPartitionVisitor extends LoggingPartitionVisitor
+{
+    public static void init()
+    {
+        Configuration.registerSubtypes(FaultInjectingPartitionVisitorConfiguration.class);
+    }
+
+    @JsonTypeName("fault_injecting")
+    public static class FaultInjectingPartitionVisitorConfiguration extends Configuration.MutatingPartitionVisitorConfiguation
+    {
+        @JsonCreator
+        public FaultInjectingPartitionVisitorConfiguration(@JsonProperty("row_visitor") Configuration.RowVisitorConfiguration row_visitor)
+        {
+            super(row_visitor);
+        }
+
+        @Override
+        public PartitionVisitor make(Run run)
+        {
+            return new FaultInjectingPartitionVisitor(run, row_visitor);
+        }
+    }
+
+    private final AtomicInteger cnt = new AtomicInteger();
+
+    private final InJvmSut sut;
+
+    public FaultInjectingPartitionVisitor(Run run, RowVisitor.RowVisitorFactory rowVisitorFactory)
+    {
+        super(run, rowVisitorFactory);
+        this.sut = (InJvmSut) run.sut;
+    }
+
+    void executeAsyncWithRetries(CompletableFuture<Object[][]> originator, CompiledStatement statement)
+    {
+        executeAsyncWithRetries(originator, statement, true);
+    }
+
+    void executeAsyncWithRetries(CompletableFuture<Object[][]> originator, CompiledStatement statement, boolean allowFailures)
+    {
+        if (sut.isShutdown())
+            throw new IllegalStateException("System under test is shut down");
+
+        CompletableFuture<Object[][]> future;
+        if (allowFailures && cnt.getAndIncrement() % 2 == 0)
+        {
+            future = sut.executeAsyncWithWriteFailure(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings());
+        }
+        else
+        {
+            future = sut.executeAsync(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings());
+        }
+
+        future.whenComplete((res, t) -> {
+               if (t != null)
+                   executor.schedule(() -> executeAsyncWithRetries(originator, statement, false), 1, TimeUnit.SECONDS);
+               else
+                   originator.complete(res);
+           });
+    }
+}
diff --git a/harry-integration/src/harry/runner/QueryingNoOpChecker.java b/harry-integration/src/harry/runner/QueryingNoOpChecker.java
new file mode 100644
index 0000000..08b8e6b
--- /dev/null
+++ b/harry-integration/src/harry/runner/QueryingNoOpChecker.java
@@ -0,0 +1,65 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.model.Model;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+
+public class QueryingNoOpChecker implements Model
+{
+    public static void init()
+    {
+        Configuration.registerSubtypes(QueryingNoOpCheckerConfig.class);
+    }
+
+    private final Run run;
+
+    public QueryingNoOpChecker(Run run)
+    {
+        this.run = run;
+    }
+
+    @Override
+    public void validate(Query query)
+    {
+        CompiledStatement compiled = query.toSelectStatement();
+        run.sut.execute(compiled.cql(),
+                        SystemUnderTest.ConsistencyLevel.QUORUM,
+                        compiled.bindings());
+    }
+
+    @JsonTypeName("querying_no_op_checker")
+    public static class QueryingNoOpCheckerConfig implements Configuration.ModelConfiguration
+    {
+        @JsonCreator
+        public QueryingNoOpCheckerConfig()
+        {
+        }
+
+        public Model make(Run run)
+        {
+            return new QueryingNoOpChecker(run);
+        }
+    }
+}
diff --git a/harry-integration/src/harry/runner/RepairingLocalStateValidator.java b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
new file mode 100644
index 0000000..98beb93
--- /dev/null
+++ b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
@@ -0,0 +1,137 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.data.ResultSetRow;
+import harry.model.Model;
+import harry.model.QuiescentChecker;
+import harry.model.sut.InJvmSut;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+
+import static harry.model.SelectHelper.resultSetToRow;
+
+public class RepairingLocalStateValidator extends AllPartitionsValidator
+{
+    public static void init()
+    {
+        Configuration.registerSubtypes(RepairingLocalStateValidatorConfiguration.class,
+                                       QuiescentCheckerConfig.class);
+    }
+
+    public final InJvmSut inJvmSut;
+
+    public RepairingLocalStateValidator(int concurrency, int triggerAfter, Run run, Model.ModelFactory modelFactory)
+    {
+        super(concurrency, triggerAfter, run, modelFactory);
+
+        this.inJvmSut = (InJvmSut) run.sut;
+    }
+
+    @Override
+    public void visitPartition(long lts)
+    {
+        if (lts > 0 && lts % triggerAfter == 0)
+        {
+            System.out.println("Starting repair...");
+            inJvmSut.cluster().get(1).nodetool("repair", "--full");
+            System.out.println("Validating partitions...");
+            validateAllPartitions(executor, concurrency);
+        }
+    }
+
+    @JsonTypeName("repair_and_validate_local_states")
+    public static class RepairingLocalStateValidatorConfiguration implements Configuration.PartitionVisitorConfiguration
+    {
+        private final int concurrency;
+        private final int trigger_after;
+        private final Configuration.ModelConfiguration modelConfiguration;
+
+        @JsonCreator
+        public RepairingLocalStateValidatorConfiguration(@JsonProperty("concurrency") int concurrency,
+                                                         @JsonProperty("trigger_after") int trigger_after,
+                                                         @JsonProperty("model") Configuration.ModelConfiguration model)
+        {
+            this.concurrency = concurrency;
+            this.trigger_after = trigger_after;
+            this.modelConfiguration = model;
+        }
+
+        public PartitionVisitor make(Run run)
+        {
+            return new RepairingLocalStateValidator(concurrency, trigger_after, run, modelConfiguration);
+        }
+    }
+
+    public static class QuiescentLocalStateChecker extends QuiescentChecker
+    {
+        public final InJvmSut inJvmSut;
+
+        public QuiescentLocalStateChecker(Run run)
+        {
+            super(run);
+            assert run.sut instanceof InJvmSut;
+
+            this.inJvmSut = (InJvmSut) run.sut;
+        }
+
+        @Override
+        public void validate(Query query)
+        {
+            CompiledStatement compiled = query.toSelectStatement();
+            for (int i = 1; i <= inJvmSut.cluster.size(); i++)
+            {
+                int node = i;
+                validate(() -> {
+                    Object[][] objects = inJvmSut.execute(compiled.cql(),
+                                                          SystemUnderTest.ConsistencyLevel.NODE_LOCAL,
+                                                          node,
+                                                          compiled.bindings());
+                    List<ResultSetRow> result = new ArrayList<>();
+                    for (Object[] obj : objects)
+                        result.add(resultSetToRow(query.schemaSpec, clock, obj));
+
+                    return result;
+                }, query);
+            }
+        }
+    }
+
+    @JsonTypeName("quiescent_local_state_checker")
+    public static class QuiescentCheckerConfig implements Configuration.ModelConfiguration
+    {
+        @JsonCreator
+        public QuiescentCheckerConfig()
+        {
+        }
+
+        public Model make(Run run)
+        {
+            return new QuiescentLocalStateChecker(run);
+        }
+    }
+}
diff --git a/harry-integration/src/harry/runner/Reproduce.java b/harry-integration/src/harry/runner/Reproduce.java
index 049c864..78a96f6 100644
--- a/harry-integration/src/harry/runner/Reproduce.java
+++ b/harry-integration/src/harry/runner/Reproduce.java
@@ -33,7 +33,7 @@ public class Reproduce extends TestBaseImpl
 
     public void runWithInJvmDtest() throws Throwable
     {
-        InJvmSut.registerSubtypes();
+        InJvmSut.init();
 
         System.setProperty("cassandra.disable_tcactive_openssl", "true");
         System.setProperty("relocated.shaded.io.netty.transport.noNative", "true");
@@ -46,7 +46,7 @@ public class Reproduce extends TestBaseImpl
 
         try
         {
-            run.validator.validatePartition(0L);
+//            run.validator.validatePartition(0L);
         }
         catch(Throwable t)
         {
diff --git a/harry-integration/test/harry/model/ExhaustiveCheckerIntegrationTest.java b/harry-integration/test/harry/model/ExhaustiveCheckerIntegrationTest.java
index 4a38e7b..ecbf499 100644
--- a/harry-integration/test/harry/model/ExhaustiveCheckerIntegrationTest.java
+++ b/harry-integration/test/harry/model/ExhaustiveCheckerIntegrationTest.java
@@ -19,10 +19,9 @@
 package harry.model;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import harry.core.Configuration;
@@ -33,34 +32,40 @@ import harry.corruptor.HideRowCorruptor;
 import harry.corruptor.HideValueCorruptor;
 import harry.corruptor.QueryResponseCorruptor;
 import harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
-import harry.generators.Surjections;
-import harry.generators.distribution.Distribution;
+import harry.ddl.SchemaGenerators;
+import harry.model.sut.InJvmSut;
 import harry.model.sut.SystemUnderTest;
+import harry.runner.MutatingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
-import harry.runner.Validator;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import harry.runner.SinglePartitionValidator;
 
-public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
+public class ExhaustiveCheckerIntegrationTest extends ModelTestBase
 {
     @Test
     public void testVerifyPartitionState()
     {
-        Configuration config = sharedConfiguration(1).build();
-        Run run = config.createRun();
-        run.sut.schemaChange(run.schemaSpec.compile().cql());
+        Supplier<Configuration.ConfigurationBuilder> gen = sharedConfiguration();
 
-        OpSelectors.MonotonicClock clock = run.clock;
-        Validator validator = run.validator;
-        PartitionVisitor partitionVisitor = run.visitorFactory.get();
-
-        for (int i = 0; i < 2000; i++)
+        for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
         {
-            long lts = clock.nextLts();
-            partitionVisitor.visitPartition(lts);
-        }
+            Configuration config = gen.get().build();
+            Run run = config.createRun();
+            run.sut.schemaChange(run.schemaSpec.compile().cql());
+            OpSelectors.MonotonicClock clock = run.clock;
 
-        validator.validatePartition(0);
+            SinglePartitionValidator validator = new SinglePartitionValidator(100, run, modelConfiguration());
+            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+
+            for (int i = 0; i < 2000; i++)
+            {
+                long lts = clock.nextLts();
+                partitionVisitor.visitPartition(lts);
+            }
+
+            validator.visitPartition(0);
+        }
     }
 
     @Test
@@ -71,13 +76,13 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
                                                                                                    run.clock,
                                                                                                    HideRowCorruptor::new);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
-                                              t.getCause() != null &&t.getCause().toString().contains(OpSelectors.OperationKind.WRITE.toString())));
+                     (t, run) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
+                                                   t.getCause() != null && t.getCause().toString().contains(OpSelectors.OperationKind.WRITE.toString())));
     }
 
     @Test
@@ -88,18 +93,21 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
                                                                                      run.clock,
                                                                                      run.descriptorSelector);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> {
+                     (t, run) -> {
                          Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
-                                              // TODO: this is not entirely correct. Right now, after registering a deletion followed by no writes,
-                                              // we would continue going back in time and checking other operations, even though we don't have to do this.
-                                              t.getCause().getMessage().contains("Modification should have been visible but was not") ||
-                                              t.getCause().getMessage().contains("Observed unvalidated rows") ||
-                                              t.getCause() != null && t.getCause().getMessage().contains("was never written"));
+                                           // TODO: this is not entirely correct. Right now, after registering a deletion followed by no writes,
+                                           // we would continue going back in time and checking other operations, even though we don't have to do this.
+                                           t.getCause().getMessage().contains("Modification should have been visible but was not") ||
+                                           // TODO: this is not entirely correct, either. This row is, in fact, present in both dataset _and_
+                                           // in the model, it's just there might be _another_ row right in front of it.
+                                           t.getCause().getMessage().contains("expected row not to be visible") ||
+                                           t.getCause().getMessage().contains("Observed unvalidated rows") ||
+                                           t.getCause() != null && t.getCause().getMessage().contains("was never written"));
                      });
     }
 
@@ -112,18 +120,17 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
                                                                                                    run.clock,
                                                                                                    HideValueCorruptor::new);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
-                                              t.getCause() != null && t.getCause().getMessage().contains("Modification should have been visible but was not")));
+                     (t, run) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
+                                                   t.getCause() != null && t.getCause().getMessage().contains("Modification should have been visible but was not")));
     }
 
 
     @Test
-    @Ignore
     public void testDetectsOverwrittenRow()
     {
         negativeTest((run) -> {
@@ -131,89 +138,61 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
                                                                                                    run.clock,
                                                                                                    ChangeValueCorruptor::new);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
-                                              t.getCause() != null && t.getCause().getMessage().contains("Modification should have been visible but was not.")));
+                     (t, run) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
+                                                   t.getCause() != null && t.getCause().getMessage().contains("Modification should have been visible but was not.")));
     }
 
-
-    static void negativeTest(Consumer<Run> corrupt, Consumer<Throwable> validate)
+    @Test
+    public void testLocalOnlyExecution()
     {
-        Configuration config = sharedConfiguration()
-                               .setClusteringDescriptorSelector((rng, schemaSpec) -> {
-                                   return new OpSelectors.DefaultDescriptorSelector(rng,
-                                                                                    new OpSelectors.ColumnSelectorBuilder().forAll(schemaSpec.regularColumns.size()).build(),
-                                                                                    Surjections.pick(OpSelectors.OperationKind.DELETE_COLUMN,
-                                                                                                     OpSelectors.OperationKind.DELETE_ROW,
-                                                                                                     OpSelectors.OperationKind.WRITE),
-                                                                                    new Distribution.ConstantDistribution(10),
-                                                                                    new Distribution.ConstantDistribution(10),
-                                                                                    100);
-                               })
-                               .build();
-        Run run = config.createRun();
-        run.sut.schemaChange(run.schemaSpec.compile().cql());
-        OpSelectors.MonotonicClock clock = run.clock;
-        Validator validator = run.validator;
-        PartitionVisitor partitionVisitor = run.visitorFactory.get();
-
-        for (int i = 0; i < 200; i++)
-        {
-            long lts = clock.nextLts();
-            partitionVisitor.visitPartition(lts);
-        }
+        LocalOnlySut localOnlySut = new LocalOnlySut();
 
-        corrupt.accept(run);
+        Supplier<Configuration.ConfigurationBuilder> gen = sharedConfiguration();
 
-        try
-        {
-            validator.validatePartition(0);
-            Assert.fail("Should've thrown");
-        }
-        catch (Throwable t)
+        for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
         {
-            validate.accept(t);
+            Configuration config = gen.get()
+                                   .setClusteringDescriptorSelector((builder) -> {
+                                       builder.setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
+                                                                       .addWeight(OpSelectors.OperationKind.DELETE_ROW, 80)
+                                                                       .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 10)
+                                                                       .addWeight(OpSelectors.OperationKind.WRITE, 10)
+                                                                       .build());
+                                   })
+                                   .setSUT(() -> localOnlySut)
+                                   .build();
+
+            Run run = config.createRun();
+            run.sut.schemaChange(run.schemaSpec.compile().cql());
+
+            OpSelectors.MonotonicClock clock = run.clock;
+
+            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+
+            localOnlySut.localOnly(() -> {
+                for (int i = 0; i < 5; i++)
+                {
+                    long lts = clock.nextLts();
+                    partitionVisitor.visitPartition(lts);
+                }
+            });
+
+            SinglePartitionValidator validator = new SinglePartitionValidator(100, run, ExhaustiveChecker::new);
+            validator.visitPartition(0);
         }
     }
 
-    @Test
-    public void testLocalOnlyExecution()
+    Configuration.ModelConfiguration modelConfiguration()
     {
-        LocalOnlySut localOnlySut = new LocalOnlySut();
-        Configuration config = sharedConfiguration()
-                               .setClusteringDescriptorSelector((builder) -> {
-                                   builder.setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
-                                                                   .addWeight(OpSelectors.OperationKind.DELETE_ROW, 80)
-                                                                   .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 10)
-                                                                   .addWeight(OpSelectors.OperationKind.WRITE, 10)
-                                                                   .build());
-                               })
-                               .setSUT(() -> localOnlySut)
-                               .build();
-
-        Run run = config.createRun();
-        run.sut.schemaChange(run.schemaSpec.compile().cql());
-
-        OpSelectors.MonotonicClock clock = run.clock;
-        Validator validator = run.validator;
-        PartitionVisitor partitionVisitor = run.visitorFactory.get();
-
-        localOnlySut.localOnly(() -> {
-            for (int i = 0; i < 5; i++)
-            {
-                long lts = clock.nextLts();
-                partitionVisitor.visitPartition(lts);
-            }
-        });
-
-        validator.validatePartition(0);
+        return new Configuration.ExhaustiveCheckerConfig();
     }
 
-    private static class LocalOnlySut implements SystemUnderTest
+    public static class LocalOnlySut implements SystemUnderTest
     {
         private boolean localOnly = false;
         private int counter = 0;
@@ -233,17 +212,17 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
             cluster.schemaChange(statement);
         }
 
-        public Object[][] execute(String statement, Object... bindings)
+        public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
         {
             if (localOnly)
                 return cluster.get((counter++) % cluster.size() + 1).executeInternal(statement, bindings);
             else
-                return cluster.coordinator((counter++) % cluster.size() + 1).execute(statement, ConsistencyLevel.ALL, bindings);
+                return cluster.coordinator((counter++) % cluster.size() + 1).execute(statement, InJvmSut.toApiCl(ConsistencyLevel.ALL), bindings);
         }
 
-        public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+        public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
         {
-            return CompletableFuture.supplyAsync(() -> execute(statement, bindings));
+            return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings));
         }
 
         public void localOnly(Runnable r)
diff --git a/harry-integration/test/harry/model/ExhaustiveCheckerUnitTest.java b/harry-integration/test/harry/model/ExhaustiveCheckerUnitTest.java
index 84d5a82..7c1ca04 100644
--- a/harry-integration/test/harry/model/ExhaustiveCheckerUnitTest.java
+++ b/harry-integration/test/harry/model/ExhaustiveCheckerUnitTest.java
@@ -26,13 +26,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import harry.core.Configuration;
 import harry.core.Run;
-import harry.model.sut.NoOpSut;
+import harry.ddl.SchemaGenerators;
+import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
@@ -43,55 +45,58 @@ public class ExhaustiveCheckerUnitTest
     @Test
     public void testOperationConsistency()
     {
-        LoggingRowVisitor rowVisitor = new LoggingRowVisitor();
-
-        Configuration config = IntegrationTestBase.sharedConfiguration()
-                                                  .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(10, 10))
-                                                  .setClusteringDescriptorSelector((builder) -> {
-                                                      builder.setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
-                                                                                      .addWeight(OpSelectors.OperationKind.DELETE_ROW, 33)
-                                                                                      .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 33)
-                                                                                      .addWeight(OpSelectors.OperationKind.WRITE, 34)
-                                                                                      .build());
-                                                  })
-                                                  .setRowVisitor((a_, b_, c_, d_) -> rowVisitor)
-                                                  .setSUT(NoOpSut::new)
-                                                  .setCreateSchema(true)
-                                                  .build();
-
-        Run run = config.createRun();
-
-        ExhaustiveChecker checker = (ExhaustiveChecker) run.model;
-
-        int iterations = 10;
-        PartitionVisitor partitionVisitor = run.visitorFactory.get();
-        for (int i = 0; i < iterations; i++)
-            partitionVisitor.visitPartition(i);
-
-        for (List<ExhaustiveChecker.Operation> value : rowVisitor.executed.values())
-            Collections.sort(value);
-
-        for (int lts = 0; lts < iterations; lts++)
+        Supplier<Configuration.ConfigurationBuilder> gen = IntegrationTestBase.sharedConfiguration();
+
+        for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
         {
-            // TODO: turn query into interface to make it easier to deal with here
-            for (Collection<ExhaustiveChecker.Operation> modelOps : checker.inflatePartitionState(lts,
-                                                                                                  (iterations - 1),
-                                                                                                  Query.selectPartition(run.schemaSpec,
-                                                                                                                        run.pdSelector.pd(lts),
-                                                                                                                        false))
-                                                                           .operations.values())
+            LoggingRowVisitor rowVisitor = new LoggingRowVisitor();
+            Configuration config = gen.get()
+                                      .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(10, 10))
+                                      .setClusteringDescriptorSelector((builder) -> {
+                                          builder.setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
+                                                                          .addWeight(OpSelectors.OperationKind.DELETE_ROW, 33)
+                                                                          .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 33)
+                                                                          .addWeight(OpSelectors.OperationKind.WRITE, 34)
+                                                                          .build());
+                                      })
+                                      .setSUT(() -> SystemUnderTest.NO_OP)
+                                      .setCreateSchema(true)
+                                      .build();
+
+            Run run = config.createRun();
+
+            ExhaustiveChecker checker = new ExhaustiveChecker(run);
+
+            PartitionVisitor visitor = new Configuration.MutatingPartitionVisitorConfiguation((r) -> rowVisitor).make(run);
+            int iterations = 10;
+
+            for (int i = 0; i < iterations; i++)
+                visitor.visitPartition(i);
+
+            for (List<ExhaustiveChecker.Operation> value : rowVisitor.executed.values())
+                Collections.sort(value);
+
+            for (int lts = 0; lts < iterations; lts++)
             {
-                ExhaustiveChecker.Operation op = modelOps.iterator().next();
-                List<ExhaustiveChecker.Operation> executedOps = rowVisitor.executed.get(new Pair(op.pd, op.cd));
-                Iterator<ExhaustiveChecker.Operation> modelIterator = modelOps.iterator();
-                Iterator<ExhaustiveChecker.Operation> executedIterator = executedOps.iterator();
-                while (modelIterator.hasNext() && executedIterator.hasNext())
+                // TODO: turn query into interface to make it easier to deal with here
+                for (Collection<ExhaustiveChecker.Operation> modelOps : checker.inflatePartitionState(iterations - 1,
+                                                                                                      Query.selectPartition(run.schemaSpec,
+                                                                                                                            run.pdSelector.pd(lts),
+                                                                                                                            false))
+                                                                        .operations.values())
                 {
+                    ExhaustiveChecker.Operation op = modelOps.iterator().next();
+                    List<ExhaustiveChecker.Operation> executedOps = rowVisitor.executed.get(new Pair(op.pd, op.cd));
+                    Iterator<ExhaustiveChecker.Operation> modelIterator = modelOps.iterator();
+                    Iterator<ExhaustiveChecker.Operation> executedIterator = executedOps.iterator();
+                    while (modelIterator.hasNext() && executedIterator.hasNext())
+                    {
+                        Assert.assertEquals(String.format("\n%s\n%s", modelOps, executedOps),
+                                            executedIterator.next(), modelIterator.next());
+                    }
                     Assert.assertEquals(String.format("\n%s\n%s", modelOps, executedOps),
-                                        executedIterator.next(), modelIterator.next());
+                                        modelIterator.hasNext(), executedIterator.hasNext());
                 }
-                Assert.assertEquals(String.format("\n%s\n%s", modelOps, executedOps),
-                                    modelIterator.hasNext(), executedIterator.hasNext());
             }
         }
     }
@@ -131,6 +136,11 @@ public class ExhaustiveCheckerUnitTest
         {
             return null;
         }
+
+        public CompiledStatement deleteSlice(long lts, long pd, long opId)
+        {
+            return null;
+        }
     }
 
     private static class Pair
diff --git a/harry-integration/test/harry/model/IntegrationTestBase.java b/harry-integration/test/harry/model/IntegrationTestBase.java
index 08d9f46..a3c2dfa 100644
--- a/harry-integration/test/harry/model/IntegrationTestBase.java
+++ b/harry-integration/test/harry/model/IntegrationTestBase.java
@@ -18,13 +18,14 @@
 
 package harry.model;
 
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
 import harry.core.Configuration;
+import harry.ddl.SchemaGenerators;
 import harry.ddl.SchemaSpec;
 import harry.model.clock.OffsetClock;
 import harry.model.sut.InJvmSut;
@@ -57,22 +58,16 @@ public class IntegrationTestBase extends TestBaseImpl
         cluster.schemaChange("CREATE KEYSPACE harry WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
     }
 
-    // TODO: run these tests with like a hundred different tables?
-    static Configuration.ConfigurationBuilder sharedConfiguration()
+    private static long seed = 0;
+    public static Supplier<Configuration.ConfigurationBuilder> sharedConfiguration()
     {
-        return sharedConfiguration(1);
+        Supplier<SchemaSpec> specGenerator = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        return () -> {
+            SchemaSpec schemaSpec = specGenerator.get();
+            return sharedConfiguration(seed, schemaSpec);
+        };
     }
 
-    private static AtomicInteger COUNTER = new AtomicInteger();
-
-    public static Configuration.ConfigurationBuilder sharedConfiguration(long seed)
-    {
-        int i = COUNTER.incrementAndGet();
-        SchemaSpec schemaSpec = MockSchema.randomSchema("harry", "table" + i, seed);
-        return sharedConfiguration(seed, schemaSpec);
-    }
-
-
     public static Configuration.ConfigurationBuilder sharedConfiguration(long seed, SchemaSpec schema)
     {
         return new Configuration.ConfigurationBuilder().setSeed(seed)
@@ -80,7 +75,6 @@ public class IntegrationTestBase extends TestBaseImpl
                                                        .setCreateSchema(true)
                                                        .setTruncateTable(false)
                                                        .setDropSchema(true)
-                                                       .setModel(new Configuration.ExhaustiveCheckerConfig())
                                                        .setSchemaProvider(seed1 -> schema)
                                                        .setClusteringDescriptorSelector((builder) -> {
                                                            builder
@@ -91,9 +85,9 @@ public class IntegrationTestBase extends TestBaseImpl
                                                                                     .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 10)
                                                                                     .addWeight(OpSelectors.OperationKind.WRITE, 80)
                                                                                     .build())
-                                                           .setMaxPartitionSize(1);
+                                                           .setMaxPartitionSize(100);
                                                        })
                                                        .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(1, 200))
                                                        .setSUT(() -> sut);
     }
-}
\ No newline at end of file
+}
diff --git a/harry-integration/test/harry/model/ModelTest.java b/harry-integration/test/harry/model/ModelTest.java
index ae3d7b7..7a7cfe9 100644
--- a/harry-integration/test/harry/model/ModelTest.java
+++ b/harry-integration/test/harry/model/ModelTest.java
@@ -18,8 +18,10 @@
 
 package harry.model;
 
+import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,6 +31,7 @@ import harry.ddl.SchemaSpec;
 import harry.generators.Surjections;
 import harry.generators.distribution.Distribution;
 import harry.model.sut.InJvmSut;
+import harry.runner.LoggingPartitionVisitor;
 import harry.runner.Runner;
 import harry.util.BitSet;
 import org.apache.cassandra.distributed.Cluster;
@@ -80,7 +83,10 @@ public class ModelTest extends TestBaseImpl
     @Test
     public void statefulVisibleRowsCheckerTest() throws Throwable
     {
-        visibleRowsCheckerTest(VisibleRowsChecker::new);
+        visibleRowsCheckerTest(VisibleRowsChecker::new,
+                               (cfg) -> {
+                                   cfg.setDataTracker(VisibleRowsChecker.LoggingDataTracker::new);
+                               });
     }
 
     @Test
@@ -91,13 +97,20 @@ public class ModelTest extends TestBaseImpl
 
     public void visibleRowsCheckerTest(Model.ModelFactory factory) throws Throwable
     {
+        visibleRowsCheckerTest(factory, (a) -> {});
+    }
+    public void visibleRowsCheckerTest(Model.ModelFactory factory, Consumer<Configuration.ConfigurationBuilder> configurator) throws Throwable
+    {
         try (Cluster cluster = Cluster.create(3))
         {
-            Configuration.ConfigurationBuilder configuration = new Configuration.ConfigurationBuilder();
-            configuration.setClock(new Configuration.ApproximateMonotonicClockConfiguration((int) TimeUnit.MINUTES.toMillis(2),
+            Configuration.ConfigurationBuilder builder = new Configuration.ConfigurationBuilder();
+            builder.setClock(new Configuration.ApproximateMonotonicClockConfiguration((int) TimeUnit.MINUTES.toMillis(2),
                                                                                             1, TimeUnit.SECONDS))
                          .setRunTime(1, TimeUnit.MINUTES)
-                         .setRunner(new Configuration.ConcurrentRunnerConfig(2, 2, 2))
+                         .setRunner(new Configuration.ConcurrentRunnerConfig(2,
+                                                                             Arrays.asList(new Configuration.LoggingPartitionVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration()),
+                                                                                           new Configuration.RecentPartitionsValidatorConfiguration(10, 10, factory::make),
+                                                                                           new Configuration.AllPartitionsValidatorConfiguration(10, 10, factory::make))))
                          .setSchemaProvider((seed) -> schema)
                          .setClusteringDescriptorSelector((OpSelectors.Rng rng, SchemaSpec schemaSpec) -> {
                              return new DescriptorSelectorBuilder()
@@ -109,10 +122,9 @@ public class ModelTest extends TestBaseImpl
                          .setCreateSchema(true)
                          .setTruncateTable(false)
                          .setDropSchema(false)
-                         .setModel(factory::create)
                          .setSUT(() -> new InJvmSut(cluster));
-
-            Runner runner = configuration.build().createRunner();
+            configurator.accept(builder);
+            Runner runner = builder.build().createRunner();
             try
             {
                 runner.initAndStartAll().get(2, TimeUnit.MINUTES);
@@ -144,4 +156,4 @@ public class ModelTest extends TestBaseImpl
     }
 }
 
-// TODO: test things gradually. First with simple schemas, then with more complex, then with completely random.
\ No newline at end of file
+// TODO: test things gradually. First with simple schemas, then with more complex, then with completely random.
diff --git a/harry-integration/test/harry/model/ModelTestBase.java b/harry-integration/test/harry/model/ModelTestBase.java
new file mode 100644
index 0000000..6421355
--- /dev/null
+++ b/harry-integration/test/harry/model/ModelTestBase.java
@@ -0,0 +1,102 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.model;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.ddl.SchemaGenerators;
+import harry.ddl.SchemaSpec;
+import harry.runner.LoggingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
+import harry.runner.PartitionVisitor;
+import harry.runner.SinglePartitionValidator;
+
+public abstract class ModelTestBase extends IntegrationTestBase
+{
+    void negativeTest(Function<Run, Boolean> corrupt, BiConsumer<Throwable, Run> validate)
+    {
+        Supplier<SchemaSpec> supplier = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
+        {
+            SchemaSpec schema = supplier.get();
+            negativeTest(corrupt, validate, i, schema);
+        }
+    }
+
+    abstract Configuration.ModelConfiguration modelConfiguration();
+
+    protected PartitionVisitor validator(Run run)
+    {
+        return new SinglePartitionValidator(100, run, modelConfiguration());
+    }
+
+    void negativeTest(Function<Run, Boolean> corrupt, BiConsumer<Throwable, Run> validate, int counter, SchemaSpec schemaSpec)
+    {
+        Configuration config = sharedConfiguration(counter, schemaSpec)
+                               .setClusteringDescriptorSelector((builder) -> {
+                                   builder.setNumberOfModificationsDistribution(new Configuration.ConstantDistributionConfig(10))
+                                          .setRowsPerModificationDistribution(new Configuration.ConstantDistributionConfig(10))
+                                          .setMaxPartitionSize(100)
+                                          .setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
+                                                                   .addWeight(OpSelectors.OperationKind.DELETE_ROW, 1)
+                                                                   .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 1)
+                                                                   .addWeight(OpSelectors.OperationKind.DELETE_RANGE, 1)
+                                                                   .addWeight(OpSelectors.OperationKind.DELETE_SLICE, 1)
+                                                                   .addWeight(OpSelectors.OperationKind.WRITE, 96)
+                                                                   .build());
+                               })
+                               .build();
+
+        Run run = config.createRun();
+        beforeEach();
+        run.sut.schemaChange(run.schemaSpec.compile().cql());
+        OpSelectors.MonotonicClock clock = run.clock;
+
+        PartitionVisitor validator = validator(run);
+        PartitionVisitor partitionVisitor = new LoggingPartitionVisitor(run, MutatingRowVisitor::new);
+
+        for (int i = 0; i < 200; i++)
+        {
+            long lts = clock.nextLts();
+            partitionVisitor.visitPartition(lts);
+        }
+
+        validator.visitPartition(0);
+
+        if (!corrupt.apply(run))
+        {
+            System.out.println("Could not corrupt");
+            return;
+        }
+
+        try
+        {
+            validator.visitPartition(0);
+        }
+        catch (Throwable t)
+        {
+            validate.accept(t, run);
+        }
+    }
+}
\ No newline at end of file
diff --git a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
index 2520089..8085a06 100644
--- a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.function.Supplier;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -37,17 +38,20 @@ import harry.corruptor.HideRowCorruptor;
 import harry.corruptor.HideValueCorruptor;
 import harry.corruptor.QueryResponseCorruptor;
 import harry.corruptor.ShowValueCorruptor;
+import harry.ddl.SchemaGenerators;
+import harry.runner.MutatingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 
 import static harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
 
 @RunWith(Parameterized.class)
 public class QuerySelectorNegativeTest extends IntegrationTestBase
 {
-    private final int rounds = 10;
     private final int ltss = 1000;
+
     private final Random rnd = new Random();
 
     private final QueryResponseCorruptorFactory corruptorFactory;
@@ -86,20 +90,26 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
     public void selectRows()
     {
         Map<Query.QueryKind, Integer> stats = new HashMap<>();
+        Supplier<Configuration.ConfigurationBuilder> gen = sharedConfiguration();
+
+        int rounds = SchemaGenerators.DEFAULT_RUNS;
         int failureCounter = 0;
         outer:
         for (int counter = 0; counter < rounds; counter++)
         {
             beforeEach();
-            Configuration config = sharedConfiguration(counter)
-                                   .setClusteringDescriptorSelector((builder) -> {
-                                       builder.setMaxPartitionSize(2000);
-                                   })
-                                   .build();
+            Configuration config = gen.get()
+                                      .setClusteringDescriptorSelector((builder) -> {
+                                          builder.setMaxPartitionSize(2000);
+                                      })
+                                      .build();
             Run run = config.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
+
             OpSelectors.MonotonicClock clock = run.clock;
-            PartitionVisitor partitionVisitor = run.visitorFactory.get();
+
+            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+            Model model = new ExhaustiveChecker(run);
 
             QueryResponseCorruptor corruptor = this.corruptorFactory.create(run);
 
@@ -112,20 +122,23 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
             while (true)
             {
                 long verificationLts = rnd.nextInt(1000);
-                QuerySelector querySelector = new QuerySelector(run.schemaSpec,
-                                                                run.pdSelector,
-                                                                run.descriptorSelector,
-                                                                run.rng);
+                QueryGenerator queryGen = new QueryGenerator(run.schemaSpec,
+                                                             run.pdSelector,
+                                                             run.descriptorSelector,
+                                                             run.rng);
+
+                QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run.rng, queryGen);
 
                 Query query = querySelector.inflate(verificationLts, counter);
-                run.model.validatePartitionState(verificationLts, query);
+
+                model.validate(query);
 
                 if (!corruptor.maybeCorrupt(query, run.sut))
                     continue;
 
                 try
                 {
-                    run.model.validatePartitionState(verificationLts, query);
+                    model.validate(query);
                     Assert.fail("Should've failed");
                 }
                 catch (Throwable t)
@@ -141,4 +154,4 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
         Assert.assertTrue(String.format("Seen only %d failures", failureCounter),
                           failureCounter > (rounds * 0.8));
     }
-}
+}
\ No newline at end of file
diff --git a/harry-integration/test/harry/model/QuerySelectorTest.java b/harry-integration/test/harry/model/QuerySelectorTest.java
index 0859181..3e7f3f0 100644
--- a/harry-integration/test/harry/model/QuerySelectorTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorTest.java
@@ -20,29 +20,37 @@ package harry.model;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.function.Supplier;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import harry.core.Configuration;
 import harry.core.Run;
+import harry.ddl.SchemaGenerators;
 import harry.ddl.SchemaSpec;
+import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
+import harry.runner.MutatingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 
 public class QuerySelectorTest extends IntegrationTestBase
 {
-    private static int CYCLES = 100;
+    private static int CYCLES = 300;
 
     @Test
     public void basicQuerySelectorTest()
     {
-        for (int cnt = 0; cnt < 50; cnt++)
+        Supplier<SchemaSpec> schemaGen = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
         {
-            SchemaSpec schemaSpec = MockSchema.randomSchema("harry", "table" + cnt, cnt);
+            beforeEach();
+            SchemaSpec schemaSpec = schemaGen.get();
             int partitionSize = 200;
+
             int[] fractions = new int[schemaSpec.clusteringKeys.size()];
             int last = partitionSize;
             for (int i = fractions.length - 1; i >= 0; i--)
@@ -60,7 +68,8 @@ public class QuerySelectorTest extends IntegrationTestBase
             Run run = config.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
             OpSelectors.MonotonicClock clock = run.clock;
-            PartitionVisitor partitionVisitor = run.visitorFactory.get();
+
+            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
 
             for (int i = 0; i < CYCLES; i++)
             {
@@ -68,15 +77,13 @@ public class QuerySelectorTest extends IntegrationTestBase
                 partitionVisitor.visitPartition(lts);
             }
 
-            QuerySelector querySelector = new QuerySelector(run.schemaSpec,
-                                                            run.pdSelector,
-                                                            run.descriptorSelector,
-                                                            run.rng);
+            QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
 
             for (int i = 0; i < CYCLES; i++)
             {
                 Query query = querySelector.inflate(i, i);
-                Object[][] results = run.sut.execute(query.toSelectStatement());
+
+                Object[][] results = run.sut.execute(query.toSelectStatement(), SystemUnderTest.ConsistencyLevel.QUORUM);
                 Set<Long> matchingClusterings = new HashSet<>();
                 for (Object[] row : results)
                 {
@@ -89,7 +96,7 @@ public class QuerySelectorTest extends IntegrationTestBase
                 // the simplest test there can be: every row that is in the partition and was returned by the query,
                 // has to "match", every other row has to be a non-match
                 CompiledStatement selectPartition = SelectHelper.select(run.schemaSpec, run.pdSelector.pd(i));
-                Object[][] partition = run.sut.execute(selectPartition);
+                Object[][] partition = run.sut.execute(selectPartition, SystemUnderTest.ConsistencyLevel.QUORUM);
                 for (Object[] row : partition)
                 {
                     long cd = SelectHelper.resultSetToRow(run.schemaSpec,
@@ -106,9 +113,10 @@ public class QuerySelectorTest extends IntegrationTestBase
     @Test
     public void querySelectorModelTest()
     {
-        for (int cnt = 0; cnt < 50; cnt++)
+        Supplier<SchemaSpec> gen = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
         {
-            SchemaSpec schemaSpec = MockSchema.randomSchema("harry", "table" + cnt, cnt);
+            SchemaSpec schemaSpec = gen.get();
             int[] fractions = new int[schemaSpec.clusteringKeys.size()];
             int partitionSize = 200;
             int last = partitionSize;
@@ -127,7 +135,7 @@ public class QuerySelectorTest extends IntegrationTestBase
             Run run = config.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
             OpSelectors.MonotonicClock clock = run.clock;
-            PartitionVisitor partitionVisitor = run.visitorFactory.get();
+            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
 
             for (int i = 0; i < CYCLES; i++)
             {
@@ -135,18 +143,14 @@ public class QuerySelectorTest extends IntegrationTestBase
                 partitionVisitor.visitPartition(lts);
             }
 
-            QuerySelector querySelector = new QuerySelector(run.schemaSpec,
-                                                            run.pdSelector,
-                                                            run.descriptorSelector,
-                                                            run.rng);
-
-            Model model = run.model;
+            QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
+            Model model = new ExhaustiveChecker(run);
 
             long verificationLts = 10;
             for (int i = 0; i < CYCLES; i++)
             {
                 Query query = querySelector.inflate(verificationLts, i);
-                model.validatePartitionState(verificationLts, query);
+                model.validate(query);
             }
         }
     }
diff --git a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
index c64f05d..22d7171 100644
--- a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
+++ b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
@@ -18,8 +18,6 @@
 
 package harry.model;
 
-import java.util.function.Consumer;
-
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,19 +29,28 @@ import harry.corruptor.HideRowCorruptor;
 import harry.corruptor.HideValueCorruptor;
 import harry.corruptor.QueryResponseCorruptor;
 import harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
-import harry.generators.Surjections;
-import harry.generators.distribution.Distribution;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
-import harry.runner.Validator;
+import harry.runner.SinglePartitionValidator;
 
-public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
+public class QuiescentCheckerIntegrationTest extends ModelTestBase
 {
+    @Override
+    protected PartitionVisitor validator(Run run)
+    {
+        return new SinglePartitionValidator(100, run, modelConfiguration());
+    }
+
     @Test
     public void testNormalCondition()
     {
-        negativeTest((run) -> {},
-                     Assert::assertNull);
+        negativeTest((run) -> { return  true; },
+                     (t, run) -> {
+                         if (t != null)
+                             throw new AssertionError(String.format("Throwable was supposed to be null. Schema: %s",
+                                                                    run.schemaSpec.compile().cql()),
+                                                      t);
+                     });
     }
 
     @Test
@@ -54,15 +61,18 @@ public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
                                                                                                    run.clock,
                                                                                                    HideRowCorruptor::new);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         Query query = Query.selectPartition(run.schemaSpec,
+                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                             false);
+
+                         return corruptor.maybeCorrupt(query, run.sut);
                      },
-                     (t) -> {
+                     (t, run) -> {
+                         // TODO: We can actually pinpoint the difference
                          String expected = "Expected results to have the same number of results, but expected result iterator has more results";
+                         String expected2 = "Found a row in the model that is not present in the resultset";
                          Assert.assertTrue(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()),
-                                           t.getMessage().contains(expected));
+                                           t.getMessage().contains(expected) || t.getMessage().contains(expected2));
                      });
     }
 
@@ -74,15 +84,17 @@ public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
                                                                                      run.clock,
                                                                                      run.descriptorSelector);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> {
+                     (t, run) -> {
                          String expected = "Found a row in the model that is not present in the resultset";
+                         String expected2 = "Expected results to have the same number of results, but actual result iterator has more results";
+
                          Assert.assertTrue(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()),
-                                           t.getMessage().contains(expected));
+                                           t.getMessage().contains(expected) || t.getMessage().contains(expected2));
                      });
     }
 
@@ -95,12 +107,12 @@ public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
                                                                                                    run.clock,
                                                                                                    HideValueCorruptor::new);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> {
+                     (t, run) -> {
                          String expected = "Returned row state doesn't match the one predicted by the model";
                          Assert.assertTrue(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()),
                                            t.getMessage().contains(expected));
@@ -116,55 +128,20 @@ public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
                                                                                                    run.clock,
                                                                                                    ChangeValueCorruptor::new);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> {
+                     (t, run) -> {
                          String expected = "Returned row state doesn't match the one predicted by the model";
                          Assert.assertTrue(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()),
                                            t.getMessage().contains(expected));
                      });
     }
 
-
-    static void negativeTest(Consumer<Run> corrupt, Consumer<Throwable> validate)
+    Configuration.ModelConfiguration modelConfiguration()
     {
-        Configuration config = sharedConfiguration()
-                               .setModel(new Configuration.QuiescentCheckerConfig())
-                               .setClusteringDescriptorSelector((rng, schemaSpec) -> {
-                                   return new OpSelectors.DefaultDescriptorSelector(rng,
-                                                                                    new OpSelectors.ColumnSelectorBuilder().forAll(schemaSpec.regularColumns.size()).build(),
-                                                                                    Surjections.pick(OpSelectors.OperationKind.DELETE_COLUMN,
-                                                                                                     OpSelectors.OperationKind.DELETE_ROW,
-                                                                                                     OpSelectors.OperationKind.WRITE),
-                                                                                    new Distribution.ConstantDistribution(10),
-                                                                                    new Distribution.ConstantDistribution(10),
-                                                                                    100);
-                               })
-                               .build();
-        Run run = config.createRun();
-        run.sut.schemaChange(run.schemaSpec.compile().cql());
-        OpSelectors.MonotonicClock clock = run.clock;
-        Validator validator = run.validator;
-        PartitionVisitor partitionVisitor = run.visitorFactory.get();
-
-        for (int i = 0; i < 200; i++)
-        {
-            long lts = clock.nextLts();
-            partitionVisitor.visitPartition(lts);
-        }
-
-        corrupt.accept(run);
-
-        try
-        {
-            validator.validatePartition(0);
-        }
-        catch (Throwable t)
-        {
-            validate.accept(t);
-        }
+        return new Configuration.QuiescentCheckerConfig();
     }
 }
\ No newline at end of file
diff --git a/harry-integration/test/harry/op/RowVisitorTest.java b/harry-integration/test/harry/op/RowVisitorTest.java
index e58c05b..c8ad182 100644
--- a/harry-integration/test/harry/op/RowVisitorTest.java
+++ b/harry-integration/test/harry/op/RowVisitorTest.java
@@ -24,15 +24,19 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import harry.core.MetricReporter;
+import harry.core.Run;
 import harry.ddl.SchemaGenerators;
 import harry.ddl.SchemaSpec;
 import harry.generators.RandomGenerator;
 import harry.generators.distribution.Distribution;
-import harry.runner.DefaultRowVisitor;
+import harry.model.sut.SystemUnderTest;
+import harry.runner.DataTracker;
+import harry.runner.MutatingRowVisitor;
 import harry.model.clock.OffsetClock;
 import harry.model.OpSelectors;
 import harry.operations.CompiledStatement;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 import harry.util.BitSet;
 import org.apache.cassandra.cql3.CQLTester;
 
@@ -50,32 +54,37 @@ public class RowVisitorTest extends CQLTester
     @Test
     public void rowWriteGeneratorTest()
     {
-        Supplier<SchemaSpec> specGenerator = SchemaGenerators.progression(5);
+        Supplier<SchemaSpec> specGenerator = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
         RandomGenerator rand = RandomGenerator.forTests(6371747244598697093L);
 
-        OpSelectors.Rng opRng = new OpSelectors.PCGFast(1);
-        OpSelectors.DefaultDescriptorSelector descriptorSelector = new OpSelectors.DefaultDescriptorSelector(new OpSelectors.PCGFast(1L),
-                                                                                                             OpSelectors.columnSelectorBuilder().forAll(BitSet.create(0b001, 3),
-                                                                                                                                                        BitSet.create(0b011, 3),
-                                                                                                                                                        BitSet.create(0b111, 3))
-                                                                                                                        .build(),
-                                                                                                             DEFAULT_OP_TYPE_SELECTOR,
-                                                                                                             new Distribution.ScaledDistribution(1, 3),
-                                                                                                             new Distribution.ScaledDistribution(2, 30),
-                                                                                                             100);
+        OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
 
-        for (int i = 0; i < SchemaGenerators.PROGRESSIVE_GENERATORS.length * 5; i++)
+        OpSelectors.PdSelector pdSelector = new OpSelectors.DefaultPdSelector(rng, 10, 10);
+        OpSelectors.DescriptorSelector descriptorSelector = new OpSelectors.DefaultDescriptorSelector(rng,
+                                                                                                      OpSelectors.columnSelectorBuilder().forAll(BitSet.create(0b001, 3),
+                                                                                                                                                 BitSet.create(0b011, 3),
+                                                                                                                                                 BitSet.create(0b111, 3))
+                                                                                                                 .build(),
+                                                                                                      DEFAULT_OP_TYPE_SELECTOR,
+                                                                                                      new Distribution.ScaledDistribution(1, 3),
+                                                                                                      new Distribution.ScaledDistribution(2, 30),
+                                                                                                      100);
+
+        for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
         {
             SchemaSpec schema = specGenerator.get();
             createTable(schema.compile().cql());
 
-            DefaultRowVisitor visitor = new DefaultRowVisitor(schema,
-                                                              new OffsetClock(10000),
-                                                              descriptorSelector,
-                                                              new QuerySelector(schema,
-                                                                                new OpSelectors.DefaultPdSelector(opRng, 10, 10),
-                                                                                descriptorSelector,
-                                                                                opRng));
+            Run run = new Run(rng,
+                              new OffsetClock(10000),
+                              pdSelector,
+                              descriptorSelector,
+                              schema,
+                              DataTracker.NO_OP,
+                              SystemUnderTest.NO_OP,
+                              MetricReporter.NO_OP);
+
+            MutatingRowVisitor visitor = new MutatingRowVisitor(run);
             long[] descriptors = rand.next(4);
 
             execute(visitor.write(Math.abs(descriptors[0]),
diff --git a/pom.xml b/pom.xml
index efb1cf2..8a172eb 100755
--- a/pom.xml
+++ b/pom.xml
@@ -38,9 +38,9 @@
     <properties>
         <javac.target>1.8</javac.target>
         <harry.version>0.0.1-SNAPSHOT</harry.version>
-        <cassandra.version>4.0.1-SNAPSHOT</cassandra.version>
+        <cassandra.version>4.0.0-SNAPSHOT</cassandra.version>
         <jackson.version>2.11.3</jackson.version>
-        <dtest.version>0.0.6</dtest.version>
+        <dtest.version>0.0.7</dtest.version>
         <jmh.version>1.11.3</jmh.version>
         <argLine.common>
             -server

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org