You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2021/03/25 09:51:31 UTC

[cassandra-harry] 01/01: Numerous minor improvements while preparing for fuzz-testing 4.0 in earnest:

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch CASSANDRA-16262
in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git

commit 8238908162122dbad3eeb7857e1141f0b4205b95
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Wed Mar 24 16:11:57 2021 +0100

    Numerous minor improvements while preparing for fuzz-testing 4.0 in earnest:
    
      * Refactor Run to make it an entrypoint
      * Separate Partition visitors from Row visitors
      * Make it possible to effortlessly check local states
      * Introduce CLs
      * Clearer distinction between the components, making it possible to implement visitors (such as a repairing validator)
      * Implement fault injecting partition visitor
      * Extract DataTracker
      * Minor bug fixes
    
    Patch by Alex Petrov for CASSANDRA-16262.
---
 harry-core/src/harry/core/Configuration.java       | 384 +++++++++++++--------
 .../sut/NoOpSut.java => core/MetricReporter.java}  |  37 +-
 harry-core/src/harry/core/Run.java                 |  64 ++--
 .../src/harry/corruptor/AddExtraRowCorruptor.java  |   4 +-
 .../harry/corruptor/QueryResponseCorruptor.java    |   2 +-
 harry-core/src/harry/corruptor/RowCorruptor.java   |   2 +-
 harry-core/src/harry/ddl/SchemaGenerators.java     |  66 +++-
 harry-core/src/harry/ddl/SchemaSpec.java           |   6 +
 harry-core/src/harry/generators/PCGFastPure.java   |   2 +-
 harry-core/src/harry/model/DoNothingModel.java     |   6 +-
 harry-core/src/harry/model/ExhaustiveChecker.java  |  83 ++---
 harry-core/src/harry/model/Model.java              |  23 +-
 harry-core/src/harry/model/OpSelectors.java        |  24 +-
 harry-core/src/harry/model/QuiescentChecker.java   |  54 +--
 harry-core/src/harry/model/SelectHelper.java       |   2 +-
 .../harry/model/StatelessVisibleRowsChecker.java   |  36 +-
 harry-core/src/harry/model/VisibleRowsChecker.java | 113 +++---
 harry-core/src/harry/model/sut/PrintlnSut.java     |   9 +-
 .../src/harry/model/sut/SystemUnderTest.java       |  41 ++-
 harry-core/src/harry/reconciler/Reconciler.java    |  43 ++-
 .../src/harry/runner/AbstractPartitionVisitor.java |   3 +-
 .../src/harry/runner/AllPartitionsValidator.java   | 106 ++++++
 .../src/harry/runner/DataTracker.java              |  33 +-
 .../src/harry/runner/DefaultDataTracker.java       | 140 ++++++++
 .../runner/DefaultPartitionVisitorFactory.java     | 198 -----------
 harry-core/src/harry/runner/HarryRunner.java       |  36 +-
 .../src/harry/runner/LoggingPartitionVisitor.java  | 100 ++++++
 .../src/harry/runner/MutatingPartitionVisitor.java | 133 +++++++
 ...aultRowVisitor.java => MutatingRowVisitor.java} |  39 ++-
 harry-core/src/harry/runner/PartitionVisitor.java  |  11 +-
 harry-core/src/harry/runner/QueryGenerator.java    | 364 +++++++++++++++++++
 .../src/harry/runner/RecentPartitionValidator.java |  84 +++++
 harry-core/src/harry/runner/RowVisitor.java        |  13 +-
 harry-core/src/harry/runner/Runner.java            | 162 ++++-----
 .../src/harry/runner/SinglePartitionValidator.java |  56 +++
 harry-core/src/harry/runner/Validator.java         | 119 -------
 harry-core/src/harry/util/Ranges.java              |  25 +-
 .../test/harry/generators/RandomGeneratorTest.java |  32 +-
 .../test/harry/generators/SurjectionsTest.java     |   4 +-
 harry-core/test/harry/model/OpSelectorsTest.java   |  96 +++---
 harry-core/test/harry/operations/RelationTest.java | 215 ++++++------
 .../model/sut/external/ExternalClusterSut.java     |  24 +-
 .../harry/runner/external/HarryRunnerExternal.java |   8 +-
 .../src/harry/model/sut}/ExternalClusterSut.java   |  78 ++---
 .../src/harry/model/sut/InJvmSut.java              | 111 +++++-
 .../runner/FaultInjectingPartitionVisitor.java     |  94 +++++
 .../src/harry/runner/QueryingNoOpChecker.java      |  65 ++++
 .../harry/runner/RepairingLocalStateValidator.java | 137 ++++++++
 harry-integration/src/harry/runner/Reproduce.java  |   4 +-
 .../model/ExhaustiveCheckerIntegrationTest.java    | 211 +++++------
 .../harry/model/ExhaustiveCheckerUnitTest.java     | 100 +++---
 .../test/harry/model/IntegrationTestBase.java      |  28 +-
 harry-integration/test/harry/model/ModelTest.java  |  28 +-
 .../test/harry/model/ModelTestBase.java            | 102 ++++++
 .../harry/model/QuerySelectorNegativeTest.java     |  43 ++-
 .../test/harry/model/QuerySelectorTest.java        |  46 +--
 .../model/QuiescentCheckerIntegrationTest.java     | 111 +++---
 .../test/harry/op/RowVisitorTest.java              |  51 +--
 pom.xml                                            |   4 +-
 59 files changed, 2812 insertions(+), 1403 deletions(-)

diff --git a/harry-core/src/harry/core/Configuration.java b/harry-core/src/harry/core/Configuration.java
index a3c287f..27ee176 100644
--- a/harry-core/src/harry/core/Configuration.java
+++ b/harry-core/src/harry/core/Configuration.java
@@ -27,7 +27,6 @@ import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.Supplier;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -46,14 +45,16 @@ import harry.model.OpSelectors;
 import harry.model.QuiescentChecker;
 import harry.model.clock.ApproximateMonotonicClock;
 import harry.model.sut.SystemUnderTest;
-import harry.runner.DefaultPartitionVisitorFactory;
-import harry.runner.DefaultRowVisitor;
+import harry.runner.AllPartitionsValidator;
+import harry.runner.DataTracker;
+import harry.runner.DefaultDataTracker;
+import harry.runner.LoggingPartitionVisitor;
+import harry.runner.MutatingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
 import harry.runner.PartitionVisitor;
-import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.RecentPartitionValidator;
 import harry.runner.RowVisitor;
 import harry.runner.Runner;
-import harry.runner.Validator;
 import harry.util.BitSet;
 
 public class Configuration
@@ -70,13 +71,22 @@ public class Configuration
         mapper.registerSubtypes(Configuration.DebugApproximateMonotonicClockConfiguration.class);
         mapper.registerSubtypes(Configuration.ApproximateMonotonicClockConfiguration.class);
         mapper.registerSubtypes(Configuration.ConcurrentRunnerConfig.class);
+        mapper.registerSubtypes(Configuration.SequentialRunnerConfig.class);
+        mapper.registerSubtypes(Configuration.DefaultDataTrackerConfiguration.class);
 
         mapper.registerSubtypes(Configuration.ExhaustiveCheckerConfig.class);
+        mapper.registerSubtypes(Configuration.QuiescentCheckerConfig.class);
         mapper.registerSubtypes(Configuration.DefaultCDSelectorConfiguration.class);
         mapper.registerSubtypes(Configuration.DefaultPDSelectorConfiguration.class);
         mapper.registerSubtypes(Configuration.ConstantDistributionConfig.class);
         mapper.registerSubtypes(DefaultSchemaProviderConfiguration.class);
-        mapper.registerSubtypes(DefaultRowVisitorConfiguration.class);
+        mapper.registerSubtypes(MutatingRowVisitorConfiguration.class);
+
+        mapper.registerSubtypes(MutatingPartitionVisitorConfiguation.class);
+        mapper.registerSubtypes(LoggingPartitionVisitorConfiguration.class);
+        mapper.registerSubtypes(AllPartitionsValidatorConfiguration.class);
+        mapper.registerSubtypes(RecentPartitionsValidatorConfiguration.class);
+        mapper.registerSubtypes(FixedSchemaProviderConfiguration.class);
     }
 
     public final long seed;
@@ -86,12 +96,11 @@ public class Configuration
     public final boolean create_schema;
     public final boolean truncate_table;
 
+    public final MetricReporterConfiguration metric_reporter;
     public final ClockConfiguration clock;
-    public final RunnerConfiguration runner;
     public final SutConfiguration system_under_test;
-    public final ModelConfiguration model;
-    public final RowVisitorConfiguration row_visitor;
-
+    public final DataTrackerConfiguration data_tracker;
+    public final RunnerConfiguration runner;
     public final PDSelectorConfiguration partition_descriptor_selector;
     public final CDSelectorConfiguration clustering_descriptor_selector;
 
@@ -104,11 +113,11 @@ public class Configuration
                          @JsonProperty("drop_schema") boolean drop_schema,
                          @JsonProperty("create_schema") boolean create_schema,
                          @JsonProperty("truncate_schema") boolean truncate_table,
+                         @JsonProperty("metric_reporter") MetricReporterConfiguration metric_reporter,
                          @JsonProperty("clock") ClockConfiguration clock,
                          @JsonProperty("runner") RunnerConfiguration runner,
                          @JsonProperty("system_under_test") SutConfiguration system_under_test,
-                         @JsonProperty("model") ModelConfiguration model,
-                         @JsonProperty("row_visitor") RowVisitorConfiguration row_visitor,
+                         @JsonProperty("data_tracker") DataTrackerConfiguration data_tracker,
                          @JsonProperty("partition_descriptor_selector") PDSelectorConfiguration partition_descriptor_selector,
                          @JsonProperty("clustering_descriptor_selector") CDSelectorConfiguration clustering_descriptor_selector,
                          @JsonProperty(value = "run_time", defaultValue = "2") long run_time,
@@ -119,15 +128,15 @@ public class Configuration
         this.drop_schema = drop_schema;
         this.create_schema = create_schema;
         this.truncate_table = truncate_table;
+        this.metric_reporter = metric_reporter;
         this.clock = clock;
-        this.runner = runner;
         this.system_under_test = system_under_test;
-        this.model = model;
-        this.row_visitor = row_visitor;
+        this.data_tracker = data_tracker;
         this.partition_descriptor_selector = partition_descriptor_selector;
         this.clustering_descriptor_selector = clustering_descriptor_selector;
         this.run_time = run_time;
         this.run_time_unit = run_time_unit;
+        this.runner = runner;
     }
 
     public static void registerSubtypes(Class<?>... classes)
@@ -178,6 +187,13 @@ public class Configuration
 
     public static void validate(Configuration config)
     {
+        Objects.requireNonNull(config.schema_provider, "Schema provider should not be null");
+        Objects.requireNonNull(config.metric_reporter, "Metric reporter should not be null");
+        Objects.requireNonNull(config.clock, "Clock should not be null");
+        Objects.requireNonNull(config.system_under_test, "System under test should not be null");
+        Objects.requireNonNull(config.partition_descriptor_selector, "Partition descriptor selector should not be null");
+        Objects.requireNonNull(config.clustering_descriptor_selector, "Clustering descriptor selector should not be null");
+
         // TODO: validation
         //assert historySize * clockEpochTimeUnit.toMillis(clockEpoch) > runTimePeriod.toMillis(runTime) : "History size is too small for this run";
     }
@@ -194,53 +210,39 @@ public class Configuration
 
     public static Run createRun(Configuration snapshot)
     {
+        validate(snapshot);
+
         long seed = snapshot.seed;
 
+        DataTracker tracker = snapshot.data_tracker == null ? new DefaultDataTrackerConfiguration().make() : snapshot.data_tracker.make();
         OpSelectors.Rng rng = new OpSelectors.PCGFast(seed);
 
         OpSelectors.MonotonicClock clock = snapshot.clock.make();
 
-        // TODO: parsing schema
+        MetricReporter metricReporter = snapshot.metric_reporter.make();
+        // TODO: parse schema
         SchemaSpec schemaSpec = snapshot.schema_provider.make(seed);
+        schemaSpec.validate();
 
         OpSelectors.PdSelector pdSelector = snapshot.partition_descriptor_selector.make(rng);
         OpSelectors.DescriptorSelector descriptorSelector = snapshot.clustering_descriptor_selector.make(rng, schemaSpec);
 
         SystemUnderTest sut = snapshot.system_under_test.make();
-        QuerySelector querySelector = new QuerySelector(schemaSpec,
-                                                        pdSelector,
-                                                        descriptorSelector,
-                                                        Surjections.pick(Query.QueryKind.CLUSTERING_SLICE,
-                                                                         Query.QueryKind.CLUSTERING_RANGE),
-                                                        rng);
-        Model model = snapshot.model.create(schemaSpec, pdSelector, descriptorSelector, clock, querySelector, sut);
-        Validator validator = new Validator(model, schemaSpec, clock, pdSelector, descriptorSelector, rng);
-
-        RowVisitor rowVisitor;
-        if (snapshot.row_visitor != null)
-            rowVisitor = snapshot.row_visitor.make(schemaSpec, clock, descriptorSelector, querySelector);
-        else
-            rowVisitor = new DefaultRowVisitor(schemaSpec, clock, descriptorSelector, querySelector);
-
-        // TODO: make this one configurable, too?
-        Supplier<PartitionVisitor> visitorFactory = new DefaultPartitionVisitorFactory(model, sut, pdSelector, descriptorSelector, schemaSpec, rowVisitor);
+
         return new Run(rng,
                        clock,
                        pdSelector,
                        descriptorSelector,
                        schemaSpec,
-                       model,
+                       tracker,
                        sut,
-                       validator,
-                       rowVisitor,
-                       visitorFactory,
-                       snapshot);
+                       metricReporter);
     }
 
-    public static Runner createRunner(Configuration snapshot)
+    public static Runner createRunner(Configuration config)
     {
-        Run run = createRun(snapshot);
-        return snapshot.runner.make(run);
+        Run run = createRun(config);
+        return config.runner.make(run, config);
     }
 
     public static class ConfigurationBuilder
@@ -253,11 +255,10 @@ public class Configuration
         boolean truncate_table;
 
         ClockConfiguration clock;
+        MetricReporterConfiguration metric_reporter = new NoOpMetricReporterConfiguration();
+        DataTrackerConfiguration data_tracker = new DefaultDataTrackerConfiguration();
         RunnerConfiguration runner;
         SutConfiguration system_under_test;
-        ModelConfiguration model;
-        RowVisitorConfiguration row_visitor = new DefaultRowVisitorConfiguration();
-
         PDSelectorConfiguration partition_descriptor_selector = new Configuration.DefaultPDSelectorConfiguration(10, 100);
         CDSelectorConfiguration clustering_descriptor_selector; // TODO: sensible default value
 
@@ -283,13 +284,19 @@ public class Configuration
             return this;
         }
 
+
+        public ConfigurationBuilder setDataTracker(DataTrackerConfiguration tracker)
+        {
+            this.data_tracker = tracker;
+            return this;
+        }
+
         public ConfigurationBuilder setClock(ClockConfiguration clock)
         {
             this.clock = clock;
             return this;
         }
 
-
         public ConfigurationBuilder setSUT(SutConfiguration system_under_test)
         {
             this.system_under_test = system_under_test;
@@ -314,12 +321,6 @@ public class Configuration
             return this;
         }
 
-        public ConfigurationBuilder setModel(ModelConfiguration model)
-        {
-            this.model = model;
-            return this;
-        }
-
         public ConfigurationBuilder setRunner(RunnerConfiguration runner)
         {
             this.runner = runner;
@@ -345,28 +346,28 @@ public class Configuration
             return setClusteringDescriptorSelector(builder.build());
         }
 
-        public ConfigurationBuilder setRowVisitor(RowVisitorConfiguration row_visitor)
+        public ConfigurationBuilder setMetricReporter(MetricReporterConfiguration metric_reporter)
         {
-            this.row_visitor = row_visitor;
+            this.metric_reporter = metric_reporter;
             return this;
         }
 
         public Configuration build()
         {
             return new Configuration(seed,
-                                     Objects.requireNonNull(schema_provider, "Schema provider should not be null"),
+                                     schema_provider,
                                      drop_schema,
                                      create_schema,
                                      truncate_table,
+                                     metric_reporter,
+                                     clock,
 
-                                     Objects.requireNonNull(clock, "Clock should not be null"),
                                      runner,
-                                     Objects.requireNonNull(system_under_test, "System under test should not be null"),
-                                     Objects.requireNonNull(model, "Model should not be null"),
-                                     Objects.requireNonNull(row_visitor, "Row visitor should not be null"),
+                                     system_under_test,
+                                     data_tracker,
 
-                                     Objects.requireNonNull(partition_descriptor_selector, "Partition descriptor selector should not be null"),
-                                     Objects.requireNonNull(clustering_descriptor_selector, "Clustering descriptor selector should not be null"),
+                                     partition_descriptor_selector,
+                                     clustering_descriptor_selector,
 
                                      run_time,
                                      run_time_unit);
@@ -385,8 +386,6 @@ public class Configuration
         builder.clock = clock;
         builder.runner = runner;
         builder.system_under_test = system_under_test;
-        builder.model = model;
-        builder.row_visitor = row_visitor;
 
         builder.partition_descriptor_selector = partition_descriptor_selector;
         builder.clustering_descriptor_selector = clustering_descriptor_selector;
@@ -396,6 +395,40 @@ public class Configuration
         return builder;
     }
 
+
+    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
+    public interface DataTrackerConfiguration extends DataTracker.DataTrackerFactory
+    {
+
+    }
+
+    @JsonTypeName("default")
+    public static class DefaultDataTrackerConfiguration implements DataTrackerConfiguration
+    {
+        public final long max_seen_lts;
+        public final long max_complete_lts;
+
+        public DefaultDataTrackerConfiguration()
+        {
+            this(-1, -1);
+        }
+
+        @JsonCreator
+        public DefaultDataTrackerConfiguration(@JsonProperty(value = "max_seen_lts", defaultValue = "-1") long max_seen_lts,
+                                               @JsonProperty(value = "max_complete_lts", defaultValue = "-1") long max_complete_lts)
+        {
+            this.max_seen_lts = max_seen_lts;
+            this.max_complete_lts = max_complete_lts;
+        }
+
+        public DataTracker make()
+        {
+            DefaultDataTracker defaultDataTracker = new DefaultDataTracker();
+            defaultDataTracker.forceLts(max_seen_lts, max_complete_lts);
+            return defaultDataTracker;
+        }
+    }
+
     @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
     public interface ClockConfiguration extends OpSelectors.MonotonicClockFactory
     {
@@ -475,46 +508,39 @@ public class Configuration
     @JsonTypeName("concurrent")
     public static class ConcurrentRunnerConfig implements RunnerConfiguration
     {
-        public final int writer_threads;
-        public final int round_robin_validator_threads;
-        public final int recent_partition_validator_threads;
+        public final int concurrency;
+        public final List<PartitionVisitorConfiguration> partition_visitor_factories;
 
         @JsonCreator
-        public ConcurrentRunnerConfig(@JsonProperty(value = "writer_threads", defaultValue = "2") int writer_threads,
-                                      @JsonProperty(value = "round_robin_validator_threads", defaultValue = "2") int round_robin_validator_threads,
-                                      @JsonProperty(value = "recent_partition_validator_threads", defaultValue = "2") int recent_partition_validator_threads)
+        public ConcurrentRunnerConfig(@JsonProperty(value = "concurrency", defaultValue = "2") int concurrency,
+                                      @JsonProperty(value = "partition_visitors") List<PartitionVisitorConfiguration> partitionVisitors)
         {
-            this.writer_threads = writer_threads;
-            this.round_robin_validator_threads = round_robin_validator_threads;
-            this.recent_partition_validator_threads = recent_partition_validator_threads;
+            this.concurrency = concurrency;
+            this.partition_visitor_factories = partitionVisitors;
         }
 
-        public Runner make(Run run)
+        @Override
+        public Runner make(Run run, Configuration config)
         {
-            return new Runner.ConcurrentRunner(run, writer_threads, round_robin_validator_threads, recent_partition_validator_threads);
+            return new Runner.ConcurrentRunner(run, config, concurrency, partition_visitor_factories);
         }
     }
 
     @JsonTypeName("sequential")
     public static class SequentialRunnerConfig implements RunnerConfiguration
     {
-        private final int round_robin_validator_threads;
-        private final int check_recent_after;
-        private final int check_all_after;
+        public final List<PartitionVisitorConfiguration> partition_visitor_factories;
 
         @JsonCreator
-        public SequentialRunnerConfig(@JsonProperty(value = "round_robin_validator_threads", defaultValue = "2") int round_robin_validator_threads,
-                                      @JsonProperty(value = "check_recent_after", defaultValue = "100") int check_recent_after,
-                                      @JsonProperty(value = "check_all_after", defaultValue = "5000") int check_all_after)
+        public SequentialRunnerConfig(@JsonProperty(value = "partition_visitors") List<PartitionVisitorConfiguration> partitionVisitors)
         {
-            this.round_robin_validator_threads = round_robin_validator_threads;
-            this.check_recent_after = check_recent_after;
-            this.check_all_after = check_all_after;
+            this.partition_visitor_factories = partitionVisitors;
         }
 
-        public Runner make(Run run)
+        @Override
+        public Runner make(Run run, Configuration config)
         {
-            return new Runner.SequentialRunner(run, round_robin_validator_threads, check_recent_after, check_all_after);
+            return new Runner.SequentialRunner(run, config, partition_visitor_factories);
         }
     }
 
@@ -531,64 +557,29 @@ public class Configuration
     @JsonTypeName("exhaustive_checker")
     public static class ExhaustiveCheckerConfig implements ModelConfiguration
     {
-        public final long max_seen_lts;
-        public final long max_complete_lts;
-
+        @JsonCreator
         public ExhaustiveCheckerConfig()
         {
-            this(-1, -1);
-        }
 
-        @JsonCreator
-        public ExhaustiveCheckerConfig(@JsonProperty(value = "max_seen_lts", defaultValue = "-1") long max_seen_lts,
-                                       @JsonProperty(value = "max_complete_lts", defaultValue = "-1") long max_complete_lts)
-        {
-            this.max_seen_lts = max_seen_lts;
-            this.max_complete_lts = max_complete_lts;
         }
 
-        public Model create(SchemaSpec schema, OpSelectors.PdSelector pdSelector, OpSelectors.DescriptorSelector descriptorSelector, OpSelectors.MonotonicClock clock, QuerySelector querySelector, SystemUnderTest sut)
+        public Model make(Run run)
         {
-            ExhaustiveChecker exhaustiveChecker = new ExhaustiveChecker(schema,
-                                                                        pdSelector,
-                                                                        descriptorSelector,
-                                                                        clock,
-                                                                        querySelector,
-                                                                        sut);
-            exhaustiveChecker.forceLts(max_seen_lts, max_complete_lts);
-            return exhaustiveChecker;
+            return new ExhaustiveChecker(run);
         }
     }
 
     @JsonTypeName("quiescent_checker")
     public static class QuiescentCheckerConfig implements ModelConfiguration
     {
-        public final long max_seen_lts;
-        public final long max_complete_lts;
-
-        public QuiescentCheckerConfig()
-        {
-            this(-1, -1);
-        }
-
         @JsonCreator
-        public QuiescentCheckerConfig(@JsonProperty(value = "max_seen_lts", defaultValue = "-1") long max_seen_lts,
-                                      @JsonProperty(value = "max_complete_lts", defaultValue = "-1") long max_complete_lts)
+        public QuiescentCheckerConfig()
         {
-            this.max_seen_lts = max_seen_lts;
-            this.max_complete_lts = max_complete_lts;
         }
 
-        public Model create(SchemaSpec schema, OpSelectors.PdSelector pdSelector, OpSelectors.DescriptorSelector descriptorSelector, OpSelectors.MonotonicClock clock, QuerySelector querySelector, SystemUnderTest sut)
+        public Model make(Run run)
         {
-            QuiescentChecker exhaustiveChecker = new QuiescentChecker(schema,
-                                                                      pdSelector,
-                                                                      descriptorSelector,
-                                                                      clock,
-                                                                      querySelector,
-                                                                      sut);
-            exhaustiveChecker.forceLts(max_seen_lts, max_complete_lts);
-            return exhaustiveChecker;
+            return new QuiescentChecker(run);
         }
     }
 
@@ -875,23 +866,108 @@ public class Configuration
         }
     }
 
+
+    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
+    public interface PartitionVisitorConfiguration extends PartitionVisitor.PartitionVisitorFactory
+    {
+    }
+
+
+    @JsonTypeName("mutating")
+    public static class MutatingPartitionVisitorConfiguation implements PartitionVisitorConfiguration
+    {
+        protected final RowVisitorConfiguration row_visitor;
+
+        @JsonCreator
+        public MutatingPartitionVisitorConfiguation(@JsonProperty("row_visitor") RowVisitorConfiguration row_visitor)
+        {
+            this.row_visitor = row_visitor;
+        }
+
+        @Override
+        public PartitionVisitor make(Run run)
+        {
+            return new MutatingPartitionVisitor(run, row_visitor);
+        }
+    }
+
+    @JsonTypeName("logging")
+    public static class LoggingPartitionVisitorConfiguration implements PartitionVisitorConfiguration
+    {
+        protected final RowVisitorConfiguration row_visitor;
+
+        @JsonCreator
+        public LoggingPartitionVisitorConfiguration(@JsonProperty("row_visitor") RowVisitorConfiguration row_visitor)
+        {
+            this.row_visitor = row_visitor;
+        }
+
+        @Override
+        public PartitionVisitor make(Run run)
+        {
+            return new LoggingPartitionVisitor(run, row_visitor);
+        }
+    }
+
+    @JsonTypeName("validate_all_partitions")
+    public static class AllPartitionsValidatorConfiguration implements Configuration.PartitionVisitorConfiguration
+    {
+        private final int concurrency;
+        private final int trigger_after;
+        private final Configuration.ModelConfiguration modelConfiguration;
+
+        @JsonCreator
+        public AllPartitionsValidatorConfiguration(@JsonProperty("concurrency") int concurrency,
+                                                   @JsonProperty("trigger_after") int trigger_after,
+                                                   @JsonProperty("model") Configuration.ModelConfiguration model)
+        {
+            this.concurrency = concurrency;
+            this.trigger_after = trigger_after;
+            this.modelConfiguration = model;
+        }
+
+        public PartitionVisitor make(Run run)
+        {
+            return new AllPartitionsValidator(concurrency, trigger_after, run, modelConfiguration);
+        }
+    }
+
+    @JsonTypeName("validate_recent_partitions")
+    public static class RecentPartitionsValidatorConfiguration implements Configuration.PartitionVisitorConfiguration
+    {
+        private final int partition_count;
+        private final int trigger_after;
+        private final Configuration.ModelConfiguration modelConfiguration;
+
+        @JsonCreator
+        public RecentPartitionsValidatorConfiguration(@JsonProperty("partition_count") int partition_count,
+                                                      @JsonProperty("trigger_after") int trigger_after,
+                                                      @JsonProperty("model") Configuration.ModelConfiguration model)
+        {
+            this.partition_count = partition_count;
+            this.trigger_after = trigger_after;
+            this.modelConfiguration = model;
+        }
+
+        @Override
+        public PartitionVisitor make(Run run)
+        {
+            return new RecentPartitionValidator(partition_count, trigger_after, run, modelConfiguration);
+        }
+    }
+
     @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
     public interface RowVisitorConfiguration extends RowVisitor.RowVisitorFactory
     {
     }
 
-    @JsonTypeName("default")
-    public static class DefaultRowVisitorConfiguration implements RowVisitorConfiguration
+    @JsonTypeName("mutating")
+    public static class MutatingRowVisitorConfiguration implements RowVisitorConfiguration
     {
-        public RowVisitor make(SchemaSpec schema,
-                               OpSelectors.MonotonicClock clock,
-                               OpSelectors.DescriptorSelector descriptorSelector,
-                               QuerySelector querySelector)
+        @Override
+        public RowVisitor make(Run run)
         {
-            return new DefaultRowVisitor(schema,
-                                         clock,
-                                         descriptorSelector,
-                                         querySelector);
+            return new MutatingRowVisitor(run);
         }
     }
 
@@ -910,5 +986,41 @@ public class Configuration
         }
     }
 
+    @JsonTypeName("fixed")
+    public static class FixedSchemaProviderConfiguration implements SchemaProviderConfiguration
+    {
+        private final SchemaSpec schemaSpec; // built once, in the constructor, from the configured column maps
+
+        @JsonCreator
+        public FixedSchemaProviderConfiguration(@JsonProperty("keyspace") String keyspace,
+                                                @JsonProperty("table") String table,
+                                                @JsonProperty("partition_keys") Map<String, String> pks,
+                                                @JsonProperty("clustering_keys") Map<String, String> cks,
+                                                @JsonProperty("regular_columns") Map<String, String> regulars)
+        {
+            this.schemaSpec = SchemaGenerators.parse(keyspace, table, // parsing happens eagerly here, not in make()
+                                                     pks, cks, regulars);
+        }
+
+        public SchemaSpec make(long seed) // seed is unused: the same pre-parsed SchemaSpec instance is returned on every call
+        {
+            return schemaSpec;
+        }
+    }
+
+    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
+    public interface MetricReporterConfiguration extends MetricReporter.MetricReporterFactory
+    {
+    }
+
+    @JsonTypeName("default")
+    public static class NoOpMetricReporterConfiguration implements MetricReporterConfiguration
+    {
+        public MetricReporter make()
+        {
+            return MetricReporter.NO_OP;
+        }
+    }
+
     // TODO: schema provider by DDL
 }
diff --git a/harry-core/src/harry/model/sut/NoOpSut.java b/harry-core/src/harry/core/MetricReporter.java
similarity index 58%
rename from harry-core/src/harry/model/sut/NoOpSut.java
rename to harry-core/src/harry/core/MetricReporter.java
index c09490f..b576ec3 100644
--- a/harry-core/src/harry/model/sut/NoOpSut.java
+++ b/harry-core/src/harry/core/MetricReporter.java
@@ -16,29 +16,34 @@
  *  limitations under the License.
  */
 
-package harry.model.sut;
+package harry.core;
 
-import java.util.concurrent.CompletableFuture;
-
-public class NoOpSut implements SystemUnderTest
+public interface MetricReporter
 {
-    public boolean isShutdown()
-    {
-        return false;
-    }
+    void columnDelete();
+    void rowDelete();
+    void insert();
+    void rangeDelete();
 
-    public void shutdown()
-    {
-    }
+    void validatePartition();
+    void validateRandomQuery();
 
-    public Object[][] execute(String statement, Object... bindings)
+    interface MetricReporterFactory
     {
-        return new Object[0][];
+        MetricReporter make();
     }
 
-    public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+    MetricReporter NO_OP = new NoOpMetricReporter();
+
+    class NoOpMetricReporter implements MetricReporter
     {
-        return CompletableFuture.supplyAsync(() -> execute(statement, bindings),
-                                             Runnable::run);
+        private NoOpMetricReporter() {}
+
+        public void columnDelete(){}
+        public void rowDelete(){}
+        public void insert(){}
+        public void rangeDelete(){}
+        public void validatePartition(){}
+        public void validateRandomQuery(){}
     }
 }
diff --git a/harry-core/src/harry/core/Run.java b/harry-core/src/harry/core/Run.java
index fdff08e..d31e7ad 100644
--- a/harry-core/src/harry/core/Run.java
+++ b/harry-core/src/harry/core/Run.java
@@ -18,56 +18,62 @@
 
 package harry.core;
 
-import java.util.function.Supplier;
-
 import harry.ddl.SchemaSpec;
-import harry.model.Model;
 import harry.model.OpSelectors;
 import harry.model.sut.SystemUnderTest;
-import harry.runner.PartitionVisitor;
-import harry.runner.RowVisitor;
-import harry.runner.Validator;
+import harry.runner.DataTracker;
+import harry.runner.QueryGenerator;
 
 public class Run
 {
     public final OpSelectors.Rng rng;
     public final OpSelectors.MonotonicClock clock;
     public final OpSelectors.PdSelector pdSelector;
-
     public final OpSelectors.DescriptorSelector descriptorSelector;
+    public final QueryGenerator rangeSelector;
 
     public final SchemaSpec schemaSpec;
-    public final Model model;
+    public final DataTracker tracker;
     public final SystemUnderTest sut;
-    public final Validator validator;
-    public final RowVisitor rowVisitor;
-    public final Supplier<PartitionVisitor> visitorFactory;
 
-    public final Configuration snapshot;
+    public final MetricReporter metricReporter;
+
+
+    public Run(OpSelectors.Rng rng,
+               OpSelectors.MonotonicClock clock,
+               OpSelectors.PdSelector pdSelector,
+               OpSelectors.DescriptorSelector descriptorSelector,
 
-    Run(OpSelectors.Rng rng,
-        OpSelectors.MonotonicClock clock,
-        OpSelectors.PdSelector pdSelector,
-        OpSelectors.DescriptorSelector descriptorSelector,
+               SchemaSpec schemaSpec,
+               DataTracker tracker,
+               SystemUnderTest sut,
+               MetricReporter metricReporter)
+    {
+        this(rng, clock, pdSelector, descriptorSelector,
+             new QueryGenerator(schemaSpec, pdSelector, descriptorSelector, rng),
+             schemaSpec, tracker, sut, metricReporter);
+    }
 
-        SchemaSpec schemaSpec,
-        Model model,
-        SystemUnderTest sut,
-        Validator validator,
-        RowVisitor rowVisitor,
-        Supplier<PartitionVisitor> visitorFactory,
-        Configuration snapshot)
+    private Run(OpSelectors.Rng rng,
+                OpSelectors.MonotonicClock clock,
+                OpSelectors.PdSelector pdSelector,
+                OpSelectors.DescriptorSelector descriptorSelector,
+                QueryGenerator rangeSelector,
+
+                SchemaSpec schemaSpec,
+                DataTracker tracker,
+                SystemUnderTest sut,
+                MetricReporter metricReporter)
     {
+
         this.rng = rng;
         this.clock = clock;
         this.pdSelector = pdSelector;
         this.descriptorSelector = descriptorSelector;
+        this.rangeSelector = rangeSelector;
         this.schemaSpec = schemaSpec;
-        this.model = model;
+        this.tracker = tracker;
         this.sut = sut;
-        this.validator = validator;
-        this.rowVisitor = rowVisitor;
-        this.visitorFactory = visitorFactory;
-        this.snapshot = snapshot;
+        this.metricReporter = metricReporter;
     }
-}
+}
\ No newline at end of file
diff --git a/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java b/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java
index 9a73867..f2a8b28 100644
--- a/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java
+++ b/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java
@@ -53,7 +53,7 @@ public class AddExtraRowCorruptor implements QueryResponseCorruptor
     {
         Set<Long> cds = new HashSet<>();
         long maxLts = 0;
-        for (Object[] obj : sut.execute(query.toSelectStatement()))
+        for (Object[] obj : sut.execute(query.toSelectStatement(), SystemUnderTest.ConsistencyLevel.ALL))
         {
             ResultSetRow row = SelectHelper.resultSetToRow(schema, clock, obj);
             // TODO: extract CD cheaper
@@ -84,7 +84,7 @@ public class AddExtraRowCorruptor implements QueryResponseCorruptor
         // written value and tombstone are resolved in favour of tombstone, so we're
         // just going to take the next lts.
         logger.info("Corrupting the resultset by writing a row with cd {}", cd);
-        sut.execute(WriteHelper.inflateInsert(schema, query.pd, cd, vds, clock.rts(maxLts) + 1));
+        sut.execute(WriteHelper.inflateInsert(schema, query.pd, cd, vds, clock.rts(maxLts) + 1), SystemUnderTest.ConsistencyLevel.ALL);
         return true;
     }
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/corruptor/QueryResponseCorruptor.java b/harry-core/src/harry/corruptor/QueryResponseCorruptor.java
index aefce0f..301c50b 100644
--- a/harry-core/src/harry/corruptor/QueryResponseCorruptor.java
+++ b/harry-core/src/harry/corruptor/QueryResponseCorruptor.java
@@ -52,7 +52,7 @@ public interface QueryResponseCorruptor
         {
             List<ResultSetRow> result = new ArrayList<>();
             CompiledStatement statement = query.toSelectStatement();
-            for (Object[] obj : sut.execute(statement.cql(), statement.bindings()))
+            for (Object[] obj : sut.execute(statement.cql(), SystemUnderTest.ConsistencyLevel.ALL, statement.bindings()))
                 result.add(SelectHelper.resultSetToRow(schema, clock, obj));
 
             // TODO: technically, we can do this just depends on corruption strategy
diff --git a/harry-core/src/harry/corruptor/RowCorruptor.java b/harry-core/src/harry/corruptor/RowCorruptor.java
index 85261b2..fbd5fa9 100644
--- a/harry-core/src/harry/corruptor/RowCorruptor.java
+++ b/harry-core/src/harry/corruptor/RowCorruptor.java
@@ -36,7 +36,7 @@ public interface RowCorruptor
         if (canCorrupt(row))
         {
             CompiledStatement statement = corrupt(row);
-            sut.execute(statement.cql(), statement.bindings());
+            sut.execute(statement.cql(), SystemUnderTest.ConsistencyLevel.ALL, statement.bindings());
             return true;
         }
         return false;
diff --git a/harry-core/src/harry/ddl/SchemaGenerators.java b/harry-core/src/harry/ddl/SchemaGenerators.java
index aecc189..53ab6f5 100644
--- a/harry-core/src/harry/ddl/SchemaGenerators.java
+++ b/harry-core/src/harry/ddl/SchemaGenerators.java
@@ -18,15 +18,19 @@
 
 package harry.ddl;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import harry.generators.Generator;
 import harry.generators.Surjections;
 
@@ -40,10 +44,12 @@ public class SchemaGenerators
     }
 
     public static final Collection<ColumnSpec.DataType<?>> clusteringKeyTypes;
+    public static final Map<String, ColumnSpec.DataType<?>> nameToTypeMap;
     public static final Collection<ColumnSpec.DataType<?>> columnTypes;
 
     static
     {
+
         ImmutableList.Builder<ColumnSpec.DataType<?>> builder = ImmutableList.builder();
         builder.add(ColumnSpec.int8Type,
                     ColumnSpec.int16Type,
@@ -51,17 +57,27 @@ public class SchemaGenerators
                     ColumnSpec.int64Type,
 // TODO re-enable boolean type; add it to ByteBufferUtil in Cassandra for that
 //                    ColumnSpec.booleanType,
-                    ColumnSpec.floatType,
-                    ColumnSpec.doubleType,
                     ColumnSpec.asciiType);
         columnTypes = builder.build();
         builder = ImmutableList.builder();
         builder.addAll(columnTypes);
+
+        ImmutableMap.Builder<String, ColumnSpec.DataType<?>> mapBuilder = ImmutableMap.builder();
+
         for (ColumnSpec.DataType<?> columnType : columnTypes)
         {
-            builder.add(ColumnSpec.ReversedType.getInstance(columnType));
+            ColumnSpec.DataType<?> reversedType = ColumnSpec.ReversedType.getInstance(columnType);
+            builder.add(reversedType);
+
+            mapBuilder.put(columnType.toString(), columnType); // plain type name -> type
+            mapBuilder.put(String.format("desc(%s)", columnType.toString()), columnType); // NOTE(review): the "desc(...)" name is registered with columnType, not reversedType - confirm this is intended
         }
+
+        builder.add(ColumnSpec.floatType);
+        builder.add(ColumnSpec.doubleType);
+
         clusteringKeyTypes = builder.build();
+        nameToTypeMap = mapBuilder.build();
     }
 
     @SuppressWarnings("unchecked")
@@ -281,7 +297,7 @@ public class SchemaGenerators
     public static Surjections.Surjection<SchemaSpec> defaultSchemaSpecGen(String ks, String table)
     {
         return new SchemaGenerators.Builder(ks, () -> table)
-               .partitionKeySpec(1, 4,
+               .partitionKeySpec(2, 4,
 //                                                                             ColumnSpec.int8Type,
 //                                                                             ColumnSpec.int16Type,
                                  ColumnSpec.int32Type,
@@ -289,7 +305,7 @@ public class SchemaGenerators
 //                                                                             ColumnSpec.floatType,
 //                                                                             ColumnSpec.doubleType,
                                  ColumnSpec.asciiType(4, 10))
-               .clusteringKeySpec(1, 4,
+               .clusteringKeySpec(2, 4,
 //                                                                              ColumnSpec.int8Type,
 //                                                                              ColumnSpec.int16Type,
                                   ColumnSpec.int32Type,
@@ -366,6 +382,7 @@ public class SchemaGenerators
     longAndStringSpecWithReversedBothBuilder,
     withAllFeaturesEnabled
     };
+
     // Create schema generators that would produce tables starting with just a few features, progressing to use more
     public static Supplier<SchemaSpec> progression(int switchAfter)
     {
@@ -375,10 +392,11 @@ public class SchemaGenerators
 
         return new Supplier<SchemaSpec>()
         {
-            private final AtomicInteger counter = new AtomicInteger();
+            private int counter = 0;
             public SchemaSpec get()
             {
-                int idx = (counter.getAndIncrement() / switchAfter) % generators.length;
+                int idx = (counter / switchAfter) % generators.length;
+                counter++;
                 SchemaSpec spec = generators[idx].get();
                 int tries = 100;
                 while ((spec.ckGenerator.byteSize() != Long.BYTES || spec.pkGenerator.byteSize() != Long.BYTES) && tries > 0)
@@ -388,6 +406,8 @@ public class SchemaGenerators
                     tries--;
                 }
 
+                spec.validate();
+
                 assert tries > 0 : String.format("Max number of tries exceeded on generator %d, can't generate a needed schema", idx);
                 return spec;
             }
@@ -396,12 +416,34 @@ public class SchemaGenerators
         };
     }
 
-    public static int DEFAULT_SWITCH_AFTER = 5;
-    public static int GENERATORS_COUNT = PROGRESSIVE_GENERATORS.length;
-    public static int DEFAULT_RUNS = DEFAULT_SWITCH_AFTER * PROGRESSIVE_GENERATORS.length;
+    public static List<ColumnSpec<?>> toColumns(Map<String, String> config, ColumnSpec.Kind kind, boolean allowReverse)
+    {
+        List<ColumnSpec<?>> columns = new ArrayList<>(config.size()); // one ColumnSpec per (column name -> type name) entry, in the map's iteration order
+
+        for (Map.Entry<String, String> e : config.entrySet())
+        {
+            ColumnSpec.DataType<?> type = nameToTypeMap.get(e.getValue()); // null when the type name is not registered in nameToTypeMap
+            assert type != null : "Can't parse the type";
+            assert allowReverse || !type.isReversed() : String.format("%s columns aren't allowed to be reversed", kind); // %s must be given an argument, otherwise building this message throws MissingFormatArgumentException
+            columns.add(new ColumnSpec<>(e.getKey(), type, kind));
+        }
 
-    public static Supplier<SchemaSpec> progression()
+        return columns;
+    }
+
+    public static SchemaSpec parse(String keyspace, // builds a SchemaSpec from explicit column definitions
+                                   String table,
+                                   Map<String, String> pks, // each map: column name -> type name, resolved by toColumns
+                                   Map<String, String> cks,
+                                   Map<String, String> regulars)
     {
-        return progression(DEFAULT_SWITCH_AFTER); // would generate 30 tables before wrapping around
+        return new SchemaSpec(keyspace, table,
+                              toColumns(pks, ColumnSpec.Kind.PARTITION_KEY, false),
+                              toColumns(cks, ColumnSpec.Kind.CLUSTERING, false), // NOTE(review): allowReverse is false for clustering keys as well - confirm intended
+                              toColumns(regulars, ColumnSpec.Kind.REGULAR, false));
     }
+
+    public static int DEFAULT_SWITCH_AFTER = Integer.getInteger("harry.test.progression.switch-after", 5);
+    public static int GENERATORS_COUNT = PROGRESSIVE_GENERATORS.length;
+    public static int DEFAULT_RUNS = DEFAULT_SWITCH_AFTER * GENERATORS_COUNT;
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/ddl/SchemaSpec.java b/harry-core/src/harry/ddl/SchemaSpec.java
index 336bd67..11b7009 100644
--- a/harry-core/src/harry/ddl/SchemaSpec.java
+++ b/harry-core/src/harry/ddl/SchemaSpec.java
@@ -96,6 +96,12 @@ public class SchemaSpec
         this.ALL_COLUMNS_BITSET = BitSet.allSet(regularColumns.size());
     }
 
+    public void validate() // sanity check; uses plain asserts, so it only has an effect when JVM assertions are enabled
+    {
+        assert pkGenerator.byteSize() == Long.BYTES : partitionKeys.toString(); // failure message is the partition key column list
+        assert ckGenerator.byteSize() == Long.BYTES : clusteringKeys.toString(); // failure message is the clustering key column list
+    }
+
     public static interface AddRelationCallback
     {
         public void accept(ColumnSpec spec, Relation.RelationKind kind, Object value);
diff --git a/harry-core/src/harry/generators/PCGFastPure.java b/harry-core/src/harry/generators/PCGFastPure.java
index 48bfae3..9e438dc 100644
--- a/harry-core/src/harry/generators/PCGFastPure.java
+++ b/harry-core/src/harry/generators/PCGFastPure.java
@@ -105,7 +105,7 @@ public class PCGFastPure
             curMult *= curMult;
         }
 
-        return distance - 1;
+        return distance;
     }
 
     public static long shuffle(long state)
diff --git a/harry-core/src/harry/model/DoNothingModel.java b/harry-core/src/harry/model/DoNothingModel.java
index c21392d..35b5f86 100644
--- a/harry-core/src/harry/model/DoNothingModel.java
+++ b/harry-core/src/harry/model/DoNothingModel.java
@@ -23,11 +23,7 @@ import harry.runner.Query;
 
 public class DoNothingModel implements Model
 {
-    public void recordEvent(long lts, boolean quorumAchieved)
-    {
-    }
-
-    public void validatePartitionState(long verificationLts, Query query)
+    public void validate(Query query)
     {
     }
 
diff --git a/harry-core/src/harry/model/ExhaustiveChecker.java b/harry-core/src/harry/model/ExhaustiveChecker.java
index 935da45..b98ef06 100644
--- a/harry-core/src/harry/model/ExhaustiveChecker.java
+++ b/harry-core/src/harry/model/ExhaustiveChecker.java
@@ -30,20 +30,21 @@ import java.util.TreeMap;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import harry.core.Configuration;
+import harry.core.Run;
 import harry.data.ResultSetRow;
 import harry.ddl.SchemaSpec;
 import harry.model.sut.SystemUnderTest;
 import harry.runner.AbstractPartitionVisitor;
+import harry.runner.DataTracker;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 import harry.util.BitSet;
 import harry.util.Ranges;
 
@@ -61,42 +62,26 @@ public class ExhaustiveChecker implements Model
     protected final OpSelectors.PdSelector pdSelector;
     protected final OpSelectors.MonotonicClock clock;
     protected final SystemUnderTest sut;
-    protected final QuerySelector querySelector;
+    protected final QueryGenerator rangeSelector;
 
     protected final DataTracker tracker;
 
     private final SchemaSpec schema;
 
-    public ExhaustiveChecker(SchemaSpec schema,
-                             OpSelectors.PdSelector pdSelector,
-                             OpSelectors.DescriptorSelector descriptorSelector,
-                             OpSelectors.MonotonicClock clock,
-                             QuerySelector querySelector,
-                             SystemUnderTest sut)
+    public ExhaustiveChecker(Run run)
     {
-        this.descriptorSelector = descriptorSelector;
-        this.pdSelector = pdSelector;
-        this.tracker = new DataTracker();
-        this.schema = schema;
-        this.sut = sut;
-        this.clock = clock;
-        this.querySelector = querySelector;
+        this.descriptorSelector = run.descriptorSelector;
+        this.pdSelector = run.pdSelector;
+        this.tracker = run.tracker;
+        this.schema = run.schemaSpec;
+        this.sut = run.sut;
+        this.clock = run.clock;
+        this.rangeSelector = run.rangeSelector;
     }
 
-    public void recordEvent(long lts, boolean quorumAchieved)
+    public void validate(Query query)
     {
-        tracker.recordEvent(lts, quorumAchieved);
-    }
-
-    public DataTracker tracker()
-    {
-        return tracker;
-    }
-
-    public void validatePartitionState(long validationLts, Query query)
-    {
-        validatePartitionState(validationLts,
-                               query,
+        validatePartitionState(query,
                                () -> {
                                    while (!Thread.currentThread().isInterrupted())
                                    {
@@ -114,18 +99,18 @@ public class ExhaustiveChecker implements Model
                                });
     }
 
-    void validatePartitionState(long validationLts, Query query, Supplier<List<ResultSetRow>> rowsSupplier)
+    void validatePartitionState(Query query, Supplier<List<ResultSetRow>> rowsSupplier)
     {
         // Before we execute SELECT, we know what was the lts of operation that is guaranteed to be visible
-        long visibleLtsBound = tracker.maxCompleteLts();
+        long visibleLtsBound = tracker.maxConsecutiveFinished();
 
         // TODO: when we implement a reorder buffer through a bitmap, we can just grab a bitmap _before_,
         //       and know that a combination of `consecutive` + `bitmap` gives us _all possible guaranteed-to-be-seen_ values
         List<ResultSetRow> rows = rowsSupplier.get();
 
         // by the time SELECT done, we grab max "possible" lts
-        long inFlightLtsBound = tracker.maxSeenLts();
-        PartitionState partitionState = inflatePartitionState(validationLts, inFlightLtsBound, query);
+        long maxSeenLts = tracker.maxStarted();
+        PartitionState partitionState = inflatePartitionState(maxSeenLts, query);
         NavigableMap<Long, List<Operation>> operations = partitionState.operations;
         LongComparator cmp = FORWARD_COMPARATOR;
         if (query.reverse)
@@ -137,7 +122,7 @@ public class ExhaustiveChecker implements Model
         if (!rows.isEmpty() && operations.isEmpty())
         {
             throw new ValidationException(String.format("Returned rows are not empty, but there were no records in the event log.\nRows: %s\nMax seen LTS: %s\nQuery: %s",
-                                                        rows, inFlightLtsBound, query));
+                                                        rows, maxSeenLts, query));
         }
 
         PeekingIterator<ResultSetRow> rowIter = Iterators.peekingIterator(rows.iterator());
@@ -156,7 +141,7 @@ public class ExhaustiveChecker implements Model
                 if (rowIter.hasNext() && rowIter.peek().cd == cd)
                 {
                     ResultSetRow row = rowIter.next();
-                    RowValidationState validationState = new RowValidationState(row, visibleLtsBound, inFlightLtsBound, partitionState.rangeTombstones);
+                    RowValidationState validationState = new RowValidationState(row, visibleLtsBound, maxSeenLts, partitionState.rangeTombstones);
 
                     // TODO: We only need to go for as long as we explain every column. In fact, we make state _less_ precise by allowing
                     // to continue moving back in time. So far this hasn't proven to be a source of any issues, but we should fix that.
@@ -230,14 +215,11 @@ public class ExhaustiveChecker implements Model
         catch (Throwable t)
         {
             throw new ValidationException(String.format("Caught exception while validating the resultset %s." +
-                                                        "\nchecker.tracker().forceLts(%dL, %dL)" +
-                                                        "\nrun.validator.validatePartition(%dL)" +
                                                         "\nRow Iter Peek: %s" +
                                                         "\nValidated no rows:\n%s" +
                                                         "\nValidated rows:\n%s" +
                                                         "\nRows:\n%s",
                                                         query,
-                                                        inFlightLtsBound, visibleLtsBound, validationLts,
                                                         rowIter.hasNext() ? rowIter.peek() : null,
                                                         validatedNoRows,
                                                         validatedRows.stream().map(Object::toString).collect(Collectors.joining(",\n")),
@@ -320,14 +302,10 @@ public class ExhaustiveChecker implements Model
         }
     }
 
-    public PartitionState inflatePartitionState(long validationLts, long maxLts, Query query)
+    public PartitionState inflatePartitionState(long maxLts, Query query)
     {
-        long currentLts = pdSelector.maxLts(validationLts);
-        long pd = pdSelector.pd(currentLts, schema);
+        long currentLts = pdSelector.maxLtsFor(query.pd);
 
-        if (pd != pdSelector.pd(validationLts, schema))
-            throw new ValidationException("Partition descriptor %d doesn't match partition descriptor %d for LTS %d",
-                                          pd, pdSelector.pd(validationLts, schema), validationLts);
         NavigableMap<Long, List<Operation>> operations = new TreeMap<>();
         List<Ranges.Range> ranges = new ArrayList<>();
 
@@ -338,7 +316,11 @@ public class ExhaustiveChecker implements Model
                 OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
                 if (opType == OpSelectors.OperationKind.DELETE_RANGE)
                 {
-                    ranges.add(maybeWrap(lts, opId, querySelector.inflate(lts, opId).toRange(lts)));
+                    ranges.add(maybeWrap(lts, opId, rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE).toRange(lts)));
+                }
+                else if (opType == OpSelectors.OperationKind.DELETE_SLICE)
+                {
+                    ranges.add(maybeWrap(lts, opId, rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE).toRange(lts)));
                 }
                 else if (query.match(cd)) // skip descriptors that are out of range
                 {
@@ -389,11 +371,6 @@ public class ExhaustiveChecker implements Model
         }
     }
 
-    public Configuration.ExhaustiveCheckerConfig toConfig()
-    {
-        return new Configuration.ExhaustiveCheckerConfig(tracker.maxSeenLts(), tracker.maxCompleteLts());
-    }
-
     public String toString()
     {
         return "ExhaustiveChecker{" + tracker.toString() + '}';
@@ -557,12 +534,6 @@ public class ExhaustiveChecker implements Model
         }
     }
 
-    @VisibleForTesting
-    public void forceLts(long maxSeen, long maxComplete)
-    {
-        tracker.forceLts(maxSeen, maxComplete);
-    }
-
     public static class Operation implements Comparable<Operation>
     {
         public final long pd;
diff --git a/harry-core/src/harry/model/Model.java b/harry-core/src/harry/model/Model.java
index 04fe6f1..3168c8a 100644
--- a/harry-core/src/harry/model/Model.java
+++ b/harry-core/src/harry/model/Model.java
@@ -18,39 +18,22 @@
 
 package harry.model;
 
-import harry.core.Configuration;
-import harry.ddl.SchemaSpec;
-import harry.model.sut.SystemUnderTest;
+import harry.core.Run;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
 
 public interface Model
 {
     long NO_TIMESTAMP = Long.MIN_VALUE;
 
-    void recordEvent(long lts, boolean quorumAchieved);
-
-    void validatePartitionState(long verificationLts, Query query);
-
-    Configuration.ModelConfiguration toConfig();
+    void validate(Query query);
 
     interface ModelFactory
     {
-        Model create(SchemaSpec schema,
-                     OpSelectors.PdSelector pdSelector,
-                     OpSelectors.DescriptorSelector descriptorSelector,
-                     OpSelectors.MonotonicClock clock,
-                     QuerySelector querySelector,
-                     SystemUnderTest sut);
+        Model make(Run run);
     }
 
     class ValidationException extends RuntimeException
     {
-        public ValidationException()
-        {
-            super();
-        }
-
         public ValidationException(String message)
         {
             super(message);
diff --git a/harry-core/src/harry/model/OpSelectors.java b/harry-core/src/harry/model/OpSelectors.java
index ad8e046..7d7ad6d 100644
--- a/harry-core/src/harry/model/OpSelectors.java
+++ b/harry-core/src/harry/model/OpSelectors.java
@@ -117,8 +117,7 @@ public interface OpSelectors
 
         public abstract long prevLts(long lts);
 
-        public abstract long maxLts(long lts);
-
+        public abstract long maxLtsFor(long pd);
         public abstract long minLtsAt(long position);
 
         public abstract long minLtsFor(long pd);
@@ -281,8 +280,8 @@ public interface OpSelectors
 
         public long minLtsFor(long pd)
         {
-            long sequenceNumber = rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID);
-            return minLtsAt(sequenceNumber);
+            long position = rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID);
+            return minLtsAt(position);
         }
 
         public long positionFor(long lts)
@@ -291,6 +290,11 @@ public interface OpSelectors
             return windowStart + lts % windowSize;
         }
 
+        public long positionForPd(long pd)
+        {
+            return rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID);
+        }
+
         public long nextLts(long lts)
         {
             long slideCount = lts / switchAfter;
@@ -326,11 +330,9 @@ public interface OpSelectors
             return slideCount * switchAfter - windowSize + positionInCycle;
         }
 
-        public long maxLts(long lts)
+        public long maxLtsFor(long pd)
         {
-            long windowStart = lts / switchAfter;
-            long position = windowStart + lts % windowSize;
-
+            long position = rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID);
             return position * switchAfter + (slideAfterRepeats - 1) * windowSize;
         }
 
@@ -607,8 +609,7 @@ public interface OpSelectors
 
         public long vd(long pd, long cd, long lts, long opId, int col)
         {
-            // change randomNumber / sequenceNumber to prev/Next
-            return rng.randomNumber(opId + 1, pd ^ cd ^ lts ^ col);
+            return rng.randomNumber(opId, pd ^ cd ^ lts ^ col);
         }
 
         public long modificationId(long pd, long cd, long lts, long vd, int col)
@@ -622,6 +623,7 @@ public interface OpSelectors
         WRITE,
         DELETE_ROW,
         DELETE_COLUMN,
-        DELETE_RANGE
+        DELETE_RANGE,
+        DELETE_SLICE
     }
 }
diff --git a/harry-core/src/harry/model/QuiescentChecker.java b/harry-core/src/harry/model/QuiescentChecker.java
index f7b38a3..ffd50b3 100644
--- a/harry-core/src/harry/model/QuiescentChecker.java
+++ b/harry-core/src/harry/model/QuiescentChecker.java
@@ -22,54 +22,49 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
+import java.util.function.Supplier;
 
 import harry.core.Configuration;
+import harry.core.Run;
 import harry.data.ResultSetRow;
 import harry.ddl.SchemaSpec;
 import harry.model.sut.SystemUnderTest;
 import harry.reconciler.Reconciler;
+import harry.runner.DataTracker;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 
 public class QuiescentChecker implements Model
 {
-    private final OpSelectors.MonotonicClock clock;
-
-    private final DataTracker tracker;
-    private final SystemUnderTest sut;
-    private final Reconciler reconciler;
+    protected final OpSelectors.MonotonicClock clock;
 
-    public QuiescentChecker(SchemaSpec schema,
-                            OpSelectors.PdSelector pdSelector,
-                            OpSelectors.DescriptorSelector descriptorSelector,
-                            OpSelectors.MonotonicClock clock,
-                            QuerySelector querySelector,
+    protected final DataTracker tracker;
+    protected final SystemUnderTest sut;
+    protected final Reconciler reconciler;
 
-                            SystemUnderTest sut)
+    public QuiescentChecker(Run run)
     {
-        this.clock = clock;
-        this.sut = sut;
+        this.clock = run.clock;
+        this.sut = run.sut;
 
-        this.reconciler = new Reconciler(schema, pdSelector, descriptorSelector, querySelector);
-        this.tracker = new DataTracker();
+        this.reconciler = new Reconciler(run.schemaSpec, run.pdSelector, run.descriptorSelector, run.rangeSelector);
+        this.tracker = run.tracker;
     }
 
-    public void recordEvent(long lts, boolean quorumAchieved)
+    public void validate(Query query)
     {
-        tracker.recordEvent(lts, quorumAchieved);
+        validate(() -> SelectHelper.execute(sut, clock, query), query);
     }
 
-    public void validatePartitionState(long verificationLts, Query query)
+    protected void validate(Supplier<List<ResultSetRow>> rowsSupplier, Query query)
     {
-        long maxCompeteLts = tracker.maxCompleteLts();
-        long maxSeenLts = tracker.maxSeenLts();
+        long maxCompeteLts = tracker.maxConsecutiveFinished();
+        long maxSeenLts = tracker.maxStarted();
 
         assert maxCompeteLts == maxSeenLts : "Runner hasn't settled down yet. " +
                                              "Quiescent model can't be reliably used in such cases.";
 
-        List<ResultSetRow> actualRows = SelectHelper.execute(sut, clock, query);
+        List<ResultSetRow> actualRows = rowsSupplier.get();
         Iterator<ResultSetRow> actual = actualRows.iterator();
         Collection<Reconciler.RowState> expectedRows = reconciler.inflatePartitionState(query.pd, maxSeenLts, query).rows(query.reverse);
         Iterator<Reconciler.RowState> expected = expectedRows.iterator();
@@ -104,15 +99,4 @@ public class QuiescentChecker implements Model
                                           actualRows);
         }
     }
-
-    @VisibleForTesting
-    public void forceLts(long maxSeen, long maxComplete)
-    {
-        tracker.forceLts(maxSeen, maxComplete);
-    }
-
-    public Configuration.ModelConfiguration toConfig()
-    {
-        return new Configuration.QuiescentCheckerConfig(tracker.maxSeenLts(), tracker.maxCompleteLts());
-    }
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/model/SelectHelper.java b/harry-core/src/harry/model/SelectHelper.java
index 8cf9d80..2b50339 100644
--- a/harry-core/src/harry/model/SelectHelper.java
+++ b/harry-core/src/harry/model/SelectHelper.java
@@ -117,7 +117,7 @@ public class SelectHelper
     public static List<ResultSetRow> execute(SystemUnderTest sut, OpSelectors.MonotonicClock clock, Query query)
     {
         CompiledStatement compiled = query.toSelectStatement();
-        Object[][] objects = sut.execute(compiled.cql(), compiled.bindings());
+        Object[][] objects = sut.execute(compiled.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, compiled.bindings());
         List<ResultSetRow> result = new ArrayList<>();
         for (Object[] obj : objects)
             result.add(resultSetToRow(query.schemaSpec, clock, obj));
diff --git a/harry-core/src/harry/model/StatelessVisibleRowsChecker.java b/harry-core/src/harry/model/StatelessVisibleRowsChecker.java
index 01f0381..8573b3b 100644
--- a/harry-core/src/harry/model/StatelessVisibleRowsChecker.java
+++ b/harry-core/src/harry/model/StatelessVisibleRowsChecker.java
@@ -22,11 +22,12 @@ import java.util.List;
 import java.util.function.Supplier;
 
 import harry.core.Configuration;
+import harry.core.Run;
 import harry.data.ResultSetRow;
 import harry.ddl.SchemaSpec;
 import harry.model.sut.SystemUnderTest;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 
 import static harry.model.VisibleRowsChecker.descendingIterator;
 
@@ -43,29 +44,18 @@ public class StatelessVisibleRowsChecker implements Model
 
     protected final SchemaSpec schema;
 
-    public StatelessVisibleRowsChecker(SchemaSpec schema,
-                                       OpSelectors.PdSelector pdSelector,
-                                       OpSelectors.DescriptorSelector descriptorSelector,
-                                       OpSelectors.MonotonicClock clock,
-                                       QuerySelector querySelector,
-                                       SystemUnderTest sut)
+    public StatelessVisibleRowsChecker(Run run)
     {
-        this.pdSelector = pdSelector;
-        this.descriptorSelector = descriptorSelector;
-        this.schema = schema;
-        this.clock = clock;
-        this.sut = sut;
+        this.pdSelector = run.pdSelector;
+        this.descriptorSelector = run.descriptorSelector;
+        this.schema = run.schemaSpec;
+        this.clock = run.clock;
+        this.sut = run.sut;
     }
 
-    public void recordEvent(long lts, boolean quorumAchieved)
+    public void validate(Query query)
     {
-        //no-op
-    }
-
-    public void validatePartitionState(long validationLts, Query query)
-    {
-        validatePartitionState(validationLts,
-                               query,
+        validatePartitionState(query,
                                () -> SelectHelper.execute(sut, clock, query));
     }
 
@@ -74,17 +64,17 @@ public class StatelessVisibleRowsChecker implements Model
         throw new RuntimeException("not implemented");
     }
 
-    void validatePartitionState(long validationLts, Query query, Supplier<List<ResultSetRow>> rowsSupplier)
+    void validatePartitionState(Query query, Supplier<List<ResultSetRow>> rowsSupplier)
     {
         // we ignore Query here, since our criteria for checking in this model is presence of the row in the resultset
-        long pd = pdSelector.pd(validationLts, schema);
+        long pd = query.pd;
 
         List<ResultSetRow> rows = rowsSupplier.get();
 
         for (ResultSetRow row : rows)
         {
             VisibleRowsChecker.LongIterator rowLtsIter = descendingIterator(row.lts);
-            VisibleRowsChecker.LongIterator modelLtsIter = descendingIterator(pdSelector, validationLts);
+            VisibleRowsChecker.LongIterator modelLtsIter = descendingIterator(pdSelector, pd);
 
             outer:
             while (rowLtsIter.hasNext())
diff --git a/harry-core/src/harry/model/VisibleRowsChecker.java b/harry-core/src/harry/model/VisibleRowsChecker.java
index ddda8b8..902bf07 100644
--- a/harry-core/src/harry/model/VisibleRowsChecker.java
+++ b/harry-core/src/harry/model/VisibleRowsChecker.java
@@ -24,18 +24,18 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
 
 import harry.core.Configuration;
+import harry.core.Run;
 import harry.data.ResultSetRow;
 import harry.ddl.SchemaSpec;
 import harry.model.sut.SystemUnderTest;
+import harry.runner.DefaultDataTracker;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
 
 /**
  * A simple model to check whether or not the rows reported as visible by the database are reflected in
@@ -43,54 +43,86 @@ import harry.runner.QuerySelector;
  */
 public class VisibleRowsChecker implements Model
 {
-    protected final Map<Long, TreeMap<Long, Event>> eventLog;
     protected final OpSelectors.DescriptorSelector descriptorSelector;
     protected final OpSelectors.PdSelector pdSelector;
     protected final OpSelectors.MonotonicClock clock;
+    protected final LoggingDataTracker tracker;
     protected final SystemUnderTest sut;
-    protected final AtomicLong maxLts;
     protected final SchemaSpec schema;
 
-    public VisibleRowsChecker(SchemaSpec schema,
-                              OpSelectors.PdSelector pdSelector,
-                              OpSelectors.DescriptorSelector descriptorSelector,
-                              OpSelectors.MonotonicClock clock,
-                              QuerySelector querySelector,
-                              SystemUnderTest sut)
+    public VisibleRowsChecker(Run run)
     {
-        this.pdSelector = pdSelector;
-        this.descriptorSelector = descriptorSelector;
-        this.eventLog = new HashMap<>();
-        this.maxLts = new AtomicLong();
-        this.schema = schema;
-        this.clock = clock;
-        this.sut = sut;
+        assert run.tracker instanceof LoggingDataTracker : "Visible rows checker requires a logging data tracker to run";
+        this.tracker = (LoggingDataTracker) run.tracker;
+        this.tracker.pdSelector = run.pdSelector;
+        this.pdSelector = run.pdSelector;
+        this.descriptorSelector = run.descriptorSelector;
+        this.schema = run.schemaSpec;
+        this.clock = run.clock;
+        this.sut = run.sut;
     }
 
-
-    public synchronized void recordEvent(long lts, boolean quorumAchieved)
+    public static class LoggingDataTracker extends DefaultDataTracker
     {
-        maxLts.updateAndGet((old) -> Math.max(old, lts));
-        long pd = pdSelector.pd(lts);
+        protected final Map<Long, TreeMap<Long, Event>> eventLog = new HashMap<>();
+        private OpSelectors.PdSelector pdSelector;
 
-        // TODO: This is definitely not optimal, but we probably use a better, potentially off-heap sorted structure for that anyways
-        TreeMap<Long, Event> events = eventLog.get(pd);
-        if (events == null)
+        public LoggingDataTracker()
         {
-            events = new TreeMap<>();
-            eventLog.put(pd, events);
         }
 
-        Event event = events.get(lts);
-        assert event == null || !event.quorumAchieved : "Operation should be partially visible before it is fully visible";
-        events.put(lts, new Event(lts, quorumAchieved));
-    }
+        public synchronized void started(long lts)
+        {
+            super.started(lts);
+            recordEvent(lts, false);
+        }
+
+        public synchronized void finished(long lts)
+        {
+            super.finished(lts);
+            recordEvent(lts, true);
+        }
+
+        public synchronized TreeMap<Long, Event> events(long pd)
+        {
+            TreeMap<Long, Event> log = eventLog.get(pd);
+            if (log == null)
+                return null;
+
+            return (TreeMap<Long, Event>) log.clone();
+        }
+
+        public long maxStarted()
+        {
+            return super.maxStarted();
+        }
 
+        public long maxConsecutiveFinished()
+        {
+            return super.maxConsecutiveFinished();
+        }
 
-    public void validatePartitionState(long validationLts, Query query)
+        public void recordEvent(long lts, boolean finished)
+        {
+            long pd = pdSelector.pd(lts);
+
+            // TODO: This is definitely not optimal, but we will probably use a better, potentially off-heap sorted structure for that anyway
+            TreeMap<Long, Event> events = eventLog.get(pd);
+            if (events == null)
+            {
+                events = new TreeMap<>();
+                eventLog.put(pd, events);
+            }
+
+            Event event = events.get(lts);
+            assert event == null || !event.quorumAchieved : "Operation should be partially visible before it is fully visible";
+            events.put(lts, new Event(lts, finished));
+        }
+    }
+
+    public void validate(Query query)
     {
-        validatePartitionState(validationLts,
-                               query,
+        validatePartitionState(query,
                                () -> SelectHelper.execute(sut, clock, query));
     }
 
@@ -99,22 +131,23 @@ public class VisibleRowsChecker implements Model
         throw new RuntimeException("not implemented");
     }
 
-    synchronized void validatePartitionState(long validationLts, Query query, Supplier<List<ResultSetRow>> rowsSupplier)
+    synchronized void validatePartitionState(Query query, Supplier<List<ResultSetRow>> rowsSupplier)
     {
         // TODO: Right now, we ignore Query here!
-        long pd = pdSelector.pd(validationLts, schema);
+        long pd = query.pd;
         List<ResultSetRow> rows = rowsSupplier.get();
-        TreeMap<Long, Event> events = eventLog.get(pd);
+        TreeMap<Long, Event> events = tracker.events(pd);
+
         if (!rows.isEmpty() && (events == null || events.isEmpty()))
         {
             throw new ValidationException(String.format("Returned rows are not empty, but there were no records in the event log.\nRows: %s\nSeen pds: %s",
-                                                        rows, eventLog.keySet()));
+                                                        rows, tracker.eventLog.keySet()));
         }
 
         for (ResultSetRow row : rows)
         {
             LongIterator rowLtsIter = descendingIterator(row.lts);
-            PeekingIterator<Event> modelLtsIter = Iterators.peekingIterator(events.subMap(0L, true, maxLts.get(), true)
+            PeekingIterator<Event> modelLtsIter = Iterators.peekingIterator(events.subMap(0L, true, tracker.maxStarted(), true)
                                                                                   .descendingMap()
                                                                                   .values()
                                                                                   .iterator());
@@ -158,11 +191,11 @@ public class VisibleRowsChecker implements Model
     }
 
 
-    public static LongIterator descendingIterator(OpSelectors.PdSelector pdSelector, long verificationLts)
+    public static LongIterator descendingIterator(OpSelectors.PdSelector pdSelector, long pd)
     {
         return new VisibleRowsChecker.LongIterator()
         {
-            long next = pdSelector.maxLts(verificationLts);
+            long next = pdSelector.maxLtsFor(pd);
 
             public long nextLong()
             {
diff --git a/harry-core/src/harry/model/sut/PrintlnSut.java b/harry-core/src/harry/model/sut/PrintlnSut.java
index 463d780..ad14b0b 100644
--- a/harry-core/src/harry/model/sut/PrintlnSut.java
+++ b/harry-core/src/harry/model/sut/PrintlnSut.java
@@ -32,17 +32,18 @@ public class PrintlnSut implements SystemUnderTest
     {
     }
 
-    public Object[][] execute(String statement, Object... bindings)
+    public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
     {
-        System.out.println(String.format("%s | %s",
+        System.out.println(String.format("%s | %s | %s",
                                          statement,
+                                         cl,
                                          Arrays.toString(bindings)));
         return new Object[0][];
     }
 
-    public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+    public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
     {
-        return CompletableFuture.supplyAsync(() -> execute(statement, bindings),
+        return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings),
                                              Runnable::run);
     }
 }
diff --git a/harry-core/src/harry/model/sut/SystemUnderTest.java b/harry-core/src/harry/model/sut/SystemUnderTest.java
index 9e5e0a9..ec0a48e 100644
--- a/harry-core/src/harry/model/sut/SystemUnderTest.java
+++ b/harry-core/src/harry/model/sut/SystemUnderTest.java
@@ -35,20 +35,51 @@ public interface SystemUnderTest
 
     default void schemaChange(String statement)
     {
-        execute(statement, new Object[]{});
+        execute(statement, ConsistencyLevel.ALL, new Object[]{});
     }
 
-    default Object[][] execute(CompiledStatement statement)
+    default Object[][] execute(CompiledStatement statement, ConsistencyLevel cl)
     {
-        return execute(statement.cql(), statement.bindings());
+        return execute(statement.cql(), cl, statement.bindings());
     }
 
-    Object[][] execute(String statement, Object... bindings);
+    Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings);
 
-    CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings);
+    CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings);
 
     interface SystemUnderTestFactory
     {
         SystemUnderTest create();
     }
+
+    enum ConsistencyLevel {
+        ALL, QUORUM, NODE_LOCAL
+    }
+
+    public static final SystemUnderTest NO_OP = new NoOpSut();
+
+    public class NoOpSut implements SystemUnderTest
+    {
+        private NoOpSut() {}
+        public boolean isShutdown()
+        {
+            return false;
+        }
+
+        public void shutdown()
+        {
+        }
+
+        public Object[][] execute(String statement, ConsistencyLevel cl,  Object... bindings)
+        {
+            return new Object[0][];
+        }
+
+        public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
+        {
+            return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings),
+                                                 Runnable::run);
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/reconciler/Reconciler.java b/harry-core/src/harry/reconciler/Reconciler.java
index 50b03c6..cab8fb8 100644
--- a/harry-core/src/harry/reconciler/Reconciler.java
+++ b/harry-core/src/harry/reconciler/Reconciler.java
@@ -24,14 +24,16 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.TreeMap;
 
 import harry.ddl.SchemaSpec;
+import harry.model.ExhaustiveChecker;
 import harry.model.OpSelectors;
 import harry.runner.AbstractPartitionVisitor;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 import harry.util.BitSet;
 import harry.util.Ranges;
 
@@ -49,36 +51,42 @@ public class Reconciler
 {
     private final OpSelectors.DescriptorSelector descriptorSelector;
     private final OpSelectors.PdSelector pdSelector;
-    private final QuerySelector querySelector;
+    private final QueryGenerator rangeSelector;
     private final SchemaSpec schema;
 
     public Reconciler(SchemaSpec schema,
                       OpSelectors.PdSelector pdSelector,
                       OpSelectors.DescriptorSelector descriptorSelector,
-                      QuerySelector querySelector)
+                      QueryGenerator rangeSelector)
     {
         this.descriptorSelector = descriptorSelector;
         this.pdSelector = pdSelector;
         this.schema = schema;
-        this.querySelector = querySelector;
+        this.rangeSelector = rangeSelector;
     }
 
     public PartitionState inflatePartitionState(final long pd, long maxLts, Query query)
     {
         List<Ranges.Range> ranges = new ArrayList<>();
+
         // TODO: we should think of a single-pass algorithm that would allow us to inflate all deletes and range deletes for a partition
         PartitionVisitor partitionVisitor = new AbstractPartitionVisitor(pdSelector, descriptorSelector, schema)
         {
             public void operation(long lts, long pd, long cd, long m, long opId)
             {
-                if (!query.match(cd))
-                    return;
-
                 OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
                 if (opType == OpSelectors.OperationKind.DELETE_RANGE)
-                    ranges.add(querySelector.inflate(lts, opId).toRange(lts));
+                {
+                    ranges.add(rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE).toRange(lts));
+                }
+                else if (opType == OpSelectors.OperationKind.DELETE_SLICE)
+                {
+                    ranges.add(rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE).toRange(lts));
+                }
                 else if (opType == OpSelectors.OperationKind.DELETE_ROW)
+                {
                     ranges.add(new Ranges.Range(cd, cd, true, true, lts));
+                }
             }
         };
 
@@ -90,21 +98,27 @@ public class Reconciler
             currentLts = pdSelector.nextLts(currentLts);
         }
 
-        // We have to do two passes to avoid inflating deleted items
-        Ranges rts = new Ranges(ranges);
-
         PartitionState partitionState = new PartitionState();
         partitionVisitor = new AbstractPartitionVisitor(pdSelector, descriptorSelector, schema)
         {
             public void operation(long lts, long pd, long cd, long m, long opId)
             {
+                if (!query.match(cd))
+                    return;
+
                 OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
 
-                if (opType == OpSelectors.OperationKind.DELETE_ROW || opType == OpSelectors.OperationKind.DELETE_RANGE)
+                if (opType == OpSelectors.OperationKind.DELETE_ROW
+                    || opType == OpSelectors.OperationKind.DELETE_RANGE
+                    || opType == OpSelectors.OperationKind.DELETE_SLICE)
                     return;
 
-                if (!query.match(cd) || rts.isShadowed(cd, lts))
-                    return;
+                // TODO: avoid linear scan
+                for (Ranges.Range range : ranges)
+                {
+                    if (range.timestamp >= lts && range.contains(cd))
+                        return;
+                }
 
                 if (opType == OpSelectors.OperationKind.WRITE)
                 {
@@ -167,7 +181,6 @@ public class Reconciler
                     }
                 }
 
-
                 state = new RowState(cd, vdsCopy, ltss);
                 rows.put(cd, state);
             }
diff --git a/harry-core/src/harry/runner/AbstractPartitionVisitor.java b/harry-core/src/harry/runner/AbstractPartitionVisitor.java
index aefa79f..a3ebd33 100644
--- a/harry-core/src/harry/runner/AbstractPartitionVisitor.java
+++ b/harry-core/src/harry/runner/AbstractPartitionVisitor.java
@@ -91,13 +91,14 @@ public abstract class AbstractPartitionVisitor implements PartitionVisitor
 
     protected void operation(long lts, long pd, long cd, long m, long opId)
     {
+
     }
 
     protected void afterBatch(long lts, long pd, long m)
     {
     }
 
-    public void shutdown()
+    public void shutdown() throws InterruptedException
     {
     }
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/AllPartitionsValidator.java b/harry-core/src/harry/runner/AllPartitionsValidator.java
new file mode 100644
index 0000000..674ab5e
--- /dev/null
+++ b/harry-core/src/harry/runner/AllPartitionsValidator.java
@@ -0,0 +1,106 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.MetricReporter;
+import harry.core.Run;
+import harry.ddl.SchemaSpec;
+import harry.model.Model;
+import harry.model.OpSelectors;
+
+// This might be something that potentially grows into the validator described in the design doc;
+// right now it's just a helper/container class
+public class AllPartitionsValidator implements PartitionVisitor
+{
+    private static final Logger logger = LoggerFactory.getLogger(AllPartitionsValidator.class);
+
+    protected final Model model;
+    protected final SchemaSpec schema;
+
+    protected final OpSelectors.MonotonicClock clock;
+    protected final OpSelectors.PdSelector pdSelector;
+    protected final MetricReporter metricReporter;
+    protected final ExecutorService executor;
+    protected final int concurrency;
+    protected final int triggerAfter;
+
+    public AllPartitionsValidator(int concurrency,
+                                  int triggerAfter,
+                                  Run run,
+                                  Model.ModelFactory modelFactory)
+    {
+        this.triggerAfter = triggerAfter;
+        this.metricReporter = run.metricReporter;
+        this.model = modelFactory.make(run);
+        this.schema = run.schemaSpec;
+        this.clock = run.clock;
+        this.pdSelector = run.pdSelector;
+        this.concurrency = concurrency;
+        this.executor = Executors.newFixedThreadPool(concurrency);
+    }
+
+    protected CompletableFuture<Void> validateAllPartitions(ExecutorService executor, int parallelism)
+    {
+        long maxLts = clock.maxLts() - 1;
+        long maxPos = pdSelector.positionFor(maxLts);
+        AtomicLong counter = new AtomicLong();
+        CompletableFuture[] futures = new CompletableFuture[parallelism];
+        for (int i = 0; i < parallelism; i++)
+        {
+            futures[i] = CompletableFuture.supplyAsync(() -> {
+                long pos;
+                while ((pos = counter.getAndIncrement()) < maxPos && !executor.isShutdown() && !Thread.interrupted())
+                {
+                    if (pos > 0 && pos % 1000 == 0)
+                        logger.debug(String.format("Validated %d out of %d partitions", pos, maxPos));
+                    long visitLts = pdSelector.minLtsAt(pos);
+
+                    metricReporter.validatePartition();
+                    for (boolean reverse : new boolean[]{ true, false })
+                    {
+                        model.validate(Query.selectPartition(schema, pdSelector.pd(visitLts, schema), reverse));
+                    }
+                }
+                return null;
+            }, executor);
+        }
+        return CompletableFuture.allOf(futures);
+    }
+
+    public void visitPartition(long lts)
+    {
+        if (lts % triggerAfter == 0)
+            validateAllPartitions(executor, concurrency);
+    }
+
+    public void shutdown() throws InterruptedException
+    {
+        executor.shutdown();
+        executor.awaitTermination(60, TimeUnit.SECONDS);
+    }
+}
diff --git a/harry-integration/src/harry/runner/HarryRunnerJvm.java b/harry-core/src/harry/runner/DataTracker.java
similarity index 55%
rename from harry-integration/src/harry/runner/HarryRunnerJvm.java
rename to harry-core/src/harry/runner/DataTracker.java
index 38bf264..d5825fb 100644
--- a/harry-integration/src/harry/runner/HarryRunnerJvm.java
+++ b/harry-core/src/harry/runner/DataTracker.java
@@ -19,21 +19,34 @@
 package harry.runner;
 
 import harry.core.Configuration;
-import harry.model.sut.InJvmSut;
 
-import java.io.File;
+public interface DataTracker
+{
+    void started(long lts);
+    void finished(long lts);
 
-public class HarryRunnerJvm extends org.apache.cassandra.distributed.test.TestBaseImpl implements HarryRunner {
+    long maxStarted();
+    long maxConsecutiveFinished();
 
-    public static void main(String[] args) throws Throwable {
-        InJvmSut.registerSubtypes();
+    public Configuration.DataTrackerConfiguration toConfig();
 
-        HarryRunnerJvm runner = new HarryRunnerJvm();
-        File configFile = runner.loadConfig(args);
-
-        Configuration configuration = Configuration.fromFile(configFile);
-        runner.run(configuration);
+    interface DataTrackerFactory {
+        DataTracker make();
     }
 
+    public static DataTracker NO_OP = new NoOpDataTracker();
+    class NoOpDataTracker implements DataTracker
+    {
+        private NoOpDataTracker() {}
+
+        public void started(long lts) {}
+        public void finished(long lts) {}
+        public long maxStarted() { return 0; }
+        public long maxConsecutiveFinished() { return 0; }
 
+        public Configuration.DataTrackerConfiguration toConfig()
+        {
+            return null;
+        }
+    }
 }
diff --git a/harry-core/src/harry/runner/DefaultDataTracker.java b/harry-core/src/harry/runner/DefaultDataTracker.java
new file mode 100644
index 0000000..1b55482
--- /dev/null
+++ b/harry-core/src/harry/runner/DefaultDataTracker.java
@@ -0,0 +1,140 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.Configuration;
+
+public class DefaultDataTracker implements DataTracker
+{
+    private static final Logger logger = LoggerFactory.getLogger(DefaultDataTracker.class);
+
+    private final AtomicLong maxSeenLts;
+    // TODO: This is a trivial implementation that can be significantly improved upon
+    // for example, we could use a bitmap that records `1`s for all lts that are after
+    // the consecutive, and "collapse" the bitmap state into the long as soon as we see
+    // consecutive `1` on the left side.
+    private final AtomicLong maxCompleteLts;
+    private final PriorityBlockingQueue<Long> reorderBuffer;
+
+    public DefaultDataTracker()
+    {
+        this.maxSeenLts = new AtomicLong(-1);
+        this.maxCompleteLts = new AtomicLong(-1);
+        this.reorderBuffer = new PriorityBlockingQueue<>(100);
+    }
+
+    // TODO: there's also some room for improvement in terms of concurrency
+    // TODO: remove pd?
+    public void started(long lts)
+    {
+        recordEvent(lts, false);
+    }
+
+    public void finished(long lts)
+    {
+        recordEvent(lts, true);
+    }
+
+    private void recordEvent(long lts, boolean finished)
+    {
+        // all seen LTS are allowed to be "in-flight"
+        maxSeenLts.getAndUpdate((old) -> Math.max(lts, old));
+        if (!finished)
+            return;
+
+        // Once lts closes the gap, successors parked in the buffer may be consecutive too: drain again
+        // right away, otherwise maxCompleteLts stays stale until some later LTS happens to finish.
+        long maxAchievedConsecutive = drainReorderQueue();
+        if (maxAchievedConsecutive + 1 != lts)
+            reorderBuffer.offer(lts);
+        else if (maxCompleteLts.compareAndSet(maxAchievedConsecutive, lts))
+            drainReorderQueue();
+    }
+
+    /**
+     * Collapses consecutive LTS from the head of the reorder buffer into maxCompleteLts; returns the maximum reached.
+     */
+    public long drainReorderQueue()
+    {
+        final long expected = maxCompleteLts.get();
+        long maxAchievedConsecutive = expected;
+        if (reorderBuffer.isEmpty())
+            return maxAchievedConsecutive;
+
+        Long smallest = reorderBuffer.poll();
+        while (smallest != null && smallest == maxAchievedConsecutive + 1)
+        {
+            maxAchievedConsecutive++;
+            smallest = reorderBuffer.poll();
+        }
+
+        // put back the first polled entry that did not extend the consecutive run
+        if (smallest != null)
+            reorderBuffer.offer(smallest);
+
+        if (maxAchievedConsecutive != expected)
+            maxCompleteLts.compareAndSet(expected, maxAchievedConsecutive);
+
+        int bufferSize = reorderBuffer.size();
+        if (bufferSize > 100)
+            logger.warn("Reorder buffer size has grown up to {}", bufferSize);
+        return maxAchievedConsecutive;
+    }
+
+    public long maxStarted()
+    {
+        return maxSeenLts.get();
+    }
+
+    public long maxConsecutiveFinished()
+    {
+        return maxCompleteLts.get();
+    }
+
+    public Configuration.DataTrackerConfiguration toConfig()
+    {
+        return new Configuration.DefaultDataTrackerConfiguration(maxSeenLts.get(), maxCompleteLts.get());
+    }
+
+    @VisibleForTesting
+    public void forceLts(long maxSeen, long maxComplete)
+    {
+        this.maxSeenLts.set(maxSeen);
+        this.maxCompleteLts.set(maxComplete);
+    }
+
+    public String toString()
+    {
+        List<Long> buf = new ArrayList<>(reorderBuffer);
+        return "DataTracker{" +
+               "maxSeenLts=" + maxSeenLts +
+               ", maxCompleteLts=" + maxCompleteLts +
+               ", reorderBuffer=" + buf +
+               '}';
+    }
+}
diff --git a/harry-core/src/harry/runner/DefaultPartitionVisitorFactory.java b/harry-core/src/harry/runner/DefaultPartitionVisitorFactory.java
deleted file mode 100644
index 89873fd..0000000
--- a/harry-core/src/harry/runner/DefaultPartitionVisitorFactory.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package harry.runner;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import harry.ddl.SchemaSpec;
-import harry.model.Model;
-import harry.model.OpSelectors;
-import harry.model.sut.SystemUnderTest;
-import harry.operations.CompiledStatement;
-
-public class DefaultPartitionVisitorFactory implements Supplier<PartitionVisitor>
-{
-    private static final Logger logger = LoggerFactory.getLogger(DefaultPartitionVisitorFactory.class);
-
-    private final Model model;
-    private final SystemUnderTest sut;
-    private final RowVisitor rowVisitor;
-    private final BufferedWriter operationLog;
-
-    // TODO: shutdown properly
-    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
-    private final Supplier<DefaultPartitionVisitor> factory;
-
-    public DefaultPartitionVisitorFactory(Model model,
-                                          SystemUnderTest sut,
-                                          OpSelectors.PdSelector pdSelector,
-                                          OpSelectors.DescriptorSelector descriptorSelector,
-                                          SchemaSpec schema,
-                                          RowVisitor rowVisitor)
-    {
-        this.model = model;
-        this.sut = sut;
-        this.rowVisitor = rowVisitor;
-        this.factory = () -> new DefaultPartitionVisitor(pdSelector, descriptorSelector, schema);
-
-        File f = new File("operation.log");
-        try
-        {
-            operationLog = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
-        }
-        catch (FileNotFoundException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public PartitionVisitor get()
-    {
-        return factory.get();
-    }
-
-    private class DefaultPartitionVisitor extends AbstractPartitionVisitor
-    {
-        private List<String> statements = new ArrayList<>();
-        private List<Object> bindings = new ArrayList<>();
-        private List<CompletableFuture> futures = new ArrayList<>();
-
-        public DefaultPartitionVisitor(OpSelectors.PdSelector pdSelector,
-                                       OpSelectors.DescriptorSelector descriptorSelector,
-                                       SchemaSpec schema)
-        {
-            super(pdSelector, descriptorSelector, schema);
-        }
-
-        public void beforeLts(long lts, long pd)
-        {
-            model.recordEvent(lts, false);
-        }
-
-        public void afterLts(long lts, long pd)
-        {
-            for (CompletableFuture future : futures)
-            {
-                try
-                {
-                    // TODO: currently, Cassandra keeps timing out; we definitely need to investigate that, but we need to focus on other things first
-                    future.get();
-                }
-                catch (Throwable t)
-                {
-                    throw new Model.ValidationException("Couldn't repeat operations within timeout bounds.", t);
-                }
-            }
-
-            log("LTS: %d. Pd %d. Finished\n", lts, pd);
-            model.recordEvent(lts, true);
-        }
-
-        public void beforeBatch(long lts, long pd, long m)
-        {
-            statements = new ArrayList<>();
-            bindings = new ArrayList<>();
-        }
-
-        public void operation(long lts, long pd, long cd, long m, long opId)
-        {
-            OpSelectors.OperationKind op = descriptorSelector.operationType(pd, lts, opId);
-            CompiledStatement statement = rowVisitor.visitRow(op, lts, pd, cd, opId);
-
-            log(String.format("LTS: %d. Pd %d. Cd %d. M %d. OpId: %d Statement %s\n",
-                              lts, pd, cd, m, opId, statement));
-
-            statements.add(statement.cql());
-            bindings.addAll(Arrays.asList(statement.bindings()));
-        }
-
-        public void afterBatch(long lts, long pd, long m)
-        {
-            String query = String.join(" ", statements);
-
-            if (statements.size() > 1)
-                query = String.format("BEGIN UNLOGGED BATCH\n%s\nAPPLY BATCH;", query);
-
-            Object[] bindingsArray = new Object[bindings.size()];
-            bindings.toArray(bindingsArray);
-
-            CompletableFuture<Object[][]> future = new CompletableFuture<>();
-            executeAsyncWithRetries(future, new CompiledStatement(query, bindingsArray));
-            futures.add(future);
-
-            statements = null;
-            bindings = null;
-        }
-    }
-
-    void executeAsyncWithRetries(CompletableFuture<Object[][]> future, CompiledStatement statement)
-    {
-        if (sut.isShutdown())
-            throw new IllegalStateException("System under test is shut down");
-
-        sut.executeAsync(statement.cql(), statement.bindings())
-           .whenComplete((res, t) -> {
-               if (t != null)
-                   executor.schedule(() -> executeAsyncWithRetries(future, statement), 1, TimeUnit.SECONDS);
-               else
-                   future.complete(res);
-           });
-    }
-
-    private void log(String format, Object... objects)
-    {
-        try
-        {
-            operationLog.write(String.format(format, objects));
-        }
-        catch (IOException e)
-        {
-            // ignore
-        }
-    }
-
-    public void shutdown()
-    {
-        executor.shutdown();
-        try
-        {
-            executor.awaitTermination(30, TimeUnit.SECONDS);
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-}
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/HarryRunner.java b/harry-core/src/harry/runner/HarryRunner.java
index 56e2350..77dcc8b 100644
--- a/harry-core/src/harry/runner/HarryRunner.java
+++ b/harry-core/src/harry/runner/HarryRunner.java
@@ -33,24 +33,40 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-public interface HarryRunner
+public abstract class HarryRunner
 {
+    public static final Logger logger = LoggerFactory.getLogger(HarryRunner.class);
 
-    Logger logger = LoggerFactory.getLogger(HarryRunner.class);
+    protected CompletableFuture progress;
+    protected ScheduledThreadPoolExecutor executor;
+    public abstract void beforeRun(Runner runner);
+    public void afterRun(Runner runner, Object result)
+    {
+        executor.shutdown();
+        try
+        {
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to callers instead of swallowing it
+        }
+    }
 
-    default void run(Configuration configuration) throws Throwable
+    public void run(Configuration config) throws Throwable
     {
         System.setProperty("cassandra.disable_tcactive_openssl", "true");
         System.setProperty("relocated.shaded.io.netty.transport.noNative", "true");
         System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
 
-        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+        executor = new ScheduledThreadPoolExecutor(1);
         executor.setRemoveOnCancelPolicy(true);
 
-        Runner runner = configuration.createRunner();
+        Runner runner = config.createRunner();
         Run run = runner.getRun();
 
-        CompletableFuture progress = runner.initAndStartAll();
+        progress = runner.initAndStartAll();
+        beforeRun(runner);
 
         // Uncomment this if you want to have fun!
         // scheduleCorruption(run, executor);
@@ -63,7 +79,7 @@ public interface HarryRunner
                 if (b != null)
                     return b;
                 return a;
-            }).get(run.snapshot.run_time_unit.toSeconds(run.snapshot.run_time) + 30,
+            }).get(config.run_time_unit.toSeconds(config.run_time) + 30,
                    TimeUnit.SECONDS);
             if (result instanceof Throwable)
                 logger.error("Execution failed", result);
@@ -77,6 +93,8 @@ public interface HarryRunner
         }
         finally
         {
+            afterRun(runner, result);
+
             logger.info("Shutting down executor..");
             tryRun(() -> {
                 executor.shutdownNow();
@@ -99,7 +117,7 @@ public interface HarryRunner
         }
     }
 
-    default void tryRun(ThrowingRunnable runnable)
+    public void tryRun(ThrowingRunnable runnable)
     {
         try
         {
@@ -117,7 +135,7 @@ public interface HarryRunner
      * @return Configuration YAML file.
      * @throws Exception If file is not found or cannot be read.
      */
-    default File loadConfig(String[] args) throws Exception {
+    public File loadConfig(String[] args) throws Exception {
         if (args == null || args.length == 0) {
             throw new Exception("Harry config YAML not provided.");
         }
diff --git a/harry-core/src/harry/runner/LoggingPartitionVisitor.java b/harry-core/src/harry/runner/LoggingPartitionVisitor.java
new file mode 100644
index 0000000..c3a77af
--- /dev/null
+++ b/harry-core/src/harry/runner/LoggingPartitionVisitor.java
@@ -0,0 +1,100 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import harry.core.Run;
+import harry.operations.CompiledStatement;
+
+public class LoggingPartitionVisitor extends MutatingPartitionVisitor
+{
+    private final BufferedWriter operationLog;
+
+    public LoggingPartitionVisitor(Run run, RowVisitor.RowVisitorFactory rowVisitorFactory)
+    {
+        super(run, rowVisitorFactory);
+
+        File f = new File("operation.log");
+        try
+        {
+            operationLog = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
+        }
+        catch (FileNotFoundException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void afterLts(long lts, long pd)
+    {
+        super.afterLts(lts, pd);
+        log("LTS: %d. Pd %d. Finished\n", lts, pd);
+    }
+
+    @Override
+    protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId)
+    {
+        CompiledStatement statement = super.operationInternal(lts, pd, cd, m, opId);
+        // Hand the pattern and its arguments to log() untouched: pre-formatting here made log() re-interpret
+        // any '%' in the rendered statement as a format specifier and throw IllegalFormatException.
+        log("LTS: %d. Pd %d. Cd %d. M %d. OpId: %d Statement %s\n", lts, pd, cd, m, opId, statement);
+
+        return statement;
+    }
+
+    // Renders bindings as a comma-separated list: strings are wrapped in double quotes, longs get an
+    // "L" suffix, and every other value (null included) is appended the way StringBuilder renders it.
+    public static String bindingsToString(Object... bindings)
+    {
+        StringBuilder rendered = new StringBuilder();
+        for (int i = 0; i < bindings.length; i++)
+        {
+            Object value = bindings[i];
+            if (i > 0)
+                rendered.append(",");
+
+            if (value instanceof Long)
+                rendered.append(value).append("L");
+            else if (value instanceof String)
+                rendered.append("\"").append(value).append("\"");
+            else
+                rendered.append(value);
+        }
+        return rendered.toString();
+    }
+
+    private void log(String format, Object... objects)
+    {
+        try
+        {
+            operationLog.write(String.format(format, objects)); // format is treated as a java.util.Formatter pattern
+            operationLog.flush(); // flushed on every call, so entries do not linger in the writer's buffer
+        }
+        catch (IOException e)
+        {
+            // ignore: failures to write the operation log are deliberately dropped
+        }
+    }
+}
diff --git a/harry-core/src/harry/runner/MutatingPartitionVisitor.java b/harry-core/src/harry/runner/MutatingPartitionVisitor.java
new file mode 100644
index 0000000..a818091
--- /dev/null
+++ b/harry-core/src/harry/runner/MutatingPartitionVisitor.java
@@ -0,0 +1,133 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import harry.core.Run;
+import harry.model.Model;
+import harry.model.OpSelectors;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+
+public class MutatingPartitionVisitor extends AbstractPartitionVisitor
+{
+    private final List<String> statements = new ArrayList<>();
+    private final List<Object> bindings = new ArrayList<>();
+
+    private final List<CompletableFuture<?>> futures = new ArrayList<>();
+
+    protected final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
+    protected final DataTracker tracker;
+    protected final SystemUnderTest sut;
+    protected final RowVisitor rowVisitor;
+
+    public MutatingPartitionVisitor(Run run, RowVisitor.RowVisitorFactory rowVisitorFactory)
+    {
+        super(run.pdSelector, run.descriptorSelector, run.schemaSpec);
+        this.tracker = run.tracker;
+        this.sut = run.sut;
+        this.rowVisitor = rowVisitorFactory.make(run);
+    }
+
+    public void beforeLts(long lts, long pd)
+    {
+        tracker.started(lts);
+    }
+
+    public void afterLts(long lts, long pd)
+    {
+        for (CompletableFuture<?> future : futures)
+        {
+            try
+            {
+                future.get();
+            }
+            catch (Throwable t)
+            {
+                throw new Model.ValidationException("Couldn't repeat operations within timeout bounds.", t);
+            }
+        }
+        futures.clear();
+        tracker.finished(lts);
+    }
+
+    public void beforeBatch(long lts, long pd, long m)
+    {
+        statements.clear();
+        bindings.clear();
+    }
+
+    protected void operation(long lts, long pd, long cd, long m, long opId)
+    {
+        CompiledStatement statement = operationInternal(lts, pd, cd, m, opId);
+        statements.add(statement.cql());
+        for (Object binding : statement.bindings())
+            bindings.add(binding);
+    }
+
+    protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId)
+    {
+        OpSelectors.OperationKind op = descriptorSelector.operationType(pd, lts, opId);
+        return rowVisitor.visitRow(op, lts, pd, cd, opId);
+    }
+
+    public void afterBatch(long lts, long pd, long m)
+    {
+        String query = String.join(" ", statements);
+
+        if (statements.size() > 1)
+            query = String.format("BEGIN UNLOGGED BATCH\n%s\nAPPLY BATCH;", query);
+
+        Object[] bindingsArray = new Object[bindings.size()];
+        bindings.toArray(bindingsArray);
+
+        CompletableFuture<Object[][]> future = new CompletableFuture<>();
+        executeAsyncWithRetries(future, new CompiledStatement(query, bindingsArray));
+        futures.add(future);
+
+        statements.clear();
+        bindings.clear();
+    }
+
+    void executeAsyncWithRetries(CompletableFuture<Object[][]> future, CompiledStatement statement)
+    {
+        if (sut.isShutdown())
+            throw new IllegalStateException("System under test is shut down");
+        // NOTE(review): via the scheduled retry below, the throw above happens in the executor task and 'future' is not completed
+        sut.executeAsync(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings())
+           .whenComplete((res, t) -> {
+               if (t != null) // attempt failed: re-run the same statement after one second; attempts are not capped
+                   executor.schedule(() -> executeAsyncWithRetries(future, statement), 1, TimeUnit.SECONDS);
+               else
+                   future.complete(res);
+           });
+    }
+
+    public void shutdown() throws InterruptedException
+    {
+        executor.shutdown();
+        executor.awaitTermination(30, TimeUnit.SECONDS);
+    }
+}
diff --git a/harry-core/src/harry/runner/DefaultRowVisitor.java b/harry-core/src/harry/runner/MutatingRowVisitor.java
similarity index 61%
rename from harry-core/src/harry/runner/DefaultRowVisitor.java
rename to harry-core/src/harry/runner/MutatingRowVisitor.java
index 1968192..01b5acd 100644
--- a/harry-core/src/harry/runner/DefaultRowVisitor.java
+++ b/harry-core/src/harry/runner/MutatingRowVisitor.java
@@ -18,6 +18,8 @@
 
 package harry.runner;
 
+import harry.core.MetricReporter;
+import harry.core.Run;
 import harry.ddl.SchemaSpec;
 import harry.model.OpSelectors;
 import harry.operations.CompiledStatement;
@@ -25,32 +27,33 @@ import harry.operations.DeleteHelper;
 import harry.operations.WriteHelper;
 import harry.util.BitSet;
 
-public class DefaultRowVisitor implements RowVisitor
+public class MutatingRowVisitor implements RowVisitor
 {
-    private final SchemaSpec schema;
-    private final OpSelectors.MonotonicClock clock;
-    private final OpSelectors.DescriptorSelector descriptorSelector;
-    private final QuerySelector querySelector;
+    protected final SchemaSpec schema;
+    protected final OpSelectors.MonotonicClock clock;
+    protected final OpSelectors.DescriptorSelector descriptorSelector;
+    protected final QueryGenerator rangeSelector;
+    protected final MetricReporter metricReporter;
 
-    public DefaultRowVisitor(SchemaSpec schema,
-                             OpSelectors.MonotonicClock clock,
-                             OpSelectors.DescriptorSelector descriptorSelector,
-                             QuerySelector querySelector)
+    public MutatingRowVisitor(Run run)
     {
-        this.schema = schema;
-        this.clock = clock;
-        this.descriptorSelector = descriptorSelector;
-        this.querySelector = querySelector;
+        this.metricReporter = run.metricReporter;
+        this.schema = run.schemaSpec;
+        this.clock = run.clock;
+        this.descriptorSelector = run.descriptorSelector;
+        this.rangeSelector = run.rangeSelector;
     }
 
     public CompiledStatement write(long lts, long pd, long cd, long opId)
     {
+        metricReporter.insert();
         long[] vds = descriptorSelector.vds(pd, cd, lts, opId, schema);
         return WriteHelper.inflateInsert(schema, pd, cd, vds, clock.rts(lts));
     }
 
     public CompiledStatement deleteColumn(long lts, long pd, long cd, long opId)
     {
+        metricReporter.columnDelete();
         BitSet mask = descriptorSelector.columnMask(pd, lts, opId);
         return DeleteHelper.deleteColumn(schema, pd, cd, mask, clock.rts(lts));
     }
@@ -58,11 +61,19 @@ public class DefaultRowVisitor implements RowVisitor
 
     public CompiledStatement deleteRow(long lts, long pd, long cd, long opId)
     {
+        metricReporter.rowDelete();
         return DeleteHelper.deleteRow(schema, pd, cd, clock.rts(lts));
     }
 
     public CompiledStatement deleteRange(long lts, long pd, long opId)
     {
-        return querySelector.inflate(lts, opId).toDeleteStatement(clock.rts(lts));
+        metricReporter.rangeDelete();
+        return rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_RANGE).toDeleteStatement(clock.rts(lts));
+    }
+
+    public CompiledStatement deleteSlice(long lts, long pd, long opId)
+    {
+        metricReporter.rangeDelete();
+        return rangeSelector.inflate(lts, opId, Query.QueryKind.CLUSTERING_SLICE).toDeleteStatement(clock.rts(lts));
     }
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/PartitionVisitor.java b/harry-core/src/harry/runner/PartitionVisitor.java
index ef861fd..85e3b26 100644
--- a/harry-core/src/harry/runner/PartitionVisitor.java
+++ b/harry-core/src/harry/runner/PartitionVisitor.java
@@ -18,9 +18,14 @@
 
 package harry.runner;
 
+import harry.core.Run;
+
 public interface PartitionVisitor
 {
     void visitPartition(long lts);
-
-    void shutdown();
-}
+    public void shutdown() throws InterruptedException;
+    public interface PartitionVisitorFactory
+    {
+        public PartitionVisitor make(Run run);
+    }
+}
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/QueryGenerator.java b/harry-core/src/harry/runner/QueryGenerator.java
new file mode 100644
index 0000000..a50c450
--- /dev/null
+++ b/harry-core/src/harry/runner/QueryGenerator.java
@@ -0,0 +1,364 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.LongSupplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.Run;
+import harry.ddl.ColumnSpec;
+import harry.ddl.SchemaSpec;
+import harry.generators.DataGenerators;
+import harry.generators.RngUtils;
+import harry.generators.Surjections;
+import harry.model.OpSelectors;
+import harry.operations.Relation;
+
+// TODO: there's a lot of potential to reduce the amount of garbage generated here.
+// TODO: refactor. Currently, this class is a base for both SELECT and DELETE statements. In retrospect,
+//       a better way to do the same thing would've been to just inflate bounds, be able to inflate
+//       any type of query from the bounds, and leave things like "reverse" up to the last mile / implementation.
+public class QueryGenerator
+{
+    private static final Logger logger = LoggerFactory.getLogger(QueryGenerator.class);
+    private static final long GT_STREAM = 0b1;
+    private static final long E_STREAM = 0b10;
+
+    private final OpSelectors.Rng rng;
+    private final OpSelectors.PdSelector pdSelector;
+    private final OpSelectors.DescriptorSelector descriptorSelector;
+    private final SchemaSpec schema;
+
+    public QueryGenerator(Run run) // convenience overload: takes schemaSpec, pdSelector, descriptorSelector and rng from the Run
+    {
+        this(run.schemaSpec, run.pdSelector, run.descriptorSelector, run.rng);
+    }
+
+    // TODO: remove this constructor
+    public QueryGenerator(SchemaSpec schema,
+                          OpSelectors.PdSelector pdSelector,
+                          OpSelectors.DescriptorSelector descriptorSelector,
+                          OpSelectors.Rng rng)
+    {
+        this.rng = rng;
+        this.schema = schema;
+        this.pdSelector = pdSelector;
+        this.descriptorSelector = descriptorSelector;
+    }
+
+    public static class TypedQueryGenerator
+    {
+        private final OpSelectors.Rng rng;
+        private final Surjections.Surjection<Query.QueryKind> queryKindGen;
+        private final QueryGenerator queryGenerator;
+
+        public TypedQueryGenerator(Run run)
+        {
+            this(run.rng, new QueryGenerator(run));
+        }
+
+        // Uses Surjections.enumValues(Query.QueryKind.class) as the source of query kinds.
+        public TypedQueryGenerator(OpSelectors.Rng rng,
+                                   QueryGenerator queryGenerator)
+        {
+            this(rng, Surjections.enumValues(Query.QueryKind.class), queryGenerator);
+        }
+
+        public TypedQueryGenerator(OpSelectors.Rng rng,
+                                   Surjections.Surjection<Query.QueryKind> queryKindGen,
+                                   QueryGenerator queryGenerator)
+        {
+            this.queryKindGen = queryKindGen;
+            this.queryGenerator = queryGenerator;
+            this.rng = rng;
+        }
+
+        // Queries are inflated from the LTS, which identifies the partition, and a modifier, which makes it
+        // possible to generate different queries for the same lts. The query kind is picked from rng.next(modifier, lts).
+        public Query inflate(long lts, long modifier)
+        {
+            long kindSeed = rng.next(modifier, lts);
+            return queryGenerator.inflate(lts, modifier, queryKindGen.inflate(kindSeed));
+        }
+    }
+
+    /**
+     * Inflates a query of the requested kind for the partition that {@code lts} maps to.
+     *
+     * The partition descriptor is taken from {@code pdSelector.pd(lts, schema)}; iteration order,
+     * clustering descriptors and relation kinds are all derived from the descriptor returned by
+     * {@code rng.next(modifier, lts)}:
+     * <ul>
+     *   <li>SINGLE_PARTITION: no clustering relations;</li>
+     *   <li>SINGLE_CLUSTERING: EQ relations built from one clustering descriptor;</li>
+     *   <li>CLUSTERING_SLICE: EQ relations on a (possibly empty) prefix of the clustering columns,
+     *       followed by a single GT/GTE/LT/LTE relation;</li>
+     *   <li>CLUSTERING_RANGE: EQ relations on a (possibly empty) prefix of the clustering columns,
+     *       followed by a lower and an upper relation on the next column.</li>
+     * </ul>
+     * Slice and range queries whose bounds collapse (see the checks right before the respective
+     * return statements) are re-generated recursively with {@code modifier + 1}; the number of
+     * attempts is not bounded.
+     *
+     * @param lts       logical timestamp used to select the partition
+     * @param modifier  allows generating different queries for the same lts
+     * @param queryKind kind of the query to generate
+     * @return query of the requested kind
+     * @throws IllegalArgumentException if {@code queryKind} is not handled by the switch
+     */
+    public Query inflate(long lts, long modifier, Query.QueryKind queryKind)
+    {
+        long pd = pdSelector.pd(lts, schema);
+        long descriptor = rng.next(modifier, lts);
+        boolean reverse = descriptor % 2 == 0;
+        switch (queryKind)
+        {
+            case SINGLE_PARTITION:
+                return new Query.SinglePartitionQuery(queryKind,
+                                                      pd,
+                                                      reverse,
+                                                      Collections.emptyList(),
+                                                      schema);
+            case SINGLE_CLUSTERING:
+            {
+                long cd = descriptorSelector.randomCd(pd, descriptor, schema);
+                return new Query.SingleClusteringQuery(queryKind,
+                                                       pd,
+                                                       cd,
+                                                       reverse,
+                                                       Relation.eqRelations(schema.ckGenerator.slice(cd), schema.clusteringKeys),
+                                                       schema);
+            }
+            case CLUSTERING_SLICE:
+            {
+                List<Relation> relations = new ArrayList<>();
+                long cd = descriptorSelector.randomCd(pd, descriptor, schema);
+                boolean isGt = RngUtils.asBoolean(rng.next(descriptor, GT_STREAM));
+                // TODO: make generation of EQ configurable; turn it off and on
+                boolean isEquals = RngUtils.asBoolean(rng.next(descriptor, E_STREAM));
+
+                long[] sliced = schema.ckGenerator.slice(cd);
+                long min;
+                long max;
+                int nonEqFrom = RngUtils.asInt(descriptor, 0, sliced.length - 1);
+
+                long[] minBound = new long[sliced.length];
+                long[] maxBound = new long[sliced.length];
+
+                // Algorithm that determines boundaries for a clustering slice.
+                //
+                // Basic principles are not hard but there are a few edge cases. I haven't figured out how to simplify
+                // those, so there might be some room for improvement. In short, what we want to achieve is:
+                //
+                // 1. Every part that is restricted with an EQ relation goes into the bound verbatim.
+                // 2. Every part that is restricted with a non-EQ relation (LT, GT, LTE, GTE) is taken into the bound
+                //    if it is required to satisfy the relationship. For example, in `ck1 = 0 AND ck2 < 5`, ck2 will go
+                //    to the _max_ boundary, and minimum value will go to the _min_ boundary, since we can select every
+                //    descriptor that is prefixed with ck1.
+                // 3. Every other part (e.g., ones that are not explicitly mentioned in the query) has to be restricted
+                //    according to equality. For example, in `ck1 = 0 AND ck2 < 5`, ck3 that is present in schema but not
+                //    mentioned in query, makes sure that any value between [0, min_value, min_value] and [0, 5, min_value]
+                //    is matched.
+                //
+                // One edge case is a query on the first clustering key: `ck1 < 5`. In this case, we have to fixup the lower
+                // value to the minimum possible value. We could really just do Long.MIN_VALUE, but in case we forget to
+                // adjust entropy elsewhere, it'll be caught correctly here.
+                for (int i = 0; i < sliced.length; i++)
+                {
+                    long v = sliced[i];
+                    DataGenerators.KeyGenerator gen = schema.ckGenerator;
+                    ColumnSpec<?> column = schema.clusteringKeys.get(i);
+                    int idx = i;
+                    LongSupplier maxSupplier = () -> gen.maxValue(idx);
+                    LongSupplier minSupplier = () -> gen.minValue(idx);
+
+                    if (i < nonEqFrom)
+                    {
+                        relations.add(Relation.eqRelation(column, v));
+                        minBound[i] = v;
+                        maxBound[i] = v;
+                    }
+                    else if (i == nonEqFrom)
+                    {
+                        relations.add(Relation.relation(relationKind(isGt, isEquals), column, v));
+
+                        if (column.isReversed())
+                        {
+                            minBound[i] = isGt ? minSupplier.getAsLong() : v;
+                            maxBound[i] = isGt ? v : maxSupplier.getAsLong();
+                        }
+                        else
+                        {
+                            minBound[i] = isGt ? v : minSupplier.getAsLong();
+                            maxBound[i] = isGt ? maxSupplier.getAsLong() : v;
+                        }
+                    }
+                    else
+                    {
+                        if (isEquals)
+                        {
+                            minBound[i] = minSupplier.getAsLong();
+                            maxBound[i] = maxSupplier.getAsLong();
+                        }
+                        else if (i > 0 && schema.clusteringKeys.get(i - 1).isReversed())
+                            maxBound[i] = minBound[i] = isGt ? minSupplier.getAsLong() : maxSupplier.getAsLong();
+                        else
+                            maxBound[i] = minBound[i] = isGt ? maxSupplier.getAsLong() : minSupplier.getAsLong();
+                    }
+                }
+
+                // The relation itself has already been recorded above; from here on isGt only drives the
+                // descriptor bounds, which are laid out the other way around when the column is reversed.
+                if (schema.clusteringKeys.get(nonEqFrom).isReversed())
+                    isGt = !isGt;
+
+                min = schema.ckGenerator.stitch(minBound);
+                max = schema.ckGenerator.stitch(maxBound);
+
+                if (nonEqFrom == 0)
+                {
+                    min = isGt ? min : schema.ckGenerator.minValue();
+                    max = !isGt ? max : schema.ckGenerator.maxValue();
+                }
+
+                // if we're about to create an "impossible" query, just bump the modifier and re-generate
+                if (min == max && !isEquals)
+                    return inflate(lts, modifier + 1, queryKind);
+
+                return new Query.ClusteringSliceQuery(Query.QueryKind.CLUSTERING_SLICE,
+                                                      pd,
+                                                      min,
+                                                      max,
+                                                      relationKind(true, isGt ? isEquals : true),
+                                                      relationKind(false, !isGt ? isEquals : true),
+                                                      reverse,
+                                                      relations,
+                                                      schema);
+            }
+            case CLUSTERING_RANGE:
+            {
+                List<Relation> relations = new ArrayList<>();
+                long cd1 = descriptorSelector.randomCd(pd, descriptor, schema);
+                boolean isMinEq = RngUtils.asBoolean(descriptor);
+                long cd2 = descriptorSelector.randomCd(pd, rng.next(descriptor, lts), schema);
+
+                boolean isMaxEq = RngUtils.asBoolean(rng.next(descriptor, lts));
+
+                long[] minBound = schema.ckGenerator.slice(cd1);
+                long[] maxBound = schema.ckGenerator.slice(cd2);
+
+                int lock = RngUtils.asInt(descriptor, 0, schema.clusteringKeys.size() - 1);
+
+                // The logic here is similar to how clustering slices are implemented, except that in cases (1)
+                // and (2) both the lower and the upper bound get their values from the sliced descriptors:
+                //
+                // 1. Every part that is restricted with an EQ relation, takes its value from the min bound.
+                //    TODO: this can actually be improved, since in case of hierarchical clustering generation we can
+                //          pick out of the keys that are already locked. That said, we'll exercise more cases the way
+                //          it is implemented right now.
+                // 2. Every part that is restricted with a non-EQ relation is taken into the bound, if it is used in
+                //    the query. For example, in `ck1 = 0 AND ck2 > 2 AND ck2 < 5`, ck2 values 2 and 5 will be placed,
+                //    correspondingly, to the min and max bound.
+                // 3. Every other part has to be restricted according to equality. Similar to clustering slice, we have
+                //    to decide whether we use a min or the max value for the bound. For example, in `ck1 = 0 AND ck2 > 2 AND ck2 <= 5`,
+                //    assuming we have ck3 that is present in schema but not mentioned in the query, we'll have bounds
+                //    created as follows: [0, 2, max_value] and [0, 5, max_value]. Idea here is that since ck2 = 2 is excluded,
+                //    we also disallow all ck3 values for [0, 2] prefix. Similarly, since ck2 = 5 is included, we allow every
+                //    ck3 value with a prefix of [0, 5].
+                for (int i = 0; i < schema.clusteringKeys.size(); i++)
+                {
+                    ColumnSpec<?> col = schema.clusteringKeys.get(i);
+                    if (i < lock)
+                    {
+                        relations.add(Relation.eqRelation(col, minBound[i]));
+                        maxBound[i] = minBound[i];
+                    }
+                    else if (i == lock)
+                    {
+                        long minLocked = Math.min(minBound[lock], maxBound[lock]);
+                        long maxLocked = Math.max(minBound[lock], maxBound[lock]);
+
+                        relations.add(Relation.relation(relationKind(true, isMinEq), col, minLocked));
+                        minBound[i] = col.isReversed() ? maxLocked : minLocked;
+                        relations.add(Relation.relation(relationKind(false, isMaxEq), col, maxLocked));
+                        maxBound[i] = col.isReversed() ? minLocked : maxLocked;
+                    }
+                    else
+                    {
+                        minBound[i] = isMinEq ? schema.ckGenerator.minValue(i) : schema.ckGenerator.maxValue(i);
+                        maxBound[i] = isMaxEq ? schema.ckGenerator.maxValue(i) : schema.ckGenerator.minValue(i);
+                    }
+                }
+
+                long stitchedMin = schema.ckGenerator.stitch(minBound);
+                long stitchedMax = schema.ckGenerator.stitch(maxBound);
+
+                // if we're about to create an "impossible" query, just bump the modifier and re-generate
+                // TODO: re-generating like this is not ideal (and the number of retries is not bounded), but I'd
+                //       rather fix it with a refactoring that's mentioned below
+                if (stitchedMin == stitchedMax)
+                    return inflate(lts, modifier + 1, queryKind);
+
+                // TODO: one of the ways to get rid of garbage here, and potentially even simplify the code is to
+                //       simply return bounds here. After bounds are created, we slice them and generate query right
+                //       from the bounds. In this case, we can even say that things like -inf/+inf are special values,
+                //       and use them as placeholders. Also, it'll be easier to manipulate relations.
+                return new Query.ClusteringRangeQuery(Query.QueryKind.CLUSTERING_RANGE,
+                                                      pd,
+                                                      stitchedMin,
+                                                      stitchedMax,
+                                                      relationKind(true, isMinEq),
+                                                      relationKind(false, isMaxEq),
+                                                      reverse,
+                                                      relations,
+                                                      schema);
+            }
+            default:
+                throw new IllegalArgumentException("Shouldn't happen");
+        }
+    }
+
+    public static Relation.RelationKind relationKind(boolean isGt, boolean isEquals)
+    {
+        if (isGt)
+            return isEquals ? Relation.RelationKind.GTE : Relation.RelationKind.GT;
+        else
+            return isEquals ? Relation.RelationKind.LTE : Relation.RelationKind.LT;
+    }
+}
diff --git a/harry-core/src/harry/runner/RecentPartitionValidator.java b/harry-core/src/harry/runner/RecentPartitionValidator.java
new file mode 100644
index 0000000..82d4bda
--- /dev/null
+++ b/harry-core/src/harry/runner/RecentPartitionValidator.java
@@ -0,0 +1,84 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import harry.core.MetricReporter;
+import harry.core.Run;
+import harry.generators.Surjections;
+import harry.model.Model;
+import harry.model.OpSelectors;
+
+/**
+ * Partition visitor that periodically validates generated queries against the model.
+ *
+ * Whenever the visited lts is a multiple of {@code triggerAfter}, it starts at the position of
+ * {@code clock.maxLts()} and walks backwards over up to {@code partitionCount} positions, validating
+ * one generated query per position.
+ */
+public class RecentPartitionValidator implements PartitionVisitor
+{
+    private final Model model;
+
+    private final OpSelectors.MonotonicClock clock;
+    private final OpSelectors.PdSelector pdSelector;
+    private final QueryGenerator.TypedQueryGenerator querySelector;
+    private final MetricReporter metricReporter;
+    private final int partitionCount;
+    private final int triggerAfter;
+
+    /**
+     * @param partitionCount maximum number of positions validated per trigger
+     * @param triggerAfter   validation is triggered for every lts that is a multiple of this value;
+     *                       has to be positive
+     * @param run            provides the clock, selectors, rng and metric reporter
+     * @param modelFactory   creates the model that validates the generated queries
+     * @throws IllegalArgumentException if {@code triggerAfter} is not positive
+     */
+    public RecentPartitionValidator(int partitionCount,
+                                    int triggerAfter,
+                                    Run run,
+                                    Model.ModelFactory modelFactory)
+    {
+        // visitPartition computes lts % triggerAfter: fail here with a clear message rather than
+        // with a bare ArithmeticException on the first visit
+        if (triggerAfter <= 0)
+            throw new IllegalArgumentException("triggerAfter has to be positive, but was " + triggerAfter);
+
+        this.partitionCount = partitionCount;
+        this.triggerAfter = triggerAfter;
+        this.metricReporter = run.metricReporter;
+        this.clock = run.clock;
+        this.pdSelector = run.pdSelector;
+
+        this.querySelector = new QueryGenerator.TypedQueryGenerator(run.rng,
+                                                                    // TODO: make query kind configurable
+                                                                    Surjections.enumValues(Query.QueryKind.class),
+                                                                    run.rangeSelector);
+        this.model = modelFactory.make(run);
+    }
+
+    // TODO: expose metric, how many times validated recent partitions
+    // Validates one query per position, walking down from the position of the max lts, for as long as
+    // the position is greater than 0, fewer than `limit` positions were visited, and the thread is
+    // not interrupted.
+    private void validateRecentPartitions(int limit)
+    {
+        long pos = pdSelector.positionFor(clock.maxLts());
+        int remaining = limit;
+
+        while (pos > 0 && remaining > 0 && !Thread.currentThread().isInterrupted())
+        {
+            long visitLts = pdSelector.minLtsAt(pos);
+
+            metricReporter.validateRandomQuery();
+            model.validate(querySelector.inflate(visitLts, 0));
+
+            pos--;
+            remaining--;
+        }
+    }
+
+    // Triggers validation only for lts values that are multiples of triggerAfter.
+    public void visitPartition(long lts)
+    {
+        if (lts % triggerAfter == 0)
+            validateRecentPartitions(partitionCount);
+    }
+
+    // Nothing to release here.
+    public void shutdown() throws InterruptedException
+    {
+    }
+}
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/RowVisitor.java b/harry-core/src/harry/runner/RowVisitor.java
index 93674f9..b24babb 100644
--- a/harry-core/src/harry/runner/RowVisitor.java
+++ b/harry-core/src/harry/runner/RowVisitor.java
@@ -18,7 +18,7 @@
 
 package harry.runner;
 
-import harry.ddl.SchemaSpec;
+import harry.core.Run;
 import harry.model.OpSelectors;
 import harry.operations.CompiledStatement;
 
@@ -26,10 +26,7 @@ public interface RowVisitor
 {
     interface RowVisitorFactory
     {
-        RowVisitor make(SchemaSpec schema,
-                        OpSelectors.MonotonicClock clock,
-                        OpSelectors.DescriptorSelector descriptorSelector,
-                        QuerySelector querySelector);
+        RowVisitor make(Run run);
     }
 
     default CompiledStatement visitRow(OpSelectors.OperationKind op, long lts, long pd, long cd, long opId)
@@ -46,6 +43,8 @@ public interface RowVisitor
                 return deleteColumn(lts, pd, cd, opId);
             case DELETE_RANGE:
                 return deleteRange(lts, pd, opId);
+            case DELETE_SLICE:
+                return deleteSlice(lts, pd, opId);
             default:
                 throw new IllegalStateException();
         }
@@ -58,4 +57,8 @@ public interface RowVisitor
     CompiledStatement deleteRow(long lts, long pd, long cd, long opId);
 
     CompiledStatement deleteRange(long lts, long pd, long opId);
+
+    CompiledStatement deleteSlice(long lts, long pd, long opId);
+
+
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/runner/Runner.java b/harry-core/src/harry/runner/Runner.java
index 7f1696e..a96dd69 100644
--- a/harry-core/src/harry/runner/Runner.java
+++ b/harry-core/src/harry/runner/Runner.java
@@ -23,11 +23,10 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -43,8 +42,6 @@ import harry.model.OpSelectors;
 
 public abstract class Runner
 {
-    public static final Object[] EMPTY_BINDINGS = {};
-
     private static final Logger logger = LoggerFactory.getLogger(Runner.class);
 
     protected final Run run;
@@ -53,10 +50,10 @@ public abstract class Runner
     //  since we have multiple concurrent checkers running
     protected final CopyOnWriteArrayList<Throwable> errors;
 
-    public Runner(Run run)
+    public Runner(Run run, Configuration config)
     {
         this.run = run;
-        this.config = run.snapshot;
+        this.config = config;
         this.errors = new CopyOnWriteArrayList<>();
     }
 
@@ -70,12 +67,11 @@ public abstract class Runner
         if (config.create_schema)
         {
             // TODO: make RF configurable or make keyspace DDL configurable
-            run.sut.execute("CREATE KEYSPACE " + run.schemaSpec.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            run.sut.schemaChange("CREATE KEYSPACE " + run.schemaSpec.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
 
-            run.sut.execute(String.format("DROP TABLE IF EXISTS %s.%s;",
+            run.sut.schemaChange(String.format("DROP TABLE IF EXISTS %s.%s;",
                                           run.schemaSpec.keyspace,
-                                          run.schemaSpec.table),
-                            EMPTY_BINDINGS);
+                                          run.schemaSpec.table));
             String schema = run.schemaSpec.compile().cql();
             logger.info("Creating table: " + schema);
             run.sut.schemaChange(schema);
@@ -83,10 +79,9 @@ public abstract class Runner
 
         if (config.truncate_table)
         {
-            run.sut.execute(String.format("truncate %s.%s;",
-                                          run.schemaSpec.keyspace,
-                                          run.schemaSpec.table),
-                            EMPTY_BINDINGS);
+            run.sut.schemaChange(String.format("truncate %s.%s;",
+                                               run.schemaSpec.keyspace,
+                                               run.schemaSpec.table));
         }
     }
 
@@ -112,7 +107,7 @@ public abstract class Runner
     protected void maybeReportErrors()
     {
         if (!errors.isEmpty())
-            dumpStateToFile(run, errors);
+            dumpStateToFile(run, config, errors);
     }
 
     public abstract CompletableFuture initAndStartAll() throws InterruptedException;
@@ -140,20 +135,21 @@ public abstract class Runner
     {
         private final ScheduledExecutorService executor;
         private final ScheduledExecutorService shutdownExceutor;
-        private final int checkRecentAfter;
-        private final int checkAllAfter;
+        private final List<PartitionVisitor> partitionVisitors;
+        private final Configuration config;
 
         public SequentialRunner(Run run,
-                                int roundRobinValidatorThreads,
-                                int checkRecentAfter,
-                                int checkAllAfter)
+                                Configuration config,
+                                List<? extends PartitionVisitor.PartitionVisitorFactory> partitionVisitorFactories)
         {
-            super(run);
+            super(run, config);
 
-            this.executor = Executors.newScheduledThreadPool(roundRobinValidatorThreads + 1);
+            this.executor = Executors.newSingleThreadScheduledExecutor();
             this.shutdownExceutor = Executors.newSingleThreadScheduledExecutor();
-            this.checkAllAfter = checkAllAfter;
-            this.checkRecentAfter = checkRecentAfter;
+            this.config = config;
+            this.partitionVisitors = new ArrayList<>();
+            for (PartitionVisitor.PartitionVisitorFactory factory : partitionVisitorFactories)
+                partitionVisitors.add(factory.make(run));
         }
 
         public CompletableFuture initAndStartAll()
@@ -166,12 +162,12 @@ public abstract class Runner
                 logger.info("Completed");
                 // TODO: wait for the last full validation?
                 future.complete(null);
-            }, run.snapshot.run_time, run.snapshot.run_time_unit);
+            }, config.run_time, config.run_time_unit);
 
             executor.submit(reportThrowable(() -> {
                                                 try
                                                 {
-                                                    run(run.visitorFactory.get(), run.clock,
+                                                    run(partitionVisitors, run.clock,
                                                         () -> Thread.currentThread().isInterrupted() || future.isDone());
                                                 }
                                                 catch (Throwable t)
@@ -184,19 +180,15 @@ public abstract class Runner
             return future;
         }
 
-        void run(PartitionVisitor visitor,
+        void run(List<PartitionVisitor> visitors,
                  OpSelectors.MonotonicClock clock,
-                 BooleanSupplier exitCondition) throws ExecutionException, InterruptedException
+                 BooleanSupplier exitCondition)
         {
             while (!exitCondition.getAsBoolean())
             {
                 long lts = clock.nextLts();
-                visitor.visitPartition(lts);
-                if (lts % checkRecentAfter == 0)
-                    run.validator.validateRecentPartitions(100);
-
-                if (lts % checkAllAfter == 0)
-                    run.validator.validateAllPartitions(executor, exitCondition, 10).get();
+                for (PartitionVisitor partitionVisitor : visitors)
+                    partitionVisitor.visitPartition(lts);
             }
         }
 
@@ -215,105 +207,89 @@ public abstract class Runner
 
     public static interface RunnerFactory
     {
-        public Runner make(Run run);
+        public Runner make(Run run, Configuration config);
     }
 
     // TODO: this requires some significant improvement
     public static class ConcurrentRunner extends Runner
     {
         private final ScheduledExecutorService executor;
-        private final ScheduledExecutorService shutdownExceutor;
+        private final ScheduledExecutorService shutdownExecutor;
+        private final List<? extends PartitionVisitor.PartitionVisitorFactory> partitionVisitorFactories;
+        private final List<PartitionVisitor> allVisitors;
 
-        private final int writerThreads;
-        private final int roundRobinValidators;
-        private final int recentPartitionValidators;
+        private final int concurrency;
         private final long runTime;
         private final TimeUnit runTimeUnit;
 
         public ConcurrentRunner(Run run,
-                                int writerThreads,
-                                int roundRobinValidators,
-                                int recentPartitionValidators)
+                                Configuration config,
+                                int concurrency,
+                                List<? extends PartitionVisitor.PartitionVisitorFactory> partitionVisitorFactories)
         {
-            super(run);
-            this.writerThreads = writerThreads;
-            this.roundRobinValidators = roundRobinValidators;
-            this.recentPartitionValidators = recentPartitionValidators;
-            this.runTime = run.snapshot.run_time;
-            this.runTimeUnit = run.snapshot.run_time_unit;
-            this.executor = Executors.newScheduledThreadPool(this.writerThreads + this.roundRobinValidators + this.recentPartitionValidators);
-            this.shutdownExceutor = Executors.newSingleThreadScheduledExecutor();
+            super(run, config);
+            this.concurrency = concurrency;
+            this.runTime = config.run_time;
+            this.runTimeUnit = config.run_time_unit;
+            // TODO: configure concurrency
+            this.executor = Executors.newScheduledThreadPool(concurrency);
+            this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor();
+            this.partitionVisitorFactories = partitionVisitorFactories;
+            this.allVisitors = new CopyOnWriteArrayList<>();
         }
 
-        public CompletableFuture initAndStartAll() throws InterruptedException
+        public CompletableFuture initAndStartAll()
         {
             init();
             CompletableFuture future = new CompletableFuture();
             future.whenComplete((a, b) -> maybeReportErrors());
 
-            shutdownExceutor.schedule(() -> {
+            shutdownExecutor.schedule(() -> {
                 logger.info("Completed");
                 // TODO: wait for the last full validation?
                 future.complete(null);
             }, runTime, runTimeUnit);
 
             BooleanSupplier exitCondition = () -> Thread.currentThread().isInterrupted() || future.isDone();
-            for (int i = 0; i < writerThreads; i++)
+            for (int i = 0; i < concurrency; i++)
             {
-                executor.submit(reportThrowable(() -> run(run.visitorFactory.get(), run.clock, exitCondition),
+                List<PartitionVisitor> partitionVisitors = new ArrayList<>();
+                executor.submit(reportThrowable(() -> {
+
+                                                    for (PartitionVisitor.PartitionVisitorFactory factory : partitionVisitorFactories)
+                                                    {
+                                                        partitionVisitors.add(factory.make(run));
+                                                    }
+                                                    allVisitors.addAll(partitionVisitors);
+                                                    run(partitionVisitors, run.clock, exitCondition);
+                                                },
                                                 future));
-            }
-
-            scheduleValidateAllPartitions(run.validator, executor, future, roundRobinValidators);
 
-            // N threads to validate recently written partitions
-            for (int i = 0; i < recentPartitionValidators; i++)
-            {
-                executor.scheduleWithFixedDelay(reportThrowable(() -> {
-                    // TODO: make recent partitions configurable
-                    run.validator.validateRecentPartitions(100);
-                }, future), 1000, 1, TimeUnit.MILLISECONDS);
             }
 
             return future;
         }
 
-        void run(PartitionVisitor visitor,
+        void run(List<PartitionVisitor> visitors,
                  OpSelectors.MonotonicClock clock,
                  BooleanSupplier exitCondition)
         {
             while (!exitCondition.getAsBoolean())
             {
                 long lts = clock.nextLts();
-                visitor.visitPartition(lts);
+                for (PartitionVisitor visitor : visitors)
+                    visitor.visitPartition(lts);
             }
         }
 
-        void scheduleValidateAllPartitions(Validator validator, ExecutorService executor, CompletableFuture future, int roundRobinValidators)
-        {
-            validator.validateAllPartitions(executor, () -> Thread.currentThread().isInterrupted() || future.isDone(),
-                                            roundRobinValidators)
-                     .handle((v, err) -> {
-                         if (err != null)
-                         {
-                             errors.add(err);
-                             if (!future.isDone())
-                                 future.completeExceptionally(err);
-                         }
-
-                         if (Thread.currentThread().isInterrupted() || future.isDone())
-                             return null;
-
-                         scheduleValidateAllPartitions(validator, executor, future, roundRobinValidators);
-                         return null;
-                     });
-        }
-
         public void shutdown() throws InterruptedException
         {
             logger.info("Shutting down...");
-            shutdownExceutor.shutdownNow();
-            shutdownExceutor.awaitTermination(1, TimeUnit.MINUTES);
+            for (PartitionVisitor visitor : allVisitors)
+                visitor.shutdown();
+
+            shutdownExecutor.shutdownNow();
+            shutdownExecutor.awaitTermination(1, TimeUnit.MINUTES);
 
             executor.shutdownNow();
             executor.awaitTermination(1, TimeUnit.MINUTES);
@@ -346,7 +322,7 @@ public abstract class Runner
         bw.newLine();
     }
 
-    private static void dumpStateToFile(Run run, List<Throwable> t)
+    private static void dumpStateToFile(Run run, Configuration config, List<Throwable> t)
     {
         try
         {
@@ -361,14 +337,14 @@ public abstract class Runner
                 bw.flush();
             }
 
-            File config = new File("run.yaml");
-            Configuration.ConfigurationBuilder builder = run.snapshot.unbuild();
+            File file = new File("run.yaml");
+            Configuration.ConfigurationBuilder builder = config.unbuild();
 
             // overrride stateful components
             builder.setClock(run.clock.toConfig());
-            builder.setModel(run.model.toConfig());
+            builder.setDataTracker(run.tracker.toConfig());
 
-            try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(config))))
+            try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file))))
             {
                 bw.write(Configuration.toYamlString(builder.build()));
                 bw.flush();
diff --git a/harry-core/src/harry/runner/SinglePartitionValidator.java b/harry-core/src/harry/runner/SinglePartitionValidator.java
new file mode 100644
index 0000000..e9d4f27
--- /dev/null
+++ b/harry-core/src/harry/runner/SinglePartitionValidator.java
@@ -0,0 +1,56 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import harry.core.Run;
+import harry.model.Model;
+
+/** PartitionVisitor that, on every visit, has its model validate queries inflated for the visited lts. */
+public class SinglePartitionValidator implements PartitionVisitor
+{
+    protected final int iterations;
+    protected final Model model;
+    protected final QueryGenerator queryGenerator;
+
+    // The model comes from modelFactory.make(run); the query generator is created for the same run.
+    public SinglePartitionValidator(int iterations, Run run, Model.ModelFactory modelFactory)
+    {
+        this.iterations = iterations;
+        this.model = modelFactory.make(run);
+        this.queryGenerator = new QueryGenerator(run);
+    }
+
+    public void shutdown() throws InterruptedException
+    {
+        // Empty body: this visitor does nothing on shutdown.
+    }
+
+    // One SINGLE_PARTITION query first, then `iterations` queries for each of the three kinds listed below.
+    public void visitPartition(long lts)
+    {
+        model.validate(queryGenerator.inflate(lts, 0, Query.QueryKind.SINGLE_PARTITION));
+
+        Query.QueryKind[] kinds = { Query.QueryKind.CLUSTERING_RANGE, Query.QueryKind.CLUSTERING_SLICE, Query.QueryKind.SINGLE_CLUSTERING };
+        for (Query.QueryKind kind : kinds)
+        {
+            for (int iteration = 0; iteration < iterations; iteration++)
+                model.validate(queryGenerator.inflate(lts, iteration, kind));
+        }
+    }
+}
diff --git a/harry-core/src/harry/runner/Validator.java b/harry-core/src/harry/runner/Validator.java
deleted file mode 100644
index 4bd1d90..0000000
--- a/harry-core/src/harry/runner/Validator.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package harry.runner;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BooleanSupplier;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import harry.ddl.SchemaSpec;
-import harry.generators.Surjections;
-import harry.model.Model;
-import harry.model.OpSelectors;
-
-// This might be something that potentially grows into the validator described in the design doc;
-// right now it's just a helper/container class
-public class Validator
-{
-    private static final Logger logger = LoggerFactory.getLogger(Validator.class);
-
-    private final Model model;
-    private final SchemaSpec schema;
-
-    private final OpSelectors.MonotonicClock clock;
-    private final OpSelectors.PdSelector pdSelector;
-    private final QuerySelector querySelector;
-
-    public Validator(Model model,
-                     SchemaSpec schema,
-                     OpSelectors.MonotonicClock clock,
-                     OpSelectors.PdSelector pdSelector,
-                     OpSelectors.DescriptorSelector descriptorSelector,
-                     OpSelectors.Rng rng)
-    {
-        this.model = model;
-        this.schema = schema;
-        this.clock = clock;
-        this.pdSelector = pdSelector;
-        this.querySelector = new QuerySelector(schema,
-                                               pdSelector,
-                                               descriptorSelector,
-                                               Surjections.enumValues(Query.QueryKind.class),
-                                               rng);
-    }
-
-    // TODO: expose metric, how many times validated recent partitions
-    public void validateRecentPartitions(int partitionCount)
-    {
-        long maxLts = clock.maxLts();
-        long pos = pdSelector.positionFor(maxLts);
-
-        int maxPartitions = partitionCount;
-        while (pos > 0 && maxPartitions > 0 && !Thread.currentThread().isInterrupted())
-        {
-            long visitLts = pdSelector.minLtsAt(pos);
-
-            model.validatePartitionState(visitLts,
-                                         querySelector.inflate(visitLts, 0));
-
-            pos--;
-            maxPartitions--;
-        }
-    }
-
-    public CompletableFuture<Void> validateAllPartitions(ExecutorService executor, BooleanSupplier keepRunning, int parallelism)
-    {
-        long maxLts = clock.maxLts() - 1;
-        long maxPos = pdSelector.positionFor(maxLts);
-        AtomicLong counter = new AtomicLong();
-        CompletableFuture[] futures = new CompletableFuture[parallelism];
-        for (int i = 0; i < parallelism; i++)
-        {
-            futures[i] = CompletableFuture.supplyAsync(() -> {
-                long pos;
-                while ((pos = counter.getAndIncrement()) < maxPos && !executor.isShutdown() && keepRunning.getAsBoolean())
-                {
-                    if (pos > 0 && pos % 1000 == 0)
-                        logger.debug(String.format("Validated %d out of %d partitions", pos, maxPos));
-                    long visitLts = pdSelector.minLtsAt(pos);
-                    for (boolean reverse : new boolean[]{ true, false })
-                    {
-                        model.validatePartitionState(visitLts,
-                                                     Query.selectPartition(schema, pdSelector.pd(visitLts, schema), reverse));
-                    }
-                }
-                return null;
-            }, executor);
-        }
-        return CompletableFuture.allOf(futures);
-    }
-
-    public void validatePartition(long lts)
-    {
-        for (boolean reverse : new boolean[]{ true, false })
-        {
-            model.validatePartitionState(lts,
-                                         Query.selectPartition(schema, pdSelector.pd(lts, schema), reverse));
-        }
-    }
-}
diff --git a/harry-core/src/harry/util/Ranges.java b/harry-core/src/harry/util/Ranges.java
index a49b2bd..9da2af8 100644
--- a/harry-core/src/harry/util/Ranges.java
+++ b/harry-core/src/harry/util/Ranges.java
@@ -84,7 +84,8 @@ public class Ranges
 
         public Range(long minBound, long maxBound, boolean minInclusive, boolean maxInclusive, long timestamp)
         {
-            assert (minBound < maxBound) || ((minBound == maxBound) && minInclusive && maxInclusive) :
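+            // The former (minBound < maxBound) requirement is no longer checked: only equal bounds are constrained, and they must both be inclusive.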
+            assert ((minBound != maxBound) || (minInclusive && maxInclusive)) :
             String.format("Min bound should be less than max bound, or both bounds have to be inclusive, but was: %s%d,%d%s",
                           minInclusive ? "[" : "(",
                           minBound, maxBound,
@@ -99,12 +100,24 @@ public class Ranges
 
         public boolean contains(long descriptor)
         {
-            if (minInclusive && descriptor == minBound)
-                return true;
-            if (maxInclusive && descriptor == maxBound)
-                return true;
+            // Reports whether descriptor lies between minBound and maxBound.
+            // A bound flagged inclusive (minInclusive / maxInclusive) also admits
+            // equality with that bound; a bound that is not inclusive requires a
+            // strict inequality. Both bounds must hold at the same time, so a
+            // descriptor that merely equals one inclusive bound is rejected when
+            // it falls outside the other bound.
+            //
+            // The two checks below are independent of each other, which covers all
+            // four inclusive/exclusive combinations without enumerating them.
+
+            // Lower bound.
+            boolean aboveMin = minInclusive ? descriptor >= minBound
+                                            : descriptor > minBound;
 
-            return (descriptor > minBound) && (descriptor < maxBound);
+            // Upper bound.
+            boolean belowMax = maxInclusive ? descriptor <= maxBound
+                                            : descriptor < maxBound;
+            return aboveMin && belowMax;
         }
 
         public boolean contains(long descriptor, long ts)
diff --git a/harry-core/test/harry/generators/RandomGeneratorTest.java b/harry-core/test/harry/generators/RandomGeneratorTest.java
index c84fd9b..b34553d 100644
--- a/harry-core/test/harry/generators/RandomGeneratorTest.java
+++ b/harry-core/test/harry/generators/RandomGeneratorTest.java
@@ -31,13 +31,14 @@ import static junit.framework.TestCase.fail;
 
 public class RandomGeneratorTest
 {
+    private static int RUNS = 100000;
+
     @Test
     public void testShuffleUnshuffle()
     {
-        int iterations = 100000;
         Random rnd = new Random();
 
-        for (int i = 1; i < iterations; i++)
+        for (int i = 1; i < RUNS; i++)
         {
             long l = rnd.nextLong();
             Assert.assertEquals(l, PCGFastPure.unshuffle(PCGFastPure.shuffle(l)));
@@ -48,23 +49,38 @@ public class RandomGeneratorTest
     public void testImmutableRng()
     {
         int size = 5;
-        for (int stream = 1; stream < 1000000; stream++)
+        OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
+        for (int stream = 1; stream < RUNS; stream++)
         {
             long[] generated = new long[size];
-            OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
             for (int i = 0; i < size; i++)
                 generated[i] = rng.randomNumber(i, stream);
 
+            Assert.assertEquals(0, rng.sequenceNumber(generated[0], stream));
+            Assert.assertEquals(generated[1], rng.next(generated[0], stream));
+
             for (int i = 1; i < size; i++)
             {
                 Assert.assertEquals(generated[i], rng.next(generated[i - 1], stream));
                 Assert.assertEquals(generated[i - 1], rng.prev(generated[i], stream));
-                Assert.assertEquals(i - 1, rng.sequenceNumber(generated[i], stream));
+                Assert.assertEquals(i, rng.sequenceNumber(generated[i], stream));
             }
         }
     }
 
     @Test
+    public void testSequenceNumber()
+    {
+        // randomNumber(i, stream) fed back into sequenceNumber(..., stream) must give i again.
+        final int perStream = 5;
+        OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
+
+        for (int stream = 1; stream < RUNS; stream++)
+            for (int seqNo = 0; seqNo < perStream; seqNo++)
+                Assert.assertEquals(seqNo, rng.sequenceNumber(rng.randomNumber(seqNo, stream), stream));
+    }
+
+    @Test
     public void seekTest()
     {
         // TODO: more examples; randomize
@@ -81,14 +97,14 @@ public class RandomGeneratorTest
         Assert.assertEquals(last, rand.next());
         Assert.assertEquals(first, rand.nextAt(0));
         Assert.assertEquals(last, rand.nextAt(10));
-        Assert.assertEquals(-11, rand.distance(first));
+        Assert.assertEquals(-10, rand.distance(first));
     }
 
     @Test
     public void shuffleUnshuffleTest()
     {
         Random rnd = new Random();
-        for (int i = 0; i < 100000; i++)
+        for (int i = 0; i < RUNS; i++)
         {
             long a = rnd.nextLong();
             Assert.assertEquals(a, PCGFastPure.unshuffle(PCGFastPure.shuffle(a)));
@@ -103,7 +119,7 @@ public class RandomGeneratorTest
         int a = 0;
         int b = 50;
         int[] cardinality = new int[b - a];
-        for (int i = 0; i < 100000; i++)
+        for (int i = 0; i < RUNS; i++)
         {
             int min = Math.min(a, b);
             int max = Math.max(a, b);
diff --git a/harry-core/test/harry/generators/SurjectionsTest.java b/harry-core/test/harry/generators/SurjectionsTest.java
index b927041..5cc64c7 100644
--- a/harry-core/test/harry/generators/SurjectionsTest.java
+++ b/harry-core/test/harry/generators/SurjectionsTest.java
@@ -31,6 +31,8 @@ import harry.generators.Surjections;
 
 public class SurjectionsTest
 {
+    private static int RUNS = 1000000;
+
     @Test
     public void weightedTest()
     {
@@ -42,7 +44,7 @@ public class SurjectionsTest
         Map<String, Integer> frequencies = new HashMap<>();
         RandomGenerator rng = new PcgRSUFast(System.currentTimeMillis(), 0);
 
-        for (int i = 0; i < 1000000; i++)
+        for (int i = 0; i < RUNS; i++)
         {
             String s = gen.inflate(rng.next());
             frequencies.compute(s, (s1, i1) -> {
diff --git a/harry-core/test/harry/model/OpSelectorsTest.java b/harry-core/test/harry/model/OpSelectorsTest.java
index 6c55a7f..d90fa5f 100644
--- a/harry-core/test/harry/model/OpSelectorsTest.java
+++ b/harry-core/test/harry/model/OpSelectorsTest.java
@@ -32,16 +32,18 @@ import java.util.function.Supplier;
 import org.junit.Assert;
 import org.junit.Test;
 
+import harry.core.MetricReporter;
+import harry.core.Run;
 import harry.ddl.ColumnSpec;
 import harry.ddl.SchemaGenerators;
 import harry.ddl.SchemaSpec;
-import harry.generators.PcgRSUFast;
-import harry.generators.RandomGenerator;
 import harry.generators.Surjections;
 import harry.generators.distribution.Distribution;
-import harry.model.sut.NoOpSut;
+import harry.model.clock.OffsetClock;
+import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
-import harry.runner.DefaultPartitionVisitorFactory;
+import harry.runner.DataTracker;
+import harry.runner.MutatingPartitionVisitor;
 import harry.runner.PartitionVisitor;
 import harry.runner.RowVisitor;
 import harry.util.BitSet;
@@ -50,6 +52,8 @@ import static harry.model.OpSelectors.DefaultDescriptorSelector.DEFAULT_OP_TYPE_
 
 public class OpSelectorsTest
 {
+    private static int RUNS = 10000;
+
     @Test
     public void testRowDataDescriptorSupplier()
     {
@@ -69,7 +73,7 @@ public class OpSelectorsTest
                                                                               100,
                                                                               100);
 
-        for (int lts = 0; lts < 100000; lts++)
+        for (int lts = 0; lts < RUNS; lts++)
         {
             long pd = pdSupplier.pd(lts);
             for (int m = 0; m < descriptorSelector.numberOfModifications(lts); m++)
@@ -94,9 +98,9 @@ public class OpSelectorsTest
     public void pdSelectorTest()
     {
         OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
-        int cycles = 1000;
+        int cycles = 10000;
 
-        for (int repeats = 2; repeats <= 100; repeats++)
+        for (int repeats = 2; repeats <= 1000; repeats++)
         {
             for (int windowSize = 2; windowSize <= 10; windowSize++)
             {
@@ -104,7 +108,9 @@ public class OpSelectorsTest
                 long[] pds = new long[cycles];
                 for (int i = 0; i < cycles; i++)
                 {
-                    pds[i] = pdSupplier.pd(i);
+                    long pd = pdSupplier.pd(i);
+                    pds[i] = pd;
+                    Assert.assertEquals(pdSupplier.positionFor(i), pdSupplier.positionForPd(pd));
                 }
 
                 Set<Long> noNext = new HashSet<>();
@@ -150,7 +156,8 @@ public class OpSelectorsTest
 
                 for (int i = 0; i < cycles; i++)
                 {
-                    long maxLts = pdSupplier.maxLts(i);
+                    long pd = pdSupplier.pd(i);
+                    long maxLts = pdSupplier.maxLtsFor(pd);
                     Assert.assertEquals(-1, pdSupplier.nextLts(maxLts));
                     Assert.assertEquals(pdSupplier.pd(i), pdSupplier.pd(maxLts));
                 }
@@ -161,8 +168,8 @@ public class OpSelectorsTest
     @Test
     public void ckSelectorTest()
     {
-        Supplier<SchemaSpec> gen = SchemaGenerators.progression(5);
-        for (int i = 0; i < 30; i++)
+        Supplier<SchemaSpec> gen = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
             ckSelectorTest(gen.get());
     }
 
@@ -191,37 +198,48 @@ public class OpSelectorsTest
             });
         };
 
-        PartitionVisitor partitionVisitor = new DefaultPartitionVisitorFactory(new DoNothingModel(),
-                                                                               new NoOpSut(),
-                                                                               pdSelector,
-                                                                               ckSelector,
-                                                                               schema,
-                                                                               new RowVisitor()
-                                                                        {
-                                                                            public CompiledStatement write(long lts, long pd, long cd, long m)
-                                                                            {
-                                                                                consumer.accept(pd, cd);
-                                                                                return compiledStatement;
-                                                                            }
+        Run run = new Run(rng,
+                new OffsetClock(0),
+                pdSelector,
+                ckSelector,
+                schema,
+                DataTracker.NO_OP,
+                SystemUnderTest.NO_OP,
+                MetricReporter.NO_OP);
+
+        PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run,
+                                                                         (r) -> new RowVisitor()
+                                                                         {
+                                                                             public CompiledStatement write(long lts, long pd, long cd, long m)
+                                                                             {
+                                                                                 consumer.accept(pd, cd);
+                                                                                 return compiledStatement;
+                                                                             }
+
+                                                                             public CompiledStatement deleteColumn(long lts, long pd, long cd, long m)
+                                                                             {
+                                                                                 consumer.accept(pd, cd);
+                                                                                 return compiledStatement;
+                                                                             }
 
-                                                                            public CompiledStatement deleteColumn(long lts, long pd, long cd, long m)
-                                                                            {
-                                                                                consumer.accept(pd, cd);
-                                                                                return compiledStatement;
-                                                                            }
+                                                                             public CompiledStatement deleteRow(long lts, long pd, long cd, long m)
+                                                                             {
+                                                                                 consumer.accept(pd, cd);
+                                                                                 return compiledStatement;
+                                                                             }
 
-                                                                            public CompiledStatement deleteRow(long lts, long pd, long cd, long m)
-                                                                            {
-                                                                                consumer.accept(pd, cd);
-                                                                                return compiledStatement;
-                                                                            }
+                                                                             public CompiledStatement deleteRange(long lts, long pd, long opId)
+                                                                             {
+                                                                                 // ignore
+                                                                                 return compiledStatement;
+                                                                             }
 
-                                                                            public CompiledStatement deleteRange(long lts, long pd, long opId)
-                                                                            {
-                                                                                // ignore
-                                                                                return compiledStatement;
-                                                                            }
-                                                                        }).get();
+                                                                             public CompiledStatement deleteSlice(long lts, long pd, long opId)
+                                                                             {
+                                                                                 // ignore
+                                                                                 return compiledStatement;
+                                                                             }
+                                                                         });
 
         for (int lts = 0; lts < 1000; lts++)
         {
diff --git a/harry-core/test/harry/operations/RelationTest.java b/harry-core/test/harry/operations/RelationTest.java
index 15ab02e..70ccd69 100644
--- a/harry-core/test/harry/operations/RelationTest.java
+++ b/harry-core/test/harry/operations/RelationTest.java
@@ -32,7 +32,7 @@ import harry.ddl.SchemaSpec;
 import harry.generators.DataGeneratorsTest;
 import harry.model.OpSelectors;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 import harry.util.BitSet;
 
 public class RelationTest
@@ -42,9 +42,9 @@ public class RelationTest
     @Test
     public void testKeyGenerators()
     {
-        for (int cnt = 1; cnt < 5; cnt++)
+        for (int size = 1; size < 5; size++)
         {
-            Iterator<ColumnSpec.DataType[]> iter = DataGeneratorsTest.permutations(cnt,
+            Iterator<ColumnSpec.DataType[]> iter = DataGeneratorsTest.permutations(size,
                                                                                    ColumnSpec.DataType.class,
                                                                                    ColumnSpec.int8Type,
                                                                                    ColumnSpec.asciiType,
@@ -62,7 +62,7 @@ public class RelationTest
                     spec.add(ColumnSpec.ck("r" + i, types[i], false));
 
                 SchemaSpec schemaSpec = new SchemaSpec("ks",
-                                                   "tbl",
+                                                       "tbl",
                                                        Collections.singletonList(ColumnSpec.pk("pk", ColumnSpec.int64Type)),
                                                        spec,
                                                        Collections.emptyList(),
@@ -86,116 +86,123 @@ public class RelationTest
                 Arrays.sort(cds);
 
                 OpSelectors.Rng rng = new OpSelectors.PCGFast(1L);
-                // TODO: replace with mocks?
-                QuerySelector querySelector = new QuerySelector(schemaSpec,
-                                                                new OpSelectors.PdSelector()
-                                                                {
-                                                                    protected long pd(long lts)
-                                                                    {
-                                                                        return lts;
-                                                                    }
-
-                                                                    public long nextLts(long lts)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long prevLts(long lts)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long maxLts(long lts)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long minLtsAt(long position)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long minLtsFor(long pd)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long positionFor(long lts)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-                                                                },
-                                                                new OpSelectors.DescriptorSelector()
-                                                                {
-                                                                    public int numberOfModifications(long lts)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public int opsPerModification(long lts)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public int maxPartitionSize()
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public boolean isCdVisitedBy(long pd, long lts, long cd)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    protected long cd(long pd, long lts, long opId)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long randomCd(long pd, long entropy)
-                                                                    {
-                                                                        return  Math.abs(rng.prev(entropy)) % cds.length;
-                                                                    }
-
-                                                                    protected long vd(long pd, long cd, long lts, long opId, int col)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public OpSelectors.OperationKind operationType(long pd, long lts, long opId)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public BitSet columnMask(long pd, long lts, long opId)
-                                                                    {
-                                                                        throw new RuntimeException("not implemented");
-                                                                    }
-
-                                                                    public long rowId(long pd, long lts, long cd)
-                                                                    {
-                                                                        return 0;
-                                                                    }
-
-                                                                    public long modificationId(long pd, long cd, long lts, long vd, int col)
-                                                                    {
-                                                                        return 0;
-                                                                    }
-                                                                },
-                                                                rng);
-
 
+                // TODO: replace with mocks?
+                QueryGenerator querySelector = new QueryGenerator(schemaSpec,
+                                                                  new OpSelectors.PdSelector()
+                                                                  {
+                                                                      protected long pd(long lts)
+                                                                      {
+                                                                          return lts;
+                                                                      }
+
+                                                                      public long nextLts(long lts)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long prevLts(long lts)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long maxLtsFor(long pd)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long maxLts(long lts)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long minLtsAt(long position)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+
+                                                                      public long minLtsFor(long pd)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long positionFor(long lts)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+                                                                  },
+                                                                  new OpSelectors.DescriptorSelector()
+                                                                  {
+                                                                      public int numberOfModifications(long lts)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public int opsPerModification(long lts)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public int maxPartitionSize()
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public boolean isCdVisitedBy(long pd, long lts, long cd)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      protected long cd(long pd, long lts, long opId)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long randomCd(long pd, long entropy)
+                                                                      {
+                                                                          return Math.abs(rng.prev(entropy)) % cds.length;
+                                                                      }
+
+                                                                      protected long vd(long pd, long cd, long lts, long opId, int col)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public OpSelectors.OperationKind operationType(long pd, long lts, long opId)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public BitSet columnMask(long pd, long lts, long opId)
+                                                                      {
+                                                                          throw new RuntimeException("not implemented");
+                                                                      }
+
+                                                                      public long rowId(long pd, long lts, long cd)
+                                                                      {
+                                                                          return 0;
+                                                                      }
+
+                                                                      public long modificationId(long pd, long cd, long lts, long vd, int col)
+                                                                      {
+                                                                          return 0;
+                                                                      }
+                                                                  },
+                                                                  rng);
+
+                QueryGenerator.TypedQueryGenerator gen = new QueryGenerator.TypedQueryGenerator(rng, querySelector);
 
                 try
                 {
                     for (int i = 0; i < RUNS; i++)
                     {
-                        Query query = querySelector.inflate(i, 0);
+                        Query query = gen.inflate(i, 0);
                         for (int j = 0; j < cds.length; j++)
                         {
                             long cd = schemaSpec.ckGenerator.adjustEntropyDomain(cds[i]);
                             // the only thing we care about here is that query
-                            Assert.assertEquals(String.format("Error caught quen running a query %s with cd %d",
+                            Assert.assertEquals(String.format("Error caught while running a query %s with cd %d",
                                                               query, cd),
                                                 Query.simpleMatch(query, cd),
                                                 query.match(cd));
diff --git a/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java b/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
index 2cf22e7..e471ede 100644
--- a/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
+++ b/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
@@ -58,7 +58,7 @@ public class ExternalClusterSut implements SystemUnderTest
     {
         // TODO: close Cluster and Session!
         return new ExternalClusterSut(Cluster.builder()
-                                             .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM))
+                                             .withQueryOptions(new QueryOptions().setConsistencyLevel(toDriverCl(ConsistencyLevel.QUORUM)))
                                              .addContactPoints(config.contactPoints)
                                              .withPort(config.port)
                                              .withCredentials(config.username, config.password)
@@ -86,14 +86,16 @@ public class ExternalClusterSut implements SystemUnderTest
     }
 
     // TODO: this is rather simplistic
-    public Object[][] execute(String statement, Object... bindings)
+    public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
     {
         int repeat = 10;
         while (true)
         {
             try
             {
-                return resultSetToObjectArray(session.execute(statement, bindings));
+                Statement st = new SimpleStatement(statement, bindings);
+                st.setConsistencyLevel(toDriverCl(cl));
+                return resultSetToObjectArray(session.execute(st));
             }
             catch (Throwable t)
             {
@@ -129,10 +131,12 @@ public class ExternalClusterSut implements SystemUnderTest
         return results;
     }
 
-    public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+    public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
     {
         CompletableFuture<Object[][]> future = new CompletableFuture<>();
-        Futures.addCallback(session.executeAsync(statement, bindings),
+        Statement st = new SimpleStatement(statement, bindings);
+        st.setConsistencyLevel(toDriverCl(cl));
+        Futures.addCallback(session.executeAsync(st),
                             new FutureCallback<ResultSet>()
                             {
                                 public void onSuccess(ResultSet rows)
@@ -176,4 +180,14 @@ public class ExternalClusterSut implements SystemUnderTest
             return ExternalClusterSut.create(this);
         }
     }
+
+    public static com.datastax.driver.core.ConsistencyLevel toDriverCl(SystemUnderTest.ConsistencyLevel cl)
+    {
+        switch (cl)
+        {
+            case ALL:    return com.datastax.driver.core.ConsistencyLevel.ALL;
+            case QUORUM: return com.datastax.driver.core.ConsistencyLevel.QUORUM;
+        }
+        throw new IllegalArgumentException("Don't know a CL: " + cl);
+    }
 }
\ No newline at end of file
diff --git a/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java b/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
index 5462568..702163e 100644
--- a/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
+++ b/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
@@ -21,10 +21,11 @@ package harry.runner.external;
 import harry.core.Configuration;
 import harry.model.sut.external.ExternalClusterSut;
 import harry.runner.HarryRunner;
+import harry.runner.Runner;
 
 import java.io.File;
 
-public class HarryRunnerExternal implements HarryRunner {
+public class HarryRunnerExternal extends HarryRunner {
 
     public static void main(String[] args) throws Throwable {
         ExternalClusterSut.registerSubtypes();
@@ -35,4 +36,9 @@ public class HarryRunnerExternal implements HarryRunner {
         Configuration configuration = Configuration.fromFile(configFile);
         runner.run(configuration);
     }
+
+    @Override
+    public void beforeRun(Runner runner) {
+
+    }
 }
diff --git a/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java b/harry-integration/src/harry/model/sut/ExternalClusterSut.java
similarity index 66%
copy from harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
copy to harry-integration/src/harry/model/sut/ExternalClusterSut.java
index 2cf22e7..aefe508 100644
--- a/harry-integration-external/src/harry/model/sut/external/ExternalClusterSut.java
+++ b/harry-integration/src/harry/model/sut/ExternalClusterSut.java
@@ -16,16 +16,7 @@
  *  limitations under the License.
  */
 
-package harry.model.sut.external;
-
-import com.datastax.driver.core.*;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import harry.core.Configuration;
-import harry.model.sut.SystemUnderTest;
+package harry.model.sut;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -33,13 +24,20 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+
 public class ExternalClusterSut implements SystemUnderTest
 {
-    public static void registerSubtypes()
-    {
-        Configuration.registerSubtypes(ExternalSutConfiguration.class);
-    }
-
     private final Session session;
     private final ExecutorService executor;
 
@@ -54,14 +52,12 @@ public class ExternalClusterSut implements SystemUnderTest
         this.executor = Executors.newFixedThreadPool(threads);
     }
 
-    public static ExternalClusterSut create(ExternalSutConfiguration config)
+    public static ExternalClusterSut create()
     {
         // TODO: close Cluster and Session!
         return new ExternalClusterSut(Cluster.builder()
-                                             .withQueryOptions(new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM))
-                                             .addContactPoints(config.contactPoints)
-                                             .withPort(config.port)
-                                             .withCredentials(config.username, config.password)
+                                             .withQueryOptions(new QueryOptions().setConsistencyLevel(toDriverCl(ConsistencyLevel.QUORUM)))
+                                             .addContactPoints("127.0.0.1")
                                              .build()
                                              .connect());
     }
@@ -86,14 +82,16 @@ public class ExternalClusterSut implements SystemUnderTest
     }
 
     // TODO: this is rather simplistic
-    public Object[][] execute(String statement, Object... bindings)
+    public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
     {
         int repeat = 10;
         while (true)
         {
             try
             {
-                return resultSetToObjectArray(session.execute(statement, bindings));
+                Statement st = new SimpleStatement(statement, bindings);
+                st.setConsistencyLevel(toDriverCl(cl));
+                return resultSetToObjectArray(session.execute(st));
             }
             catch (Throwable t)
             {
@@ -107,8 +105,7 @@ public class ExternalClusterSut implements SystemUnderTest
         }
     }
 
-
-    private static Object[][] resultSetToObjectArray(ResultSet rs)
+    public static Object[][] resultSetToObjectArray(ResultSet rs)
     {
         List<Row> rows = rs.all();
         if (rows.size() == 0)
@@ -129,10 +126,12 @@ public class ExternalClusterSut implements SystemUnderTest
         return results;
     }
 
-    public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+    public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
     {
         CompletableFuture<Object[][]> future = new CompletableFuture<>();
-        Futures.addCallback(session.executeAsync(statement, bindings),
+        Statement st = new SimpleStatement(statement, bindings);
+        st.setConsistencyLevel(toDriverCl(cl));
+        Futures.addCallback(session.executeAsync(st),
                             new FutureCallback<ResultSet>()
                             {
                                 public void onSuccess(ResultSet rows)
@@ -150,30 +149,13 @@ public class ExternalClusterSut implements SystemUnderTest
         return future;
     }
 
-    @JsonTypeName("external")
-    public static class ExternalSutConfiguration implements Configuration.SutConfiguration
+    public static com.datastax.driver.core.ConsistencyLevel toDriverCl(SystemUnderTest.ConsistencyLevel cl)
     {
-
-        private final String contactPoints;
-        private final int port;
-        private final String username;
-        private final String password;
-
-        @JsonCreator
-        public ExternalSutConfiguration(@JsonProperty(value = "contact_points") String contactPoints,
-                                        @JsonProperty(value = "port") int port,
-                                        @JsonProperty(value = "username") String username,
-                                        @JsonProperty(value = "password") String password)
-        {
-            this.contactPoints = contactPoints;
-            this.port = port;
-            this.username = username;
-            this.password = password;
-        }
-
-        public SystemUnderTest make()
+        switch (cl)
         {
-            return ExternalClusterSut.create(this);
+            case ALL:    return com.datastax.driver.core.ConsistencyLevel.ALL;
+            case QUORUM: return com.datastax.driver.core.ConsistencyLevel.QUORUM;
         }
+        throw new IllegalArgumentException("Don't know a CL: " + cl);
     }
 }
\ No newline at end of file
diff --git a/harry-integration/src/harry/model/sut/InJvmSut.java b/harry-integration/src/harry/model/sut/InJvmSut.java
index d6b21a2..1786f60 100644
--- a/harry-integration/src/harry/model/sut/InJvmSut.java
+++ b/harry-integration/src/harry/model/sut/InJvmSut.java
@@ -35,11 +35,15 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import harry.core.Configuration;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.net.Verb;
 
 public class InJvmSut implements SystemUnderTest
 {
-    public static void registerSubtypes()
+    public static void init()
     {
         Configuration.registerSubtypes(InJvmSutConfiguration.class);
     }
@@ -51,6 +55,7 @@ public class InJvmSut implements SystemUnderTest
     public final Cluster cluster;
     private final AtomicLong cnt = new AtomicLong();
     private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
     public InJvmSut(Cluster cluster)
     {
         this(cluster, 10);
@@ -62,6 +67,11 @@ public class InJvmSut implements SystemUnderTest
         this.executor = Executors.newFixedThreadPool(threads);
     }
 
+    public Cluster cluster()
+    {
+        return cluster;
+    }
+
     public boolean isShutdown()
     {
         return isShutdown.get();
@@ -73,6 +83,7 @@ public class InJvmSut implements SystemUnderTest
 
         cluster.close();
         executor.shutdown();
+
         try
         {
             executor.awaitTermination(30, TimeUnit.SECONDS);
@@ -88,17 +99,67 @@ public class InJvmSut implements SystemUnderTest
         cluster.schemaChange(statement);
     }
 
-    public Object[][] execute(String statement, Object... bindings)
+    public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
+    {
+        return execute(statement, cl, (int) (cnt.getAndIncrement() % cluster.size() + 1), bindings);
+    }
+
+    public Object[][] execute(String statement, ConsistencyLevel cl, int coordinator, Object... bindings)
+    {
+        if (isShutdown.get())
+            throw new RuntimeException("Instance is shut down");
+
+        try
+        {
+            if (cl == ConsistencyLevel.NODE_LOCAL)
+            {
+                return cluster.get(coordinator)
+                              .executeInternal(statement, bindings);
+            }
+            else
+            {
+                return cluster
+                       // coordinator is supplied by the caller; the overload without a coordinator argument picks one round-robin
+                       .coordinator(coordinator)
+                       .execute(statement, toApiCl(cl), bindings);
+            }
+        }
+        catch (Throwable t)
+        {
+            logger.error(String.format("Caught error while trying execute statement %s: %s", statement, t.getMessage()),
+                         t);
+            throw t;
+        }
+    }
+
+    // TODO: Ideally, we need to be able to induce a failure of a single specific message
+    public Object[][] executeWithWriteFailure(String statement, ConsistencyLevel cl, Object... bindings)
     {
         if (isShutdown.get())
             throw new RuntimeException("Instance is shut down");
 
         try
         {
-            return cluster
-                   // round-robin
-                   .coordinator((int) (cnt.getAndIncrement() % cluster.size() + 1))
-                   .execute(statement, ConsistencyLevel.QUORUM, bindings);
+            int coordinator = (int) (cnt.getAndIncrement() % cluster.size() + 1);
+            IMessageFilters filters = cluster.filters();
+
+            // Drop exactly one MUTATION_REQ sent from the chosen coordinator (only the first matching message is dropped)
+            filters.verbs(Verb.MUTATION_REQ.id).from(coordinator).messagesMatching(new IMessageFilters.Matcher()
+            {
+                private final AtomicBoolean issued = new AtomicBoolean();
+                public boolean matches(int from, int to, IMessage message)
+                {
+                    if (from != coordinator || message.verb() != Verb.MUTATION_REQ.id)
+                        return false;
+
+                    return !issued.getAndSet(true);
+                }
+            }).drop().on();
+            Object[][] res = cluster
+                             .coordinator(coordinator)
+                             .execute(statement, toApiCl(cl), bindings);
+            filters.reset();
+            return res;
         }
         catch (Throwable t)
         {
@@ -108,9 +169,14 @@ public class InJvmSut implements SystemUnderTest
         }
     }
 
-    public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+    public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
     {
-        return CompletableFuture.supplyAsync(() -> execute(statement, bindings), executor);
+        return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings), executor);
+    }
+
+    /**
+     * Asynchronous counterpart of {@code executeWithWriteFailure}: submits that call with the
+     * same arguments to {@code executor} and returns the resulting future.
+     */
+    public CompletableFuture<Object[][]> executeAsyncWithWriteFailure(String statement, ConsistencyLevel cl, Object... bindings)
+    {
+        return CompletableFuture.supplyAsync(() -> executeWithWriteFailure(statement, cl, bindings), executor);
     }
 
     @JsonTypeName("in_jvm")
@@ -132,17 +198,26 @@ public class InJvmSut implements SystemUnderTest
 
         public SystemUnderTest make()
         {
+            try
+            {
+                ICluster.setup();
+            }
+            catch (Throwable throwable)
+            {
+                throwable.printStackTrace();
+            }
+
             Cluster cluster;
             try
             {
                 cluster = Cluster.build().withConfig((cfg) -> {
                     // TODO: make this configurable
-                    cfg.set("row_cache_size_in_mb", 10L)
+                    cfg.with(Feature.NETWORK, Feature.GOSSIP)
+                       .set("row_cache_size_in_mb", 10L)
                        .set("index_summary_capacity_in_mb", 10L)
                        .set("counter_cache_size_in_mb", 10L)
                        .set("key_cache_size_in_mb", 10L)
                        .set("file_cache_size_in_mb", 10)
-                       .set("prepared_statements_cache_size_mb", 10L)
                        .set("memtable_heap_space_in_mb", 128)
                        .set("memtable_offheap_space_in_mb", 128)
                        .set("memtable_flush_writers", 1)
@@ -151,7 +226,8 @@ public class InJvmSut implements SystemUnderTest
                        .set("concurrent_writes", 5)
                        .set("compaction_throughput_mb_per_sec", 10)
                        .set("hinted_handoff_enabled", false);
-                }).withNodes(nodes)
+                })
+                                 .withNodes(nodes)
                                  .withRoot(new File(root)).createWithoutStarting();
             }
             catch (IOException e)
@@ -163,4 +239,15 @@ public class InJvmSut implements SystemUnderTest
             return new InJvmSut(cluster);
         }
     }
+
+    /**
+     * Maps the given consistency level onto the
+     * {@code org.apache.cassandra.distributed.api.ConsistencyLevel} constant of the same name.
+     *
+     * @param cl consistency level to translate
+     * @return the in-JVM dtest API constant for {@code cl}
+     * @throws IllegalArgumentException if {@code cl} is not ALL, QUORUM or NODE_LOCAL
+     */
+    public static org.apache.cassandra.distributed.api.ConsistencyLevel toApiCl(ConsistencyLevel cl)
+    {
+        final org.apache.cassandra.distributed.api.ConsistencyLevel translated;
+        switch (cl)
+        {
+            case ALL:
+                translated = org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+                break;
+            case QUORUM:
+                translated = org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
+                break;
+            case NODE_LOCAL:
+                translated = org.apache.cassandra.distributed.api.ConsistencyLevel.NODE_LOCAL;
+                break;
+            default:
+                throw new IllegalArgumentException("Don't know a CL: " + cl);
+        }
+        return translated;
+    }
 }
\ No newline at end of file
diff --git a/harry-integration/src/harry/runner/FaultInjectingPartitionVisitor.java b/harry-integration/src/harry/runner/FaultInjectingPartitionVisitor.java
new file mode 100644
index 0000000..84e1a73
--- /dev/null
+++ b/harry-integration/src/harry/runner/FaultInjectingPartitionVisitor.java
@@ -0,0 +1,94 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.model.sut.InJvmSut;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+
+/**
+ * A {@link LoggingPartitionVisitor} whose statement execution alternates between the regular
+ * asynchronous path and {@link InJvmSut#executeAsyncWithWriteFailure}, retrying failed
+ * statements until they succeed.
+ *
+ * The system under test of the run must be an {@link InJvmSut}; the constructor casts to it.
+ */
+public class FaultInjectingPartitionVisitor extends LoggingPartitionVisitor
+{
+    /**
+     * Registers {@link FaultInjectingPartitionVisitorConfiguration} as a configuration subtype.
+     */
+    public static void init()
+    {
+        Configuration.registerSubtypes(FaultInjectingPartitionVisitorConfiguration.class);
+    }
+
+    /**
+     * Configuration entry (type name {@code fault_injecting}) that builds a
+     * {@link FaultInjectingPartitionVisitor} from a {@code row_visitor} configuration.
+     */
+    @JsonTypeName("fault_injecting")
+    public static class FaultInjectingPartitionVisitorConfiguration extends Configuration.MutatingPartitionVisitorConfiguation
+    {
+        @JsonCreator
+        public FaultInjectingPartitionVisitorConfiguration(@JsonProperty("row_visitor") Configuration.RowVisitorConfiguration row_visitor)
+        {
+            super(row_visitor);
+        }
+
+        @Override
+        public PartitionVisitor make(Run run)
+        {
+            return new FaultInjectingPartitionVisitor(run, row_visitor);
+        }
+    }
+
+    // Counts first attempts (it is only incremented when failures are allowed); even values
+    // select the write-failure path.
+    private final AtomicInteger cnt = new AtomicInteger();
+
+    private final InJvmSut sut;
+
+    /**
+     * @param run               the run whose {@code sut} is cast to {@link InJvmSut}
+     * @param rowVisitorFactory forwarded unchanged to the superclass constructor
+     */
+    public FaultInjectingPartitionVisitor(Run run, RowVisitor.RowVisitorFactory rowVisitorFactory)
+    {
+        super(run, rowVisitorFactory);
+        this.sut = (InJvmSut) run.sut;
+    }
+
+    /**
+     * Executes the statement with fault injection allowed for this attempt.
+     */
+    void executeAsyncWithRetries(CompletableFuture<Object[][]> originator, CompiledStatement statement)
+    {
+        executeAsyncWithRetries(originator, statement, true);
+    }
+
+    /**
+     * Executes the statement at QUORUM and completes {@code originator} with the result once an
+     * attempt succeeds.
+     *
+     * @param originator    future completed with the result of the first successful attempt
+     * @param statement     statement to execute
+     * @param allowFailures whether this attempt may be routed through the write-failure path
+     * @throws IllegalStateException if the system under test is already shut down
+     */
+    void executeAsyncWithRetries(CompletableFuture<Object[][]> originator, CompiledStatement statement, boolean allowFailures)
+    {
+        if (sut.isShutdown())
+            throw new IllegalStateException("System under test is shut down");
+
+        CompletableFuture<Object[][]> future;
+        // Short-circuit: the counter advances only when failures are allowed, so every other
+        // such attempt takes the write-failure path.
+        if (allowFailures && cnt.getAndIncrement() % 2 == 0)
+        {
+            future = sut.executeAsyncWithWriteFailure(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings());
+        }
+        else
+        {
+            future = sut.executeAsync(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings());
+        }
+
+        // On any error, retry after one second with fault injection disabled; the originator is
+        // only ever completed with a successful result.
+        // NOTE(review): `executor` is not declared in this class - presumably inherited; confirm.
+        future.whenComplete((res, t) -> {
+               if (t != null)
+                   executor.schedule(() -> executeAsyncWithRetries(originator, statement, false), 1, TimeUnit.SECONDS);
+               else
+                   originator.complete(res);
+           });
+    }
+}
diff --git a/harry-integration/src/harry/runner/QueryingNoOpChecker.java b/harry-integration/src/harry/runner/QueryingNoOpChecker.java
new file mode 100644
index 0000000..08b8e6b
--- /dev/null
+++ b/harry-integration/src/harry/runner/QueryingNoOpChecker.java
@@ -0,0 +1,65 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.model.Model;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+
+/**
+ * A {@link Model} that only issues the query's SELECT against the system under test and
+ * does nothing with the rows that come back.
+ */
+public class QueryingNoOpChecker implements Model
+{
+    private final Run run;
+
+    public QueryingNoOpChecker(Run run)
+    {
+        this.run = run;
+    }
+
+    /**
+     * Registers {@link QueryingNoOpCheckerConfig} as a configuration subtype.
+     */
+    public static void init()
+    {
+        Configuration.registerSubtypes(QueryingNoOpCheckerConfig.class);
+    }
+
+    /**
+     * Runs the select statement of {@code query} at QUORUM; the result is discarded.
+     */
+    @Override
+    public void validate(Query query)
+    {
+        final CompiledStatement select = query.toSelectStatement();
+        run.sut.execute(select.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, select.bindings());
+    }
+
+    /**
+     * Configuration entry (type name {@code querying_no_op_checker}) producing a
+     * {@link QueryingNoOpChecker} for a run.
+     */
+    @JsonTypeName("querying_no_op_checker")
+    public static class QueryingNoOpCheckerConfig implements Configuration.ModelConfiguration
+    {
+        @JsonCreator
+        public QueryingNoOpCheckerConfig()
+        {
+        }
+
+        public Model make(Run run)
+        {
+            return new QueryingNoOpChecker(run);
+        }
+    }
+}
diff --git a/harry-integration/src/harry/runner/RepairingLocalStateValidator.java b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
new file mode 100644
index 0000000..98beb93
--- /dev/null
+++ b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
@@ -0,0 +1,137 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.data.ResultSetRow;
+import harry.model.Model;
+import harry.model.QuiescentChecker;
+import harry.model.sut.InJvmSut;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+
+import static harry.model.SelectHelper.resultSetToRow;
+
+/**
+ * An {@link AllPartitionsValidator} that runs a full repair before validating all partitions.
+ *
+ * The system under test of the run must be an {@link InJvmSut}; the constructor casts to it.
+ */
+public class RepairingLocalStateValidator extends AllPartitionsValidator
+{
+    /**
+     * Registers the configuration subtypes declared in this class.
+     */
+    public static void init()
+    {
+        Configuration.registerSubtypes(RepairingLocalStateValidatorConfiguration.class,
+                                       QuiescentCheckerConfig.class);
+    }
+
+    public final InJvmSut inJvmSut;
+
+    /**
+     * All four arguments are forwarded to the superclass constructor; {@code run.sut} is
+     * additionally kept as an {@link InJvmSut}.
+     */
+    public RepairingLocalStateValidator(int concurrency, int triggerAfter, Run run, Model.ModelFactory modelFactory)
+    {
+        super(concurrency, triggerAfter, run, modelFactory);
+
+        this.inJvmSut = (InJvmSut) run.sut;
+    }
+
+    /**
+     * For every positive {@code lts} that is a multiple of {@code triggerAfter}, runs
+     * {@code nodetool repair --full} on node 1 and then validates all partitions.
+     * Other values of {@code lts} are ignored.
+     */
+    @Override
+    public void visitPartition(long lts)
+    {
+        // NOTE(review): triggerAfter, executor, concurrency and validateAllPartitions are not
+        // declared here - presumably inherited from AllPartitionsValidator; confirm.
+        if (lts > 0 && lts % triggerAfter == 0)
+        {
+            System.out.println("Starting repair...");
+            // The value returned by nodetool is not inspected.
+            inJvmSut.cluster().get(1).nodetool("repair", "--full");
+            System.out.println("Validating partitions...");
+            validateAllPartitions(executor, concurrency);
+        }
+    }
+
+    /**
+     * Configuration entry (type name {@code repair_and_validate_local_states}) built from the
+     * {@code concurrency}, {@code trigger_after} and {@code model} properties.
+     */
+    @JsonTypeName("repair_and_validate_local_states")
+    public static class RepairingLocalStateValidatorConfiguration implements Configuration.PartitionVisitorConfiguration
+    {
+        private final int concurrency;
+        private final int trigger_after;
+        private final Configuration.ModelConfiguration modelConfiguration;
+
+        @JsonCreator
+        public RepairingLocalStateValidatorConfiguration(@JsonProperty("concurrency") int concurrency,
+                                                         @JsonProperty("trigger_after") int trigger_after,
+                                                         @JsonProperty("model") Configuration.ModelConfiguration model)
+        {
+            this.concurrency = concurrency;
+            this.trigger_after = trigger_after;
+            this.modelConfiguration = model;
+        }
+
+        // The model configuration itself is handed over as the validator's model factory.
+        public PartitionVisitor make(Run run)
+        {
+            return new RepairingLocalStateValidator(concurrency, trigger_after, run, modelConfiguration);
+        }
+    }
+
+    /**
+     * A {@link QuiescentChecker} that validates a query against every node of the in-JVM
+     * cluster separately, reading from each node at NODE_LOCAL.
+     */
+    public static class QuiescentLocalStateChecker extends QuiescentChecker
+    {
+        public final InJvmSut inJvmSut;
+
+        public QuiescentLocalStateChecker(Run run)
+        {
+            super(run);
+            assert run.sut instanceof InJvmSut;
+
+            this.inJvmSut = (InJvmSut) run.sut;
+        }
+
+        /**
+         * Runs the query's select statement on each node (numbered from 1 to the cluster size)
+         * and validates each node's rows against the query.
+         */
+        @Override
+        public void validate(Query query)
+        {
+            CompiledStatement compiled = query.toSelectStatement();
+            for (int i = 1; i <= inJvmSut.cluster.size(); i++)
+            {
+                // Effectively-final copy of the loop index for capture by the lambda below.
+                int node = i;
+                // NOTE(review): the two-argument validate and `clock` are not declared here -
+                // presumably inherited from QuiescentChecker; confirm.
+                validate(() -> {
+                    Object[][] objects = inJvmSut.execute(compiled.cql(),
+                                                          SystemUnderTest.ConsistencyLevel.NODE_LOCAL,
+                                                          node,
+                                                          compiled.bindings());
+                    List<ResultSetRow> result = new ArrayList<>();
+                    for (Object[] obj : objects)
+                        result.add(resultSetToRow(query.schemaSpec, clock, obj));
+
+                    return result;
+                }, query);
+            }
+        }
+    }
+
+    /**
+     * Configuration entry (type name {@code quiescent_local_state_checker}) producing a
+     * {@link QuiescentLocalStateChecker} for a run.
+     */
+    @JsonTypeName("quiescent_local_state_checker")
+    public static class QuiescentCheckerConfig implements Configuration.ModelConfiguration
+    {
+        @JsonCreator
+        public QuiescentCheckerConfig()
+        {
+        }
+
+        public Model make(Run run)
+        {
+            return new QuiescentLocalStateChecker(run);
+        }
+    }
+}
diff --git a/harry-integration/src/harry/runner/Reproduce.java b/harry-integration/src/harry/runner/Reproduce.java
index 049c864..78a96f6 100644
--- a/harry-integration/src/harry/runner/Reproduce.java
+++ b/harry-integration/src/harry/runner/Reproduce.java
@@ -33,7 +33,7 @@ public class Reproduce extends TestBaseImpl
 
     public void runWithInJvmDtest() throws Throwable
     {
-        InJvmSut.registerSubtypes();
+        InJvmSut.init();
 
         System.setProperty("cassandra.disable_tcactive_openssl", "true");
         System.setProperty("relocated.shaded.io.netty.transport.noNative", "true");
@@ -46,7 +46,7 @@ public class Reproduce extends TestBaseImpl
 
         try
         {
-            run.validator.validatePartition(0L);
+//            run.validator.validatePartition(0L);
         }
         catch(Throwable t)
         {
diff --git a/harry-integration/test/harry/model/ExhaustiveCheckerIntegrationTest.java b/harry-integration/test/harry/model/ExhaustiveCheckerIntegrationTest.java
index 4a38e7b..ecbf499 100644
--- a/harry-integration/test/harry/model/ExhaustiveCheckerIntegrationTest.java
+++ b/harry-integration/test/harry/model/ExhaustiveCheckerIntegrationTest.java
@@ -19,10 +19,9 @@
 package harry.model;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import harry.core.Configuration;
@@ -33,34 +32,40 @@ import harry.corruptor.HideRowCorruptor;
 import harry.corruptor.HideValueCorruptor;
 import harry.corruptor.QueryResponseCorruptor;
 import harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
-import harry.generators.Surjections;
-import harry.generators.distribution.Distribution;
+import harry.ddl.SchemaGenerators;
+import harry.model.sut.InJvmSut;
 import harry.model.sut.SystemUnderTest;
+import harry.runner.MutatingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
-import harry.runner.Validator;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import harry.runner.SinglePartitionValidator;
 
-public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
+public class ExhaustiveCheckerIntegrationTest extends ModelTestBase
 {
     @Test
     public void testVerifyPartitionState()
     {
-        Configuration config = sharedConfiguration(1).build();
-        Run run = config.createRun();
-        run.sut.schemaChange(run.schemaSpec.compile().cql());
+        Supplier<Configuration.ConfigurationBuilder> gen = sharedConfiguration();
 
-        OpSelectors.MonotonicClock clock = run.clock;
-        Validator validator = run.validator;
-        PartitionVisitor partitionVisitor = run.visitorFactory.get();
-
-        for (int i = 0; i < 2000; i++)
+        for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
         {
-            long lts = clock.nextLts();
-            partitionVisitor.visitPartition(lts);
-        }
+            Configuration config = gen.get().build();
+            Run run = config.createRun();
+            run.sut.schemaChange(run.schemaSpec.compile().cql());
+            OpSelectors.MonotonicClock clock = run.clock;
 
-        validator.validatePartition(0);
+            SinglePartitionValidator validator = new SinglePartitionValidator(100, run, modelConfiguration());
+            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+
+            for (int i = 0; i < 2000; i++)
+            {
+                long lts = clock.nextLts();
+                partitionVisitor.visitPartition(lts);
+            }
+
+            validator.visitPartition(0);
+        }
     }
 
     @Test
@@ -71,13 +76,13 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
                                                                                                    run.clock,
                                                                                                    HideRowCorruptor::new);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
-                                              t.getCause() != null &&t.getCause().toString().contains(OpSelectors.OperationKind.WRITE.toString())));
+                     (t, run) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
+                                                   t.getCause() != null && t.getCause().toString().contains(OpSelectors.OperationKind.WRITE.toString())));
     }
 
     @Test
@@ -88,18 +93,21 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
                                                                                      run.clock,
                                                                                      run.descriptorSelector);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> {
+                     (t, run) -> {
                          Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
-                                              // TODO: this is not entirely correct. Right now, after registering a deletion followed by no writes,
-                                              // we would continue going back in time and checking other operations, even though we don't have to do this.
-                                              t.getCause().getMessage().contains("Modification should have been visible but was not") ||
-                                              t.getCause().getMessage().contains("Observed unvalidated rows") ||
-                                              t.getCause() != null && t.getCause().getMessage().contains("was never written"));
+                                           // TODO: this is not entirely correct. Right now, after registering a deletion followed by no writes,
+                                           // we would continue going back in time and checking other operations, even though we don't have to do this.
+                                           t.getCause() != null && (t.getCause().getMessage().contains("Modification should have been visible but was not") ||
+                                           // TODO: this is not entirely correct, either. This row is, in fact, present in both dataset _and_
+                                           // in the model, it's just there might be _another_ row right in front of it.
+                                           t.getCause().getMessage().contains("expected row not to be visible") ||
+                                           t.getCause().getMessage().contains("Observed unvalidated rows") ||
+                                           t.getCause().getMessage().contains("was never written")));
                      });
     }
 
@@ -112,18 +120,17 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
                                                                                                    run.clock,
                                                                                                    HideValueCorruptor::new);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
-                                              t.getCause() != null && t.getCause().getMessage().contains("Modification should have been visible but was not")));
+                     (t, run) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
+                                                   t.getCause() != null && t.getCause().getMessage().contains("Modification should have been visible but was not")));
     }
 
 
     @Test
-    @Ignore
     public void testDetectsOverwrittenRow()
     {
         negativeTest((run) -> {
@@ -131,89 +138,61 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
                                                                                                    run.clock,
                                                                                                    ChangeValueCorruptor::new);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
-                                              t.getCause() != null && t.getCause().getMessage().contains("Modification should have been visible but was not.")));
+                     (t, run) -> Assert.assertTrue(String.format("Throwable: %s\nCause: %s", t, t.getCause()),
+                                                   t.getCause() != null && t.getCause().getMessage().contains("Modification should have been visible but was not.")));
     }
 
-
-    static void negativeTest(Consumer<Run> corrupt, Consumer<Throwable> validate)
+    @Test
+    public void testLocalOnlyExecution()
     {
-        Configuration config = sharedConfiguration()
-                               .setClusteringDescriptorSelector((rng, schemaSpec) -> {
-                                   return new OpSelectors.DefaultDescriptorSelector(rng,
-                                                                                    new OpSelectors.ColumnSelectorBuilder().forAll(schemaSpec.regularColumns.size()).build(),
-                                                                                    Surjections.pick(OpSelectors.OperationKind.DELETE_COLUMN,
-                                                                                                     OpSelectors.OperationKind.DELETE_ROW,
-                                                                                                     OpSelectors.OperationKind.WRITE),
-                                                                                    new Distribution.ConstantDistribution(10),
-                                                                                    new Distribution.ConstantDistribution(10),
-                                                                                    100);
-                               })
-                               .build();
-        Run run = config.createRun();
-        run.sut.schemaChange(run.schemaSpec.compile().cql());
-        OpSelectors.MonotonicClock clock = run.clock;
-        Validator validator = run.validator;
-        PartitionVisitor partitionVisitor = run.visitorFactory.get();
-
-        for (int i = 0; i < 200; i++)
-        {
-            long lts = clock.nextLts();
-            partitionVisitor.visitPartition(lts);
-        }
+        LocalOnlySut localOnlySut = new LocalOnlySut();
 
-        corrupt.accept(run);
+        Supplier<Configuration.ConfigurationBuilder> gen = sharedConfiguration();
 
-        try
-        {
-            validator.validatePartition(0);
-            Assert.fail("Should've thrown");
-        }
-        catch (Throwable t)
+        for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
         {
-            validate.accept(t);
+            Configuration config = gen.get()
+                                   .setClusteringDescriptorSelector((builder) -> {
+                                       builder.setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
+                                                                       .addWeight(OpSelectors.OperationKind.DELETE_ROW, 80)
+                                                                       .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 10)
+                                                                       .addWeight(OpSelectors.OperationKind.WRITE, 10)
+                                                                       .build());
+                                   })
+                                   .setSUT(() -> localOnlySut)
+                                   .build();
+
+            Run run = config.createRun();
+            run.sut.schemaChange(run.schemaSpec.compile().cql());
+
+            OpSelectors.MonotonicClock clock = run.clock;
+
+            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+
+            localOnlySut.localOnly(() -> {
+                for (int i = 0; i < 5; i++)
+                {
+                    long lts = clock.nextLts();
+                    partitionVisitor.visitPartition(lts);
+                }
+            });
+
+            SinglePartitionValidator validator = new SinglePartitionValidator(100, run, ExhaustiveChecker::new);
+            validator.visitPartition(0);
         }
     }
 
-    @Test
-    public void testLocalOnlyExecution()
+    Configuration.ModelConfiguration modelConfiguration()
     {
-        LocalOnlySut localOnlySut = new LocalOnlySut();
-        Configuration config = sharedConfiguration()
-                               .setClusteringDescriptorSelector((builder) -> {
-                                   builder.setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
-                                                                   .addWeight(OpSelectors.OperationKind.DELETE_ROW, 80)
-                                                                   .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 10)
-                                                                   .addWeight(OpSelectors.OperationKind.WRITE, 10)
-                                                                   .build());
-                               })
-                               .setSUT(() -> localOnlySut)
-                               .build();
-
-        Run run = config.createRun();
-        run.sut.schemaChange(run.schemaSpec.compile().cql());
-
-        OpSelectors.MonotonicClock clock = run.clock;
-        Validator validator = run.validator;
-        PartitionVisitor partitionVisitor = run.visitorFactory.get();
-
-        localOnlySut.localOnly(() -> {
-            for (int i = 0; i < 5; i++)
-            {
-                long lts = clock.nextLts();
-                partitionVisitor.visitPartition(lts);
-            }
-        });
-
-        validator.validatePartition(0);
+        return new Configuration.ExhaustiveCheckerConfig();
     }
 
-    private static class LocalOnlySut implements SystemUnderTest
+    public static class LocalOnlySut implements SystemUnderTest
     {
         private boolean localOnly = false;
         private int counter = 0;
@@ -233,17 +212,17 @@ public class ExhaustiveCheckerIntegrationTest extends IntegrationTestBase
             cluster.schemaChange(statement);
         }
 
-        public Object[][] execute(String statement, Object... bindings)
+        public Object[][] execute(String statement, ConsistencyLevel cl, Object... bindings)
         {
             if (localOnly)
                 return cluster.get((counter++) % cluster.size() + 1).executeInternal(statement, bindings);
             else
-                return cluster.coordinator((counter++) % cluster.size() + 1).execute(statement, ConsistencyLevel.ALL, bindings);
+                return cluster.coordinator((counter++) % cluster.size() + 1).execute(statement, InJvmSut.toApiCl(ConsistencyLevel.ALL), bindings);
         }
 
-        public CompletableFuture<Object[][]> executeAsync(String statement, Object... bindings)
+        public CompletableFuture<Object[][]> executeAsync(String statement, ConsistencyLevel cl, Object... bindings)
         {
-            return CompletableFuture.supplyAsync(() -> execute(statement, bindings));
+            return CompletableFuture.supplyAsync(() -> execute(statement, cl, bindings));
         }
 
         public void localOnly(Runnable r)
diff --git a/harry-integration/test/harry/model/ExhaustiveCheckerUnitTest.java b/harry-integration/test/harry/model/ExhaustiveCheckerUnitTest.java
index 84d5a82..7c1ca04 100644
--- a/harry-integration/test/harry/model/ExhaustiveCheckerUnitTest.java
+++ b/harry-integration/test/harry/model/ExhaustiveCheckerUnitTest.java
@@ -26,13 +26,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import harry.core.Configuration;
 import harry.core.Run;
-import harry.model.sut.NoOpSut;
+import harry.ddl.SchemaGenerators;
+import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
@@ -43,55 +45,58 @@ public class ExhaustiveCheckerUnitTest
     @Test
     public void testOperationConsistency()
     {
-        LoggingRowVisitor rowVisitor = new LoggingRowVisitor();
-
-        Configuration config = IntegrationTestBase.sharedConfiguration()
-                                                  .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(10, 10))
-                                                  .setClusteringDescriptorSelector((builder) -> {
-                                                      builder.setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
-                                                                                      .addWeight(OpSelectors.OperationKind.DELETE_ROW, 33)
-                                                                                      .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 33)
-                                                                                      .addWeight(OpSelectors.OperationKind.WRITE, 34)
-                                                                                      .build());
-                                                  })
-                                                  .setRowVisitor((a_, b_, c_, d_) -> rowVisitor)
-                                                  .setSUT(NoOpSut::new)
-                                                  .setCreateSchema(true)
-                                                  .build();
-
-        Run run = config.createRun();
-
-        ExhaustiveChecker checker = (ExhaustiveChecker) run.model;
-
-        int iterations = 10;
-        PartitionVisitor partitionVisitor = run.visitorFactory.get();
-        for (int i = 0; i < iterations; i++)
-            partitionVisitor.visitPartition(i);
-
-        for (List<ExhaustiveChecker.Operation> value : rowVisitor.executed.values())
-            Collections.sort(value);
-
-        for (int lts = 0; lts < iterations; lts++)
+        Supplier<Configuration.ConfigurationBuilder> gen = IntegrationTestBase.sharedConfiguration();
+
+        for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
         {
-            // TODO: turn query into interface to make it easier to deal with here
-            for (Collection<ExhaustiveChecker.Operation> modelOps : checker.inflatePartitionState(lts,
-                                                                                                  (iterations - 1),
-                                                                                                  Query.selectPartition(run.schemaSpec,
-                                                                                                                        run.pdSelector.pd(lts),
-                                                                                                                        false))
-                                                                           .operations.values())
+            LoggingRowVisitor rowVisitor = new LoggingRowVisitor();
+            Configuration config = gen.get()
+                                      .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(10, 10))
+                                      .setClusteringDescriptorSelector((builder) -> {
+                                          builder.setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
+                                                                          .addWeight(OpSelectors.OperationKind.DELETE_ROW, 33)
+                                                                          .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 33)
+                                                                          .addWeight(OpSelectors.OperationKind.WRITE, 34)
+                                                                          .build());
+                                      })
+                                      .setSUT(() -> SystemUnderTest.NO_OP)
+                                      .setCreateSchema(true)
+                                      .build();
+
+            Run run = config.createRun();
+
+            ExhaustiveChecker checker = new ExhaustiveChecker(run);
+
+            PartitionVisitor visitor = new Configuration.MutatingPartitionVisitorConfiguation((r) -> rowVisitor).make(run);
+            int iterations = 10;
+
+            for (int i = 0; i < iterations; i++)
+                visitor.visitPartition(i);
+
+            for (List<ExhaustiveChecker.Operation> value : rowVisitor.executed.values())
+                Collections.sort(value);
+
+            for (int lts = 0; lts < iterations; lts++)
             {
-                ExhaustiveChecker.Operation op = modelOps.iterator().next();
-                List<ExhaustiveChecker.Operation> executedOps = rowVisitor.executed.get(new Pair(op.pd, op.cd));
-                Iterator<ExhaustiveChecker.Operation> modelIterator = modelOps.iterator();
-                Iterator<ExhaustiveChecker.Operation> executedIterator = executedOps.iterator();
-                while (modelIterator.hasNext() && executedIterator.hasNext())
+                // TODO: turn query into interface to make it easier to deal with here
+                for (Collection<ExhaustiveChecker.Operation> modelOps : checker.inflatePartitionState(iterations - 1,
+                                                                                                      Query.selectPartition(run.schemaSpec,
+                                                                                                                            run.pdSelector.pd(lts),
+                                                                                                                            false))
+                                                                        .operations.values())
                 {
+                    ExhaustiveChecker.Operation op = modelOps.iterator().next();
+                    List<ExhaustiveChecker.Operation> executedOps = rowVisitor.executed.get(new Pair(op.pd, op.cd));
+                    Iterator<ExhaustiveChecker.Operation> modelIterator = modelOps.iterator();
+                    Iterator<ExhaustiveChecker.Operation> executedIterator = executedOps.iterator();
+                    while (modelIterator.hasNext() && executedIterator.hasNext())
+                    {
+                        Assert.assertEquals(String.format("\n%s\n%s", modelOps, executedOps),
+                                            executedIterator.next(), modelIterator.next());
+                    }
                     Assert.assertEquals(String.format("\n%s\n%s", modelOps, executedOps),
-                                        executedIterator.next(), modelIterator.next());
+                                        modelIterator.hasNext(), executedIterator.hasNext());
                 }
-                Assert.assertEquals(String.format("\n%s\n%s", modelOps, executedOps),
-                                    modelIterator.hasNext(), executedIterator.hasNext());
             }
         }
     }
@@ -131,6 +136,11 @@ public class ExhaustiveCheckerUnitTest
         {
             return null;
         }
+
+        public CompiledStatement deleteSlice(long lts, long pd, long opId)
+        {
+            return null;
+        }
     }
 
     private static class Pair
diff --git a/harry-integration/test/harry/model/IntegrationTestBase.java b/harry-integration/test/harry/model/IntegrationTestBase.java
index 08d9f46..a3c2dfa 100644
--- a/harry-integration/test/harry/model/IntegrationTestBase.java
+++ b/harry-integration/test/harry/model/IntegrationTestBase.java
@@ -18,13 +18,14 @@
 
 package harry.model;
 
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
 import harry.core.Configuration;
+import harry.ddl.SchemaGenerators;
 import harry.ddl.SchemaSpec;
 import harry.model.clock.OffsetClock;
 import harry.model.sut.InJvmSut;
@@ -57,22 +58,16 @@ public class IntegrationTestBase extends TestBaseImpl
         cluster.schemaChange("CREATE KEYSPACE harry WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
     }
 
-    // TODO: run these tests with like a hundred different tables?
-    static Configuration.ConfigurationBuilder sharedConfiguration()
+    private static long seed = 0;
+    public static Supplier<Configuration.ConfigurationBuilder> sharedConfiguration()
     {
-        return sharedConfiguration(1);
+        Supplier<SchemaSpec> specGenerator = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        return () -> {
+            SchemaSpec schemaSpec = specGenerator.get();
+            return sharedConfiguration(seed, schemaSpec);
+        };
     }
 
-    private static AtomicInteger COUNTER = new AtomicInteger();
-
-    public static Configuration.ConfigurationBuilder sharedConfiguration(long seed)
-    {
-        int i = COUNTER.incrementAndGet();
-        SchemaSpec schemaSpec = MockSchema.randomSchema("harry", "table" + i, seed);
-        return sharedConfiguration(seed, schemaSpec);
-    }
-
-
     public static Configuration.ConfigurationBuilder sharedConfiguration(long seed, SchemaSpec schema)
     {
         return new Configuration.ConfigurationBuilder().setSeed(seed)
@@ -80,7 +75,6 @@ public class IntegrationTestBase extends TestBaseImpl
                                                        .setCreateSchema(true)
                                                        .setTruncateTable(false)
                                                        .setDropSchema(true)
-                                                       .setModel(new Configuration.ExhaustiveCheckerConfig())
                                                        .setSchemaProvider(seed1 -> schema)
                                                        .setClusteringDescriptorSelector((builder) -> {
                                                            builder
@@ -91,9 +85,9 @@ public class IntegrationTestBase extends TestBaseImpl
                                                                                     .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 10)
                                                                                     .addWeight(OpSelectors.OperationKind.WRITE, 80)
                                                                                     .build())
-                                                           .setMaxPartitionSize(1);
+                                                           .setMaxPartitionSize(100);
                                                        })
                                                        .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(1, 200))
                                                        .setSUT(() -> sut);
     }
-}
\ No newline at end of file
+}
diff --git a/harry-integration/test/harry/model/ModelTest.java b/harry-integration/test/harry/model/ModelTest.java
index ae3d7b7..7a7cfe9 100644
--- a/harry-integration/test/harry/model/ModelTest.java
+++ b/harry-integration/test/harry/model/ModelTest.java
@@ -18,8 +18,10 @@
 
 package harry.model;
 
+import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,6 +31,7 @@ import harry.ddl.SchemaSpec;
 import harry.generators.Surjections;
 import harry.generators.distribution.Distribution;
 import harry.model.sut.InJvmSut;
+import harry.runner.LoggingPartitionVisitor;
 import harry.runner.Runner;
 import harry.util.BitSet;
 import org.apache.cassandra.distributed.Cluster;
@@ -80,7 +83,10 @@ public class ModelTest extends TestBaseImpl
     @Test
     public void statefulVisibleRowsCheckerTest() throws Throwable
     {
-        visibleRowsCheckerTest(VisibleRowsChecker::new);
+        visibleRowsCheckerTest(VisibleRowsChecker::new,
+                               (cfg) -> {
+                                   cfg.setDataTracker(VisibleRowsChecker.LoggingDataTracker::new);
+                               });
     }
 
     @Test
@@ -91,13 +97,20 @@ public class ModelTest extends TestBaseImpl
 
     public void visibleRowsCheckerTest(Model.ModelFactory factory) throws Throwable
     {
+        visibleRowsCheckerTest(factory, (a) -> {});
+    }
+    public void visibleRowsCheckerTest(Model.ModelFactory factory, Consumer<Configuration.ConfigurationBuilder> configurator) throws Throwable
+    {
         try (Cluster cluster = Cluster.create(3))
         {
-            Configuration.ConfigurationBuilder configuration = new Configuration.ConfigurationBuilder();
-            configuration.setClock(new Configuration.ApproximateMonotonicClockConfiguration((int) TimeUnit.MINUTES.toMillis(2),
+            Configuration.ConfigurationBuilder builder = new Configuration.ConfigurationBuilder();
+            builder.setClock(new Configuration.ApproximateMonotonicClockConfiguration((int) TimeUnit.MINUTES.toMillis(2),
                                                                                             1, TimeUnit.SECONDS))
                          .setRunTime(1, TimeUnit.MINUTES)
-                         .setRunner(new Configuration.ConcurrentRunnerConfig(2, 2, 2))
+                         .setRunner(new Configuration.ConcurrentRunnerConfig(2,
+                                                                             Arrays.asList(new Configuration.LoggingPartitionVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration()),
+                                                                                           new Configuration.RecentPartitionsValidatorConfiguration(10, 10, factory::make),
+                                                                                           new Configuration.AllPartitionsValidatorConfiguration(10, 10, factory::make))))
                          .setSchemaProvider((seed) -> schema)
                          .setClusteringDescriptorSelector((OpSelectors.Rng rng, SchemaSpec schemaSpec) -> {
                              return new DescriptorSelectorBuilder()
@@ -109,10 +122,9 @@ public class ModelTest extends TestBaseImpl
                          .setCreateSchema(true)
                          .setTruncateTable(false)
                          .setDropSchema(false)
-                         .setModel(factory::create)
                          .setSUT(() -> new InJvmSut(cluster));
-
-            Runner runner = configuration.build().createRunner();
+            configurator.accept(builder);
+            Runner runner = builder.build().createRunner();
             try
             {
                 runner.initAndStartAll().get(2, TimeUnit.MINUTES);
@@ -144,4 +156,4 @@ public class ModelTest extends TestBaseImpl
     }
 }
 
-// TODO: test things gradually. First with simple schemas, then with more complex, then with completely random.
\ No newline at end of file
+// TODO: test things gradually. First with simple schemas, then with more complex, then with completely random.
diff --git a/harry-integration/test/harry/model/ModelTestBase.java b/harry-integration/test/harry/model/ModelTestBase.java
new file mode 100644
index 0000000..6421355
--- /dev/null
+++ b/harry-integration/test/harry/model/ModelTestBase.java
@@ -0,0 +1,144 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.model;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.ddl.SchemaGenerators;
+import harry.ddl.SchemaSpec;
+import harry.runner.LoggingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
+import harry.runner.PartitionVisitor;
+import harry.runner.SinglePartitionValidator;
+
+/**
+ * Shared scaffolding for negative model tests: populates a table, corrupts it through a
+ * caller-supplied function and hands the resulting validation failure to a caller-supplied check.
+ */
+public abstract class ModelTestBase extends IntegrationTestBase
+{
+    /**
+     * Runs {@link #negativeTest(Function, BiConsumer, int, SchemaSpec)} {@code SchemaGenerators.DEFAULT_RUNS}
+     * times, drawing a schema from {@code SchemaGenerators.progression} for every run and using the
+     * loop index as the counter.
+     *
+     * @param corrupt  attempts to corrupt the state of the run; returns {@code false} if it could not
+     * @param validate receives the error raised by validation after a successful corruption
+     */
+    void negativeTest(Function<Run, Boolean> corrupt, BiConsumer<Throwable, Run> validate)
+    {
+        Supplier<SchemaSpec> supplier = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
+        {
+            SchemaSpec schema = supplier.get();
+            negativeTest(corrupt, validate, i, schema);
+        }
+    }
+
+    /**
+     * Model configuration that {@link #validator(Run)} passes to the validator it creates.
+     */
+    abstract Configuration.ModelConfiguration modelConfiguration();
+
+    /**
+     * Creates the visitor used to validate a partition; subclasses may override it.
+     * NOTE(review): the meaning of the literal 100 is defined by SinglePartitionValidator - confirm there.
+     */
+    protected PartitionVisitor validator(Run run)
+    {
+        return new SinglePartitionValidator(100, run, modelConfiguration());
+    }
+
+    /**
+     * Executes one negative-test round against the given schema: applies the schema to the system
+     * under test, visits 200 logical timestamps with a logging, mutating partition visitor, runs the
+     * validator once before corrupting (any failure here propagates), applies {@code corrupt} and
+     * runs the validator again.
+     * <p>
+     * If {@code corrupt} returns {@code false} the round is skipped after printing a message. If the
+     * second validation raises nothing, an {@link AssertionError} is thrown: a corruption that goes
+     * unnoticed must fail the test rather than let it pass silently.
+     *
+     * @param corrupt    attempts to corrupt the state of the run; returns {@code false} if it could not
+     * @param validate   receives the error raised by the post-corruption validation, and the run
+     * @param counter    passed to {@code sharedConfiguration} as the seed
+     * @param schemaSpec schema to run this round against
+     */
+    void negativeTest(Function<Run, Boolean> corrupt, BiConsumer<Throwable, Run> validate, int counter, SchemaSpec schemaSpec)
+    {
+        Configuration config = sharedConfiguration(counter, schemaSpec)
+                               .setClusteringDescriptorSelector((builder) -> {
+                                   builder.setNumberOfModificationsDistribution(new Configuration.ConstantDistributionConfig(10))
+                                          .setRowsPerModificationDistribution(new Configuration.ConstantDistributionConfig(10))
+                                          .setMaxPartitionSize(100)
+                                          .setOperationKindWeights(new Configuration.OperationKindSelectorBuilder()
+                                                                   .addWeight(OpSelectors.OperationKind.DELETE_ROW, 1)
+                                                                   .addWeight(OpSelectors.OperationKind.DELETE_COLUMN, 1)
+                                                                   .addWeight(OpSelectors.OperationKind.DELETE_RANGE, 1)
+                                                                   .addWeight(OpSelectors.OperationKind.DELETE_SLICE, 1)
+                                                                   .addWeight(OpSelectors.OperationKind.WRITE, 96)
+                                                                   .build());
+                               })
+                               .build();
+
+        Run run = config.createRun();
+        beforeEach();
+        run.sut.schemaChange(run.schemaSpec.compile().cql());
+        OpSelectors.MonotonicClock clock = run.clock;
+
+        PartitionVisitor validator = validator(run);
+        PartitionVisitor partitionVisitor = new LoggingPartitionVisitor(run, MutatingRowVisitor::new);
+
+        for (int i = 0; i < 200; i++)
+        {
+            long lts = clock.nextLts();
+            partitionVisitor.visitPartition(lts);
+        }
+
+        // The state has to validate cleanly before it is corrupted; a failure here propagates to the caller.
+        validator.visitPartition(0);
+
+        if (!corrupt.apply(run))
+        {
+            System.out.println("Could not corrupt");
+            return;
+        }
+
+        Throwable failure = null;
+        try
+        {
+            validator.visitPartition(0);
+        }
+        catch (Throwable t)
+        {
+            failure = t;
+        }
+
+        // A corruption the model does not notice is a test failure, not a silent pass.
+        if (failure == null)
+            throw new AssertionError("Validation should have failed after the state was corrupted");
+
+        validate.accept(failure, run);
+    }
+}
\ No newline at end of file
diff --git a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
index 2520089..8085a06 100644
--- a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.function.Supplier;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -37,17 +38,20 @@ import harry.corruptor.HideRowCorruptor;
 import harry.corruptor.HideValueCorruptor;
 import harry.corruptor.QueryResponseCorruptor;
 import harry.corruptor.ShowValueCorruptor;
+import harry.ddl.SchemaGenerators;
+import harry.runner.MutatingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 
 import static harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
 
 @RunWith(Parameterized.class)
 public class QuerySelectorNegativeTest extends IntegrationTestBase
 {
-    private final int rounds = 10;
     private final int ltss = 1000;
+
     private final Random rnd = new Random();
 
     private final QueryResponseCorruptorFactory corruptorFactory;
@@ -86,20 +90,26 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
     public void selectRows()
     {
         Map<Query.QueryKind, Integer> stats = new HashMap<>();
+        Supplier<Configuration.ConfigurationBuilder> gen = sharedConfiguration();
+
+        int rounds = SchemaGenerators.DEFAULT_RUNS;
         int failureCounter = 0;
         outer:
         for (int counter = 0; counter < rounds; counter++)
         {
             beforeEach();
-            Configuration config = sharedConfiguration(counter)
-                                   .setClusteringDescriptorSelector((builder) -> {
-                                       builder.setMaxPartitionSize(2000);
-                                   })
-                                   .build();
+            Configuration config = gen.get()
+                                      .setClusteringDescriptorSelector((builder) -> {
+                                          builder.setMaxPartitionSize(2000);
+                                      })
+                                      .build();
             Run run = config.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
+
             OpSelectors.MonotonicClock clock = run.clock;
-            PartitionVisitor partitionVisitor = run.visitorFactory.get();
+
+            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+            Model model = new ExhaustiveChecker(run);
 
             QueryResponseCorruptor corruptor = this.corruptorFactory.create(run);
 
@@ -112,20 +122,23 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
             while (true)
             {
                 long verificationLts = rnd.nextInt(1000);
-                QuerySelector querySelector = new QuerySelector(run.schemaSpec,
-                                                                run.pdSelector,
-                                                                run.descriptorSelector,
-                                                                run.rng);
+                QueryGenerator queryGen = new QueryGenerator(run.schemaSpec,
+                                                             run.pdSelector,
+                                                             run.descriptorSelector,
+                                                             run.rng);
+
+                QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run.rng, queryGen);
 
                 Query query = querySelector.inflate(verificationLts, counter);
-                run.model.validatePartitionState(verificationLts, query);
+
+                model.validate(query);
 
                 if (!corruptor.maybeCorrupt(query, run.sut))
                     continue;
 
                 try
                 {
-                    run.model.validatePartitionState(verificationLts, query);
+                    model.validate(query);
                     Assert.fail("Should've failed");
                 }
                 catch (Throwable t)
@@ -141,4 +154,4 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
         Assert.assertTrue(String.format("Seen only %d failures", failureCounter),
                           failureCounter > (rounds * 0.8));
     }
-}
+}
\ No newline at end of file
diff --git a/harry-integration/test/harry/model/QuerySelectorTest.java b/harry-integration/test/harry/model/QuerySelectorTest.java
index 0859181..3e7f3f0 100644
--- a/harry-integration/test/harry/model/QuerySelectorTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorTest.java
@@ -20,29 +20,37 @@ package harry.model;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.function.Supplier;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import harry.core.Configuration;
 import harry.core.Run;
+import harry.ddl.SchemaGenerators;
 import harry.ddl.SchemaSpec;
+import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
+import harry.runner.MutatingPartitionVisitor;
+import harry.runner.MutatingRowVisitor;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 
 public class QuerySelectorTest extends IntegrationTestBase
 {
-    private static int CYCLES = 100;
+    private static int CYCLES = 300;
 
     @Test
     public void basicQuerySelectorTest()
     {
-        for (int cnt = 0; cnt < 50; cnt++)
+        Supplier<SchemaSpec> schemaGen = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
         {
-            SchemaSpec schemaSpec = MockSchema.randomSchema("harry", "table" + cnt, cnt);
+            beforeEach();
+            SchemaSpec schemaSpec = schemaGen.get();
             int partitionSize = 200;
+
             int[] fractions = new int[schemaSpec.clusteringKeys.size()];
             int last = partitionSize;
             for (int i = fractions.length - 1; i >= 0; i--)
@@ -60,7 +68,8 @@ public class QuerySelectorTest extends IntegrationTestBase
             Run run = config.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
             OpSelectors.MonotonicClock clock = run.clock;
-            PartitionVisitor partitionVisitor = run.visitorFactory.get();
+
+            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
 
             for (int i = 0; i < CYCLES; i++)
             {
@@ -68,15 +77,13 @@ public class QuerySelectorTest extends IntegrationTestBase
                 partitionVisitor.visitPartition(lts);
             }
 
-            QuerySelector querySelector = new QuerySelector(run.schemaSpec,
-                                                            run.pdSelector,
-                                                            run.descriptorSelector,
-                                                            run.rng);
+            QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
 
             for (int i = 0; i < CYCLES; i++)
             {
                 Query query = querySelector.inflate(i, i);
-                Object[][] results = run.sut.execute(query.toSelectStatement());
+
+                Object[][] results = run.sut.execute(query.toSelectStatement(), SystemUnderTest.ConsistencyLevel.QUORUM);
                 Set<Long> matchingClusterings = new HashSet<>();
                 for (Object[] row : results)
                 {
@@ -89,7 +96,7 @@ public class QuerySelectorTest extends IntegrationTestBase
                 // the simplest test there can be: every row that is in the partition and was returned by the query,
                 // has to "match", every other row has to be a non-match
                 CompiledStatement selectPartition = SelectHelper.select(run.schemaSpec, run.pdSelector.pd(i));
-                Object[][] partition = run.sut.execute(selectPartition);
+                Object[][] partition = run.sut.execute(selectPartition, SystemUnderTest.ConsistencyLevel.QUORUM);
                 for (Object[] row : partition)
                 {
                     long cd = SelectHelper.resultSetToRow(run.schemaSpec,
@@ -106,9 +113,10 @@ public class QuerySelectorTest extends IntegrationTestBase
     @Test
     public void querySelectorModelTest()
     {
-        for (int cnt = 0; cnt < 50; cnt++)
+        Supplier<SchemaSpec> gen = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
         {
-            SchemaSpec schemaSpec = MockSchema.randomSchema("harry", "table" + cnt, cnt);
+            SchemaSpec schemaSpec = gen.get();
             int[] fractions = new int[schemaSpec.clusteringKeys.size()];
             int partitionSize = 200;
             int last = partitionSize;
@@ -127,7 +135,7 @@ public class QuerySelectorTest extends IntegrationTestBase
             Run run = config.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
             OpSelectors.MonotonicClock clock = run.clock;
-            PartitionVisitor partitionVisitor = run.visitorFactory.get();
+            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
 
             for (int i = 0; i < CYCLES; i++)
             {
@@ -135,18 +143,14 @@ public class QuerySelectorTest extends IntegrationTestBase
                 partitionVisitor.visitPartition(lts);
             }
 
-            QuerySelector querySelector = new QuerySelector(run.schemaSpec,
-                                                            run.pdSelector,
-                                                            run.descriptorSelector,
-                                                            run.rng);
-
-            Model model = run.model;
+            QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
+            Model model = new ExhaustiveChecker(run);
 
             long verificationLts = 10;
             for (int i = 0; i < CYCLES; i++)
             {
                 Query query = querySelector.inflate(verificationLts, i);
-                model.validatePartitionState(verificationLts, query);
+                model.validate(query);
             }
         }
     }
diff --git a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
index c64f05d..22d7171 100644
--- a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
+++ b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
@@ -18,8 +18,6 @@
 
 package harry.model;
 
-import java.util.function.Consumer;
-
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -31,19 +29,28 @@ import harry.corruptor.HideRowCorruptor;
 import harry.corruptor.HideValueCorruptor;
 import harry.corruptor.QueryResponseCorruptor;
 import harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
-import harry.generators.Surjections;
-import harry.generators.distribution.Distribution;
 import harry.runner.PartitionVisitor;
 import harry.runner.Query;
-import harry.runner.Validator;
+import harry.runner.SinglePartitionValidator;
 
-public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
+public class QuiescentCheckerIntegrationTest extends ModelTestBase
 {
+    @Override
+    protected PartitionVisitor validator(Run run)
+    {
+        return new SinglePartitionValidator(100, run, modelConfiguration());
+    }
+
     @Test
     public void testNormalCondition()
     {
-        negativeTest((run) -> {},
-                     Assert::assertNull);
+        negativeTest((run) -> { return  true; },
+                     (t, run) -> {
+                         if (t != null)
+                             throw new AssertionError(String.format("Throwable was supposed to be null. Schema: %s",
+                                                                    run.schemaSpec.compile().cql()),
+                                                      t);
+                     });
     }
 
     @Test
@@ -54,15 +61,18 @@ public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
                                                                                                    run.clock,
                                                                                                    HideRowCorruptor::new);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         Query query = Query.selectPartition(run.schemaSpec,
+                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                             false);
+
+                         return corruptor.maybeCorrupt(query, run.sut);
                      },
-                     (t) -> {
+                     (t, run) -> {
+                         // TODO: We can actually pinpoint the difference
                          String expected = "Expected results to have the same number of results, but expected result iterator has more results";
+                         String expected2 = "Found a row in the model that is not present in the resultset";
                          Assert.assertTrue(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()),
-                                           t.getMessage().contains(expected));
+                                           t.getMessage().contains(expected) || t.getMessage().contains(expected2));
                      });
     }
 
@@ -74,15 +84,17 @@ public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
                                                                                      run.clock,
                                                                                      run.descriptorSelector);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> {
+                     (t, run) -> {
                          String expected = "Found a row in the model that is not present in the resultset";
+                         String expected2 = "Expected results to have the same number of results, but actual result iterator has more results";
+
                          Assert.assertTrue(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()),
-                                           t.getMessage().contains(expected));
+                                           t.getMessage().contains(expected) || t.getMessage().contains(expected2));
                      });
     }
 
@@ -95,12 +107,12 @@ public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
                                                                                                    run.clock,
                                                                                                    HideValueCorruptor::new);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> {
+                     (t, run) -> {
                          String expected = "Returned row state doesn't match the one predicted by the model";
                          Assert.assertTrue(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()),
                                            t.getMessage().contains(expected));
@@ -116,55 +128,20 @@ public class QuiescentCheckerIntegrationTest extends IntegrationTestBase
                                                                                                    run.clock,
                                                                                                    ChangeValueCorruptor::new);
 
-                         Assert.assertTrue(corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
-                                                                                        run.pdSelector.pd(0, run.schemaSpec),
-                                                                                        false),
-                                                                  run.sut));
+                         return corruptor.maybeCorrupt(Query.selectPartition(run.schemaSpec,
+                                                                             run.pdSelector.pd(0, run.schemaSpec),
+                                                                             false),
+                                                       run.sut);
                      },
-                     (t) -> {
+                     (t, run) -> {
                          String expected = "Returned row state doesn't match the one predicted by the model";
                          Assert.assertTrue(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()),
                                            t.getMessage().contains(expected));
                      });
     }
 
-
-    static void negativeTest(Consumer<Run> corrupt, Consumer<Throwable> validate)
+    Configuration.ModelConfiguration modelConfiguration()
     {
-        Configuration config = sharedConfiguration()
-                               .setModel(new Configuration.QuiescentCheckerConfig())
-                               .setClusteringDescriptorSelector((rng, schemaSpec) -> {
-                                   return new OpSelectors.DefaultDescriptorSelector(rng,
-                                                                                    new OpSelectors.ColumnSelectorBuilder().forAll(schemaSpec.regularColumns.size()).build(),
-                                                                                    Surjections.pick(OpSelectors.OperationKind.DELETE_COLUMN,
-                                                                                                     OpSelectors.OperationKind.DELETE_ROW,
-                                                                                                     OpSelectors.OperationKind.WRITE),
-                                                                                    new Distribution.ConstantDistribution(10),
-                                                                                    new Distribution.ConstantDistribution(10),
-                                                                                    100);
-                               })
-                               .build();
-        Run run = config.createRun();
-        run.sut.schemaChange(run.schemaSpec.compile().cql());
-        OpSelectors.MonotonicClock clock = run.clock;
-        Validator validator = run.validator;
-        PartitionVisitor partitionVisitor = run.visitorFactory.get();
-
-        for (int i = 0; i < 200; i++)
-        {
-            long lts = clock.nextLts();
-            partitionVisitor.visitPartition(lts);
-        }
-
-        corrupt.accept(run);
-
-        try
-        {
-            validator.validatePartition(0);
-        }
-        catch (Throwable t)
-        {
-            validate.accept(t);
-        }
+        return new Configuration.QuiescentCheckerConfig();
     }
 }
\ No newline at end of file
diff --git a/harry-integration/test/harry/op/RowVisitorTest.java b/harry-integration/test/harry/op/RowVisitorTest.java
index e58c05b..c8ad182 100644
--- a/harry-integration/test/harry/op/RowVisitorTest.java
+++ b/harry-integration/test/harry/op/RowVisitorTest.java
@@ -24,15 +24,19 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import harry.core.MetricReporter;
+import harry.core.Run;
 import harry.ddl.SchemaGenerators;
 import harry.ddl.SchemaSpec;
 import harry.generators.RandomGenerator;
 import harry.generators.distribution.Distribution;
-import harry.runner.DefaultRowVisitor;
+import harry.model.sut.SystemUnderTest;
+import harry.runner.DataTracker;
+import harry.runner.MutatingRowVisitor;
 import harry.model.clock.OffsetClock;
 import harry.model.OpSelectors;
 import harry.operations.CompiledStatement;
-import harry.runner.QuerySelector;
+import harry.runner.QueryGenerator;
 import harry.util.BitSet;
 import org.apache.cassandra.cql3.CQLTester;
 
@@ -50,32 +54,37 @@ public class RowVisitorTest extends CQLTester
     @Test
     public void rowWriteGeneratorTest()
     {
-        Supplier<SchemaSpec> specGenerator = SchemaGenerators.progression(5);
+        Supplier<SchemaSpec> specGenerator = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
         RandomGenerator rand = RandomGenerator.forTests(6371747244598697093L);
 
-        OpSelectors.Rng opRng = new OpSelectors.PCGFast(1);
-        OpSelectors.DefaultDescriptorSelector descriptorSelector = new OpSelectors.DefaultDescriptorSelector(new OpSelectors.PCGFast(1L),
-                                                                                                             OpSelectors.columnSelectorBuilder().forAll(BitSet.create(0b001, 3),
-                                                                                                                                                        BitSet.create(0b011, 3),
-                                                                                                                                                        BitSet.create(0b111, 3))
-                                                                                                                        .build(),
-                                                                                                             DEFAULT_OP_TYPE_SELECTOR,
-                                                                                                             new Distribution.ScaledDistribution(1, 3),
-                                                                                                             new Distribution.ScaledDistribution(2, 30),
-                                                                                                             100);
+        OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
 
-        for (int i = 0; i < SchemaGenerators.PROGRESSIVE_GENERATORS.length * 5; i++)
+        OpSelectors.PdSelector pdSelector = new OpSelectors.DefaultPdSelector(rng, 10, 10);
+        OpSelectors.DescriptorSelector descriptorSelector = new OpSelectors.DefaultDescriptorSelector(rng,
+                                                                                                      OpSelectors.columnSelectorBuilder().forAll(BitSet.create(0b001, 3),
+                                                                                                                                                 BitSet.create(0b011, 3),
+                                                                                                                                                 BitSet.create(0b111, 3))
+                                                                                                                 .build(),
+                                                                                                      DEFAULT_OP_TYPE_SELECTOR,
+                                                                                                      new Distribution.ScaledDistribution(1, 3),
+                                                                                                      new Distribution.ScaledDistribution(2, 30),
+                                                                                                      100);
+
+        for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
         {
             SchemaSpec schema = specGenerator.get();
             createTable(schema.compile().cql());
 
-            DefaultRowVisitor visitor = new DefaultRowVisitor(schema,
-                                                              new OffsetClock(10000),
-                                                              descriptorSelector,
-                                                              new QuerySelector(schema,
-                                                                                new OpSelectors.DefaultPdSelector(opRng, 10, 10),
-                                                                                descriptorSelector,
-                                                                                opRng));
+            Run run = new Run(rng,
+                              new OffsetClock(10000),
+                              pdSelector,
+                              descriptorSelector,
+                              schema,
+                              DataTracker.NO_OP,
+                              SystemUnderTest.NO_OP,
+                              MetricReporter.NO_OP);
+
+            MutatingRowVisitor visitor = new MutatingRowVisitor(run);
             long[] descriptors = rand.next(4);
 
             execute(visitor.write(Math.abs(descriptors[0]),
diff --git a/pom.xml b/pom.xml
index efb1cf2..8a172eb 100755
--- a/pom.xml
+++ b/pom.xml
@@ -38,9 +38,9 @@
     <properties>
         <javac.target>1.8</javac.target>
         <harry.version>0.0.1-SNAPSHOT</harry.version>
-        <cassandra.version>4.0.1-SNAPSHOT</cassandra.version>
+        <cassandra.version>4.0.0-SNAPSHOT</cassandra.version>
         <jackson.version>2.11.3</jackson.version>
-        <dtest.version>0.0.6</dtest.version>
+        <dtest.version>0.0.7</dtest.version>
         <jmh.version>1.11.3</jmh.version>
         <argLine.common>
             -server

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org