You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2022/01/10 10:46:14 UTC

[cassandra-harry] 01/01: Features: * Implement lookbehind via tracker callbacks * Improve DSL * Rename maxLts to peek * Split lts visitors from visitors * Allow create table if not exists * Allow sampler to be triggered at every LTS * Allow local state validator to always run * Add Staged Runner * Add wait for token ranges * Make keyspace DDL configurable * Rename PartitionVisitor to Visitor

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git

commit 275f188660b66743bf3f055c8d7da438ad826061
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Thu Nov 25 08:45:00 2021 +0100

    Features:
      * Implement lookbehind via tracker callbacks
      * Improve DSL
      * Rename maxLts to peek
      * Split lts visitors from visitors
      * Allow create table if not exists
      * Allow sampler to be triggered at every LTS
      * Allow local state validator to always run
      * Add Staged Runner
      * Add wait for token ranges
      * Make keyspace DDL configurable
      * Rename PartitionVisitor to Visitor
    
    Bugfixes:
      * Fix for queue draining
      * Fix distribution of the single-op values
      * Fix bug in schema helper: static columns are listed as duplicates
    
    Patch by Alex Petrov for CASSANDRA-16262
    
    Co-authored-by: Caleb Rackliffe <ca...@gmail.com>
---
 harry-core/src/harry/core/Configuration.java       | 145 ++++----
 harry-core/src/harry/core/Run.java                 |   2 -
 .../src/harry/corruptor/HideRowCorruptor.java      |   2 +-
 .../src/harry/corruptor/HideValueCorruptor.java    |   4 +-
 .../src/harry/corruptor/ShowValueCorruptor.java    |   2 +-
 harry-core/src/harry/ddl/ColumnSpec.java           |  16 +
 harry-core/src/harry/ddl/SchemaSpec.java           |   5 +-
 harry-core/src/harry/dsl/HistoryBuilder.java       |  24 +-
 harry-core/src/harry/generators/RngUtils.java      |  16 +
 harry-core/src/harry/model/OpSelectors.java        |  38 +-
 harry-core/src/harry/model/QuiescentChecker.java   |  38 +-
 .../model/clock/ApproximateMonotonicClock.java     |  16 +-
 harry-core/src/harry/model/clock/OffsetClock.java  |   9 +-
 harry-core/src/harry/reconciler/Reconciler.java    |  27 +-
 harry-core/src/harry/runner/DataTracker.java       |  54 ++-
 .../src/harry/runner/DefaultDataTracker.java       |  17 +-
 harry-core/src/harry/runner/HarryRunner.java       |  13 +-
 harry-core/src/harry/runner/Runner.java            | 406 ++++++++++++++-------
 harry-core/src/harry/runner/StagedRunner.java      | 201 ++++++++++
 harry-core/src/harry/runner/UpToLtsRunner.java     | 105 ++++++
 harry-core/src/harry/util/ByteUtils.java           | 196 ++++++++++
 .../src/harry/visitors/AllPartitionsValidator.java |  38 +-
 .../src/harry/visitors/CorruptingVisitor.java      |   7 +-
 .../src/harry/visitors/DelegatingVisitor.java      |  61 ----
 .../src/harry/visitors/GeneratingVisitor.java      |   9 +-
 harry-core/src/harry/visitors/LoggingVisitor.java  |   1 -
 harry-core/src/harry/visitors/LtsVisitor.java      |  97 +++++
 harry-core/src/harry/visitors/MutatingVisitor.java |  24 +-
 .../src/harry/visitors/ParallelValidator.java      |   4 +-
 harry-core/src/harry/visitors/RecentValidator.java |  48 ++-
 .../src/harry/visitors/ReplayingVisitor.java       |  11 +-
 harry-core/src/harry/visitors/Sampler.java         |  11 +-
 harry-core/src/harry/visitors/SingleValidator.java |   6 +-
 harry-core/src/harry/visitors/VisitExecutor.java   |  14 +-
 harry-core/src/harry/visitors/Visitor.java         |   8 +-
 harry-core/test/harry/model/OpSelectorsTest.java   | 148 ++++----
 .../harry/runner/external/HarryRunnerExternal.java |   2 +-
 .../src/harry/model/sut/ByteUtils.java             | 115 ++++++
 .../model/sut/InJVMTokenAwareVisitExecutor.java    |  11 +-
 .../src/harry/model/sut/InJvmSut.java              |  14 +-
 .../src/harry/runner/HarryRunnerJvm.java           |   2 +-
 .../harry/runner/RepairingLocalStateValidator.java |  21 +-
 .../src/harry/runner/TrivialShrinker.java          |  25 +-
 .../src/harry/visitors/SkippingVisitor.java        |  13 +-
 .../generators/DataGeneratorsIntegrationTest.java  |   6 +-
 .../harry/model/HistoryBuilderIntegrationTest.java |  27 +-
 .../test/harry/model/HistoryBuilderTest.java       |   3 +-
 .../harry/model/InJVMTokenAwareExecutorTest.java   |  15 +-
 .../test/harry/model/IntegrationTestBase.java      |   4 +-
 .../test/harry/model/ModelTestBase.java            |  32 +-
 .../harry/model/QuerySelectorNegativeTest.java     |   7 +-
 .../test/harry/model/QuerySelectorTest.java        |  14 +-
 .../model/QuiescentCheckerIntegrationTest.java     |  39 +-
 .../test/resources/single_partition_test.yml       |   5 +-
 54 files changed, 1550 insertions(+), 628 deletions(-)

diff --git a/harry-core/src/harry/core/Configuration.java b/harry-core/src/harry/core/Configuration.java
index c6fecc7..3545e18 100644
--- a/harry-core/src/harry/core/Configuration.java
+++ b/harry-core/src/harry/core/Configuration.java
@@ -76,6 +76,7 @@ public class Configuration
         mapper.registerSubtypes(Configuration.ApproximateMonotonicClockConfiguration.class);
         mapper.registerSubtypes(Configuration.ConcurrentRunnerConfig.class);
         mapper.registerSubtypes(Configuration.SequentialRunnerConfig.class);
+        mapper.registerSubtypes(Configuration.SingleVisitRunnerConfig.class);
         mapper.registerSubtypes(Configuration.DefaultDataTrackerConfiguration.class);
         mapper.registerSubtypes(Configuration.NoOpDataTrackerConfiguration.class);
 
@@ -106,6 +107,7 @@ public class Configuration
     public final SchemaProviderConfiguration schema_provider;
 
     public final boolean drop_schema;
+    public final String keyspace_ddl;
     public final boolean create_schema;
     public final boolean truncate_table;
 
@@ -117,13 +119,11 @@ public class Configuration
     public final PDSelectorConfiguration partition_descriptor_selector;
     public final CDSelectorConfiguration clustering_descriptor_selector;
 
-    public final long run_time;
-    public final TimeUnit run_time_unit;
-
     @JsonCreator
     public Configuration(@JsonProperty("seed") long seed,
                          @JsonProperty("schema_provider") SchemaProviderConfiguration schema_provider,
                          @JsonProperty("drop_schema") boolean drop_schema,
+                         @JsonProperty("create_keyspace") String keyspace_ddl,
                          @JsonProperty("create_schema") boolean create_schema,
                          @JsonProperty("truncate_schema") boolean truncate_table,
                          @JsonProperty("metric_reporter") MetricReporterConfiguration metric_reporter,
@@ -132,12 +132,11 @@ public class Configuration
                          @JsonProperty("system_under_test") SutConfiguration system_under_test,
                          @JsonProperty("data_tracker") DataTrackerConfiguration data_tracker,
                          @JsonProperty("partition_descriptor_selector") PDSelectorConfiguration partition_descriptor_selector,
-                         @JsonProperty("clustering_descriptor_selector") CDSelectorConfiguration clustering_descriptor_selector,
-                         @JsonProperty(value = "run_time", defaultValue = "2") long run_time,
-                         @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit run_time_unit)
+                         @JsonProperty("clustering_descriptor_selector") CDSelectorConfiguration clustering_descriptor_selector)
     {
         this.seed = seed;
         this.schema_provider = schema_provider;
+        this.keyspace_ddl = keyspace_ddl;
         this.drop_schema = drop_schema;
         this.create_schema = create_schema;
         this.truncate_table = truncate_table;
@@ -147,8 +146,6 @@ public class Configuration
         this.data_tracker = data_tracker;
         this.partition_descriptor_selector = partition_descriptor_selector;
         this.clustering_descriptor_selector = clustering_descriptor_selector;
-        this.run_time = run_time;
-        this.run_time_unit = run_time_unit;
         this.runner = runner;
     }
 
@@ -233,14 +230,15 @@ public class Configuration
         OpSelectors.MonotonicClock clock = snapshot.clock.make();
 
         MetricReporter metricReporter = snapshot.metric_reporter.make();
-        // TODO: parse schema
-        SchemaSpec schemaSpec = snapshot.schema_provider.make(seed);
+
+        // TODO: validate that operation kind is compatible with schema, due to statics etc
+        SystemUnderTest sut = snapshot.system_under_test.make();
+
+        SchemaSpec schemaSpec = snapshot.schema_provider.make(seed, sut);
         schemaSpec.validate();
 
         OpSelectors.PdSelector pdSelector = snapshot.partition_descriptor_selector.make(rng);
         OpSelectors.DescriptorSelector descriptorSelector = snapshot.clustering_descriptor_selector.make(rng, schemaSpec);
-        // TODO: validate that operation kind is compactible with schema, due to statics etc
-        SystemUnderTest sut = snapshot.system_under_test.make();
 
         return new Run(rng,
                        clock,
@@ -263,6 +261,7 @@ public class Configuration
         long seed;
         SchemaProviderConfiguration schema_provider = new DefaultSchemaProviderConfiguration();
 
+        String keyspace_ddl;
         boolean drop_schema;
         boolean create_schema;
         boolean truncate_table;
@@ -275,9 +274,6 @@ public class Configuration
         PDSelectorConfiguration partition_descriptor_selector = new Configuration.DefaultPDSelectorConfiguration(10, 100);
         CDSelectorConfiguration clustering_descriptor_selector; // TODO: sensible default value
 
-        long run_time = 2;
-        TimeUnit run_time_unit = TimeUnit.HOURS;
-
         public ConfigurationBuilder setSeed(long seed)
         {
             this.seed = seed;
@@ -290,20 +286,19 @@ public class Configuration
             return this;
         }
 
-        public ConfigurationBuilder setRunTime(long runTime, TimeUnit runTimeUnit)
+        public ConfigurationBuilder setDataTracker(DataTrackerConfiguration tracker)
         {
-            this.run_time_unit = Objects.requireNonNull(runTimeUnit, "unit");
-            this.run_time = runTime;
+            this.data_tracker = tracker;
             return this;
         }
 
-
-        public ConfigurationBuilder setDataTracker(DataTrackerConfiguration tracker)
+        public ConfigurationBuilder setDataTracker(String keyspace_ddl)
         {
-            this.data_tracker = tracker;
+            this.keyspace_ddl = keyspace_ddl;
             return this;
         }
 
+
         public ConfigurationBuilder setClock(ClockConfiguration clock)
         {
             this.clock = clock;
@@ -370,20 +365,16 @@ public class Configuration
             return new Configuration(seed,
                                      schema_provider,
                                      drop_schema,
+                                     keyspace_ddl,
                                      create_schema,
                                      truncate_table,
                                      metric_reporter,
                                      clock,
-
                                      runner,
                                      system_under_test,
                                      data_tracker,
-
                                      partition_descriptor_selector,
-                                     clustering_descriptor_selector,
-
-                                     run_time,
-                                     run_time_unit);
+                                     clustering_descriptor_selector);
         }
     }
 
@@ -403,8 +394,6 @@ public class Configuration
         builder.partition_descriptor_selector = partition_descriptor_selector;
         builder.clustering_descriptor_selector = clustering_descriptor_selector;
 
-        builder.run_time = run_time;
-        builder.run_time_unit = run_time_unit;
         return builder;
     }
 
@@ -425,36 +414,10 @@ public class Configuration
 
         public DataTracker make()
         {
-            return new DataTracker()
-            {
-                public void started(long lts)
-                {
-                }
-
-                public void finished(long lts)
-                {
-
-                }
-
-                public long maxStarted()
-                {
-                    return 0;
-                }
-
-                public long maxConsecutiveFinished()
-                {
-                    return 0;
-                }
-
-                public DataTrackerConfiguration toConfig()
-                {
-                    return null;
-                }
-            };
+            return DataTracker.NO_OP;
         }
     }
 
-
     @JsonTypeName("default")
     public static class DefaultDataTrackerConfiguration implements DataTrackerConfiguration
     {
@@ -562,38 +525,74 @@ public class Configuration
     public static class ConcurrentRunnerConfig implements RunnerConfiguration
     {
         public final int concurrency;
-        public final List<VisitorConfiguration> visitor_factories;
+
+        @JsonProperty(value = "visitors")
+        public final List<VisitorConfiguration> visitorFactories;
+
+        public final long run_time;
+        public final TimeUnit run_time_unit;
 
         @JsonCreator
-        public ConcurrentRunnerConfig(@JsonProperty(value = "concurrency", defaultValue = "2") int concurrency,
-                                      @JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
+        public ConcurrentRunnerConfig(@JsonProperty(value = "concurrency", defaultValue = "4") int concurrency,
+                                      @JsonProperty(value = "visitors") List<VisitorConfiguration> visitors,
+                                      @JsonProperty(value = "run_time", defaultValue = "2") long runtime,
+                                      @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
         {
             this.concurrency = concurrency;
-            this.visitor_factories = visitors;
+            this.visitorFactories = visitors;
+            this.run_time = runtime;
+            this.run_time_unit = runtimeUnit;
         }
 
         @Override
         public Runner make(Run run, Configuration config)
         {
-            return new Runner.ConcurrentRunner(run, config, concurrency, visitor_factories);
+            return new Runner.ConcurrentRunner(run, config, concurrency, visitorFactories, run_time, run_time_unit);
         }
     }
 
     @JsonTypeName("sequential")
     public static class SequentialRunnerConfig implements RunnerConfiguration
     {
-        public final List<VisitorConfiguration> visitor_factories;
+        @JsonProperty(value = "visitors")
+        public final List<VisitorConfiguration> visitorFactories;
+
+        public final long run_time;
+        public final TimeUnit run_time_unit;
+
+        @JsonCreator
+        public SequentialRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors,
+                                      @JsonProperty(value = "run_time", defaultValue = "2") long runtime,
+                                      @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
+        {
+            this.visitorFactories = visitors;
+            this.run_time = runtime;
+            this.run_time_unit = runtimeUnit;
+        }
+
+        @Override
+        public Runner make(Run run, Configuration config)
+        {
+            return new Runner.SequentialRunner(run, config, visitorFactories, run_time, run_time_unit);
+        }
+    }
+
+    @JsonTypeName("single")
+    public static class SingleVisitRunnerConfig implements RunnerConfiguration
+    {
+        @JsonProperty(value = "visitors")
+        public final List<VisitorConfiguration> visitorFactories;
 
         @JsonCreator
-        public SequentialRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
+        public SingleVisitRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
         {
-            this.visitor_factories = visitors;
+            this.visitorFactories = visitors;
         }
 
         @Override
         public Runner make(Run run, Configuration config)
         {
-            return new Runner.SequentialRunner(run, config, visitor_factories);
+            return new Runner.SingleVisitRunner(run, config, visitorFactories);
         }
     }
 
@@ -675,7 +674,7 @@ public class Configuration
             operation_kind_weights = new HashMap<>();
         }
 
-        public WeightedSelectorBuilder addWeight(T v, int weight)
+        public WeightedSelectorBuilder<T> addWeight(T v, int weight)
         {
             operation_kind_weights.put(v, weight);
             return this;
@@ -852,7 +851,7 @@ public class Configuration
         }
     }
 
-    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
     public interface DistributionConfig extends Distribution.DistributionFactory
     {
     }
@@ -942,7 +941,7 @@ public class Configuration
         @Override
         public Visitor make(Run run)
         {
-            return new MutatingVisitor(run, row_visitor);
+            return new MutatingVisitor(run, row_visitor::make);
         }
     }
 
@@ -960,7 +959,7 @@ public class Configuration
         @Override
         public Visitor make(Run run)
         {
-            return new LoggingVisitor(run, row_visitor);
+            return new LoggingVisitor(run, row_visitor::make);
         }
     }
 
@@ -969,6 +968,8 @@ public class Configuration
     {
         public final int concurrency;
         public final int trigger_after;
+
+        @JsonProperty("model")
         public final Configuration.ModelConfiguration modelConfiguration;
 
         @JsonCreator
@@ -1055,7 +1056,7 @@ public class Configuration
     @JsonTypeName("default")
     public static class DefaultSchemaProviderConfiguration implements SchemaProviderConfiguration
     {
-        public SchemaSpec make(long seed)
+        public SchemaSpec make(long seed, SystemUnderTest sut)
         {
             return SchemaGenerators.defaultSchemaSpecGen("harry", "table0")
                                    .inflate(seed);
@@ -1103,7 +1104,7 @@ public class Configuration
             this.regular_columns = regulars;
             this.static_keys = statics;
         }
-        public SchemaSpec make(long seed)
+        public SchemaSpec make(long seed, SystemUnderTest sut)
         {
             return schemaSpec;
         }
@@ -1122,6 +1123,4 @@ public class Configuration
             return MetricReporter.NO_OP;
         }
     }
-
-    // TODO: schema provider by DDL
 }
diff --git a/harry-core/src/harry/core/Run.java b/harry-core/src/harry/core/Run.java
index b0c8afc..e3bc203 100644
--- a/harry-core/src/harry/core/Run.java
+++ b/harry-core/src/harry/core/Run.java
@@ -43,7 +43,6 @@ public class Run
                OpSelectors.MonotonicClock clock,
                OpSelectors.PdSelector pdSelector,
                OpSelectors.DescriptorSelector descriptorSelector,
-
                SchemaSpec schemaSpec,
                DataTracker tracker,
                SystemUnderTest sut,
@@ -59,7 +58,6 @@ public class Run
                 OpSelectors.PdSelector pdSelector,
                 OpSelectors.DescriptorSelector descriptorSelector,
                 QueryGenerator rangeSelector,
-
                 SchemaSpec schemaSpec,
                 DataTracker tracker,
                 SystemUnderTest sut,
diff --git a/harry-core/src/harry/corruptor/HideRowCorruptor.java b/harry-core/src/harry/corruptor/HideRowCorruptor.java
index a2a2648..ca16c2e 100644
--- a/harry-core/src/harry/corruptor/HideRowCorruptor.java
+++ b/harry-core/src/harry/corruptor/HideRowCorruptor.java
@@ -44,6 +44,6 @@ public class HideRowCorruptor implements RowCorruptor
 
     public CompiledStatement corrupt(ResultSetRow row)
     {
-        return DeleteHelper.deleteRow(schema, row.pd, row.cd, clock.rts(clock.maxLts()) + 1);
+        return DeleteHelper.deleteRow(schema, row.pd, row.cd, clock.rts(clock.peek()));
     }
 }
diff --git a/harry-core/src/harry/corruptor/HideValueCorruptor.java b/harry-core/src/harry/corruptor/HideValueCorruptor.java
index 3cedcda..c8617ff 100644
--- a/harry-core/src/harry/corruptor/HideValueCorruptor.java
+++ b/harry-core/src/harry/corruptor/HideValueCorruptor.java
@@ -77,7 +77,7 @@ public class HideValueCorruptor implements RowCorruptor
                                                  row.pd,
                                                  mask,
                                                  schema.regularAndStaticColumnsMask(),
-                                                 clock.rts(clock.maxLts()) + 1);
+                                                 clock.rts(clock.peek()));
             }
         }
 
@@ -96,6 +96,6 @@ public class HideValueCorruptor implements RowCorruptor
                                          row.cd,
                                          mask,
                                          schema.regularAndStaticColumnsMask(),
-                                         clock.rts(clock.maxLts()) + 1);
+                                         clock.rts(clock.peek()));
     }
 }
diff --git a/harry-core/src/harry/corruptor/ShowValueCorruptor.java b/harry-core/src/harry/corruptor/ShowValueCorruptor.java
index 01372ce..236709e 100644
--- a/harry-core/src/harry/corruptor/ShowValueCorruptor.java
+++ b/harry-core/src/harry/corruptor/ShowValueCorruptor.java
@@ -72,6 +72,6 @@ public class ShowValueCorruptor implements RowCorruptor
         // We do not know LTS of the deleted row. We could try inferring it, but that
         // still won't help since we can't use it anyways, since collisions between a
         // written value and tombstone are resolved in favour of tombstone.
-        return WriteHelper.inflateInsert(schema, row.pd, row.cd, corruptedVds, null, clock.rts(clock.maxLts()) + 1);
+        return WriteHelper.inflateInsert(schema, row.pd, row.cd, corruptedVds, null, clock.rts(clock.peek()));
     }
 }
diff --git a/harry-core/src/harry/ddl/ColumnSpec.java b/harry-core/src/harry/ddl/ColumnSpec.java
index 6d652c4..9b52805 100644
--- a/harry-core/src/harry/ddl/ColumnSpec.java
+++ b/harry-core/src/harry/ddl/ColumnSpec.java
@@ -264,6 +264,21 @@ public class ColumnSpec<T>
         }
     };
 
+    public static final DataType<String> textType = new DataType<String>("text")
+    {
+        private final Bijections.Bijection<String> gen = new StringBijection();
+
+        public Bijections.Bijection<String> generator()
+        {
+            return gen;
+        }
+
+        public int compareLexicographically(long l, long r)
+        {
+            return Long.compare(l, r);
+        }
+    };
+
     public static DataType<String> asciiType(int nibbleSize, int maxRandomBytes)
     {
         Bijections.Bijection<String> gen = new StringBijection(nibbleSize, maxRandomBytes);
@@ -325,6 +340,7 @@ public class ColumnSpec<T>
     ColumnSpec.floatType,
     ColumnSpec.doubleType,
     ColumnSpec.asciiType,
+    ColumnSpec.textType,
     ColumnSpec.uuidType,
     ColumnSpec.timestampType));
 
diff --git a/harry-core/src/harry/ddl/SchemaSpec.java b/harry-core/src/harry/ddl/SchemaSpec.java
index eba87c6..8d8a62f 100644
--- a/harry-core/src/harry/ddl/SchemaSpec.java
+++ b/harry-core/src/harry/ddl/SchemaSpec.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import java.util.function.Consumer;
 
 import harry.generators.DataGenerators;
+import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
 import harry.operations.Relation;
 import harry.util.BitSet;
@@ -36,7 +37,7 @@ public class SchemaSpec
 {
     public interface SchemaSpecFactory
     {
-        public SchemaSpec make(long seed);
+        public SchemaSpec make(long seed, SystemUnderTest sut);
     }
 
     public final DataGenerators.KeyGenerator pkGenerator;
@@ -244,7 +245,7 @@ public class SchemaSpec
     {
         StringBuilder sb = new StringBuilder();
 
-        sb.append("CREATE TABLE ");
+        sb.append("CREATE TABLE IF NOT EXISTS ");
         sb.append(keyspace)
           .append(".")
           .append(table)
diff --git a/harry-core/src/harry/dsl/HistoryBuilder.java b/harry-core/src/harry/dsl/HistoryBuilder.java
index 5e1ff6f..fa26277 100644
--- a/harry-core/src/harry/dsl/HistoryBuilder.java
+++ b/harry-core/src/harry/dsl/HistoryBuilder.java
@@ -31,8 +31,11 @@ import java.util.TreeSet;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 
+import harry.core.Configuration;
 import harry.core.Run;
 import harry.model.OpSelectors;
+import harry.visitors.MutatingRowVisitor;
+import harry.visitors.MutatingVisitor;
 import harry.visitors.ReplayingVisitor;
 import harry.visitors.VisitExecutor;
 
@@ -598,9 +601,19 @@ public class HistoryBuilder implements Iterable<ReplayingVisitor.Visit>
         return new ReplayingVisitor.Operation(cd, opId, opType);
     }
 
+    public void replayAll(Run run)
+    {
+        visitor(run).replayAll();
+    }
+
+    public ReplayingVisitor visitor(Run run)
+    {
+        return visitor(new MutatingVisitor.MutatingVisitExecutor(run, new MutatingRowVisitor(run)));
+    }
+
     public ReplayingVisitor visitor(VisitExecutor executor)
     {
-        return new ReplayingVisitor(executor)
+        return new ReplayingVisitor(executor, run.clock::nextLts)
         {
             public Visit getVisit(long lts)
             {
@@ -608,16 +621,15 @@ public class HistoryBuilder implements Iterable<ReplayingVisitor.Visit>
                 return log.get((int) lts);
             }
 
-            public void replayAll(Run run)
+            public void replayAll()
             {
                 long maxLts = HistoryBuilder.this.lts;
+
                 while (true)
                 {
-                    long lts = run.clock.currentLts();
-                    if (lts >= maxLts)
+                    if (run.clock.peek() >= maxLts)
                         return;
-                    visit(lts);
-                    run.clock.nextLts();
+                    visit();
                 }
             }
         };
diff --git a/harry-core/src/harry/generators/RngUtils.java b/harry-core/src/harry/generators/RngUtils.java
index 894204f..c974476 100644
--- a/harry-core/src/harry/generators/RngUtils.java
+++ b/harry-core/src/harry/generators/RngUtils.java
@@ -20,8 +20,15 @@ package harry.generators;
 
 import java.util.function.LongSupplier;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.visitors.AllPartitionsValidator;
+
 public class RngUtils
 {
+    private static final Logger logger = LoggerFactory.getLogger(RngUtils.class);
+
     private static final long CONSTANT = 0x2545F4914F6CDD1DL;
     public static long next(long input)
     {
@@ -131,8 +138,15 @@ public class RngUtils
         long min = 0;
         long max = ~0L;
         int n = 0;
+        int steps = 0;
         while (n != bits)
         {
+            if (steps > 10_000)
+            {
+                throw new RuntimeException(String.format("Could not generate bits after 10K tries. " +
+                                          "Inputs: bits=%d, length=%d", bits, length));
+            }
+
             long x = rng.getAsLong() & mask;
             x = min | (x & max);
             n = Long.bitCount(x);
@@ -140,6 +154,8 @@ public class RngUtils
                 max = x;
             else
                 min = x;
+
+            steps++;
         }
         return min;
     }
diff --git a/harry-core/src/harry/model/OpSelectors.java b/harry-core/src/harry/model/OpSelectors.java
index 7e5fc17..7727828 100644
--- a/harry-core/src/harry/model/OpSelectors.java
+++ b/harry-core/src/harry/model/OpSelectors.java
@@ -18,9 +18,11 @@
 
 package harry.model;
 
+import java.util.ArrayList;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.LongConsumer;
 
 import harry.core.Configuration;
 import harry.core.VisibleForTesting;
@@ -76,19 +78,15 @@ public interface OpSelectors
      * be taken to map a real-time timestamp from the value retrieved from the database in order
      * to map it back to the logical timestamp of the operation that wrote this value.
      */
-    public static interface MonotonicClock
+    public interface MonotonicClock
     {
         long rts(long lts);
-
         long lts(long rts);
 
-        long currentLts();
-
         long nextLts();
+        long peek();
 
-        long maxLts();
-
-        public Configuration.ClockConfiguration toConfig();
+        Configuration.ClockConfiguration toConfig();
     }
 
     public static interface MonotonicClockFactory
@@ -304,6 +302,7 @@ public interface OpSelectors
             return minLtsAt(position);
         }
 
+        // TODO: add maxPosition to make it easier/more accessible for the components like sampler, etc
         public long positionFor(long lts)
         {
             long windowStart = lts / switchAfter;
@@ -682,6 +681,14 @@ public interface OpSelectors
         public BitSet partitionLevelOperationsMask(long pd, long lts)
         {
             int totalOps = opsPerModification(lts) * numberOfModifications(lts);
+            if (totalOps > 64)
+            {
+                throw new IllegalArgumentException("RngUtils#randomBits currently supports only up to 64 bits of entropy, so we can not " +
+                                                   "split partition and row level operations for more than 64 operations at the moment. " +
+                                                   "Set modifications_per_lts to a number that is lower than 64 and use rows_per_modification " +
+                                                   "to have more operations per LTS instead");
+            }
+
             long seed = rng.randomNumber(pd, lts);
 
             int partitionLevelOps = (int) Math.ceil(operationSelector.partitionLevelThreshold * totalOps);
@@ -692,8 +699,17 @@ public interface OpSelectors
 
         private OperationKind operationType(long pd, long lts, long opId, BitSet partitionLevelOperationsMask)
         {
-            long descriptor = rng.randomNumber(pd ^ lts ^ opId, BITSET_IDX_STREAM);
-            return operationSelector.inflate(descriptor, partitionLevelOperationsMask.isSet((int) opId));
+            try
+            {
+                long descriptor = rng.randomNumber(pd ^ lts ^ opId, BITSET_IDX_STREAM);
+                return operationSelector.inflate(descriptor, partitionLevelOperationsMask.isSet((int) opId));
+            }
+            catch (Throwable t) // rethrown with the inputs for context; t is kept as the cause so the original stack trace survives
+            {
+                throw new RuntimeException(String.format("Can not generate a random number with the following inputs: " +
+                                                         "pd=%d lts=%d opId=%d partitionLevelOperationsMask=%s",
+                                                         pd, lts, opId, partitionLevelOperationsMask), t);
+            }
         }
 
         public BitSet columnMask(long pd, long lts, long opId, OperationKind opType)
@@ -704,12 +720,12 @@ public interface OpSelectors
 
         public long vd(long pd, long cd, long lts, long opId, int col)
         {
-            return rng.randomNumber(opId, pd ^ cd ^ lts ^ col);
+            return rng.randomNumber(opId + 1, pd ^ cd ^ lts ^ col);
         }
 
         public long modificationId(long pd, long cd, long lts, long vd, int col)
         {
-            return rng.sequenceNumber(vd, pd ^ cd ^ lts ^ col);
+            return rng.sequenceNumber(vd, pd ^ cd ^ lts ^ col) - 1;
         }
     }
 
diff --git a/harry-core/src/harry/model/QuiescentChecker.java b/harry-core/src/harry/model/QuiescentChecker.java
index a630df2..0246e98 100644
--- a/harry-core/src/harry/model/QuiescentChecker.java
+++ b/harry-core/src/harry/model/QuiescentChecker.java
@@ -41,7 +41,7 @@ public class QuiescentChecker implements Model
     protected final DataTracker tracker;
     protected final SystemUnderTest sut;
     protected final Reconciler reconciler;
-    protected final SchemaSpec schemaSpec;
+    protected final SchemaSpec schema;
 
     public QuiescentChecker(Run run)
     {
@@ -54,7 +54,7 @@ public class QuiescentChecker implements Model
         this.sut = run.sut;
         this.reconciler = reconciler;
         this.tracker = run.tracker;
-        this.schemaSpec = run.schemaSpec;
+        this.schema = run.schemaSpec;
     }
 
     public void validate(Query query)
@@ -84,8 +84,8 @@ public class QuiescentChecker implements Model
         {
             ResultSetRow actualRowState = actual.next();
             if (actualRowState.cd != partitionState.staticRow().cd)
-                throw new ValidationException(partitionState.toString(schemaSpec),
-                                              toString(actualRows, schemaSpec),
+                throw new ValidationException(partitionState.toString(schema),
+                                              toString(actualRows, schema),
                                               "Found a row while model predicts statics only:" +
                                               "\nExpected: %s" +
                                               "\nActual: %s" +
@@ -96,8 +96,8 @@ public class QuiescentChecker implements Model
             for (int i = 0; i < actualRowState.vds.length; i++)
             {
                 if (actualRowState.vds[i] != NIL_DESCR || actualRowState.lts[i] != NO_TIMESTAMP)
-                    throw new ValidationException(partitionState.toString(schemaSpec),
-                                                  toString(actualRows, schemaSpec),
+                    throw new ValidationException(partitionState.toString(schema),
+                                                  toString(actualRows, schema),
                                                   "Found a row while model predicts statics only:" +
                                                   "\nActual: %s" +
                                                   "\nQuery: %s" +
@@ -105,7 +105,7 @@ public class QuiescentChecker implements Model
                                                   actualRowState, query.toSelectStatement());
             }
 
-            assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schemaSpec);
+            assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schema);
         }
 
         while (actual.hasNext() && expected.hasNext())
@@ -114,45 +114,45 @@ public class QuiescentChecker implements Model
             Reconciler.RowState expectedRowState = expected.next();
             // TODO: this is not necessarily true. It can also be that ordering is incorrect.
             if (actualRowState.cd != expectedRowState.cd)
-                throw new ValidationException(partitionState.toString(schemaSpec),
-                                              toString(actualRows, schemaSpec),
+                throw new ValidationException(partitionState.toString(schema),
+                                              toString(actualRows, schema),
                                               "Found a row in the model that is not present in the resultset:" +
                                               "\nExpected: %s" +
                                               "\nActual: %s" +
                                               "\nQuery: %s",
-                                              expectedRowState.toString(schemaSpec),
+                                              expectedRowState.toString(schema),
                                               actualRowState, query.toSelectStatement());
 
             if (!Arrays.equals(actualRowState.vds, expectedRowState.vds))
-                throw new ValidationException(partitionState.toString(schemaSpec),
-                                              toString(actualRows, schemaSpec),
+                throw new ValidationException(partitionState.toString(schema),
+                                              toString(actualRows, schema),
                                               "Returned row state doesn't match the one predicted by the model:" +
                                               "\nExpected: %s (%s)" +
                                               "\nActual:   %s (%s)." +
                                               "\nQuery: %s",
-                                              Arrays.toString(expectedRowState.vds), expectedRowState.toString(schemaSpec),
+                                              Arrays.toString(expectedRowState.vds), expectedRowState.toString(schema),
                                               Arrays.toString(actualRowState.vds), actualRowState,
                                               query.toSelectStatement());
 
             if (!Arrays.equals(actualRowState.lts, expectedRowState.lts))
-                throw new ValidationException(partitionState.toString(schemaSpec),
-                                              toString(actualRows, schemaSpec),
+                throw new ValidationException(partitionState.toString(schema),
+                                              toString(actualRows, schema),
                                               "Timestamps in the row state don't match ones predicted by the model:" +
                                               "\nExpected: %s (%s)" +
                                               "\nActual:   %s (%s)." +
                                               "\nQuery: %s",
-                                              Arrays.toString(expectedRowState.lts), expectedRowState.toString(schemaSpec),
+                                              Arrays.toString(expectedRowState.lts), expectedRowState.toString(schema),
                                               Arrays.toString(actualRowState.lts), actualRowState,
                                               query.toSelectStatement());
 
             if (partitionState.staticRow() != null || actualRowState.sds != null || actualRowState.slts != null)
-                assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schemaSpec);
+                assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schema);
         }
 
         if (actual.hasNext() || expected.hasNext())
         {
-            throw new ValidationException(partitionState.toString(schemaSpec),
-                                          toString(actualRows, schemaSpec),
+            throw new ValidationException(partitionState.toString(schema),
+                                          toString(actualRows, schema),
                                           "Expected results to have the same number of results, but %s result iterator has more results." +
                                           "\nExpected: %s" +
                                           "\nActual:   %s" +
diff --git a/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java b/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java
index 186a618..6c6f27f 100644
--- a/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java
+++ b/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java
@@ -48,9 +48,9 @@ import harry.model.OpSelectors;
 // TODO: shut down
 public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
 {
-    private static long START_VALUE = 0;
-    private static long DEFUNCT = Long.MIN_VALUE;
-    private static long REBASE_IN_PROGRESS = Long.MIN_VALUE + 1;
+    public static final long START_VALUE = 0;
+    public static final long DEFUNCT = Long.MIN_VALUE;
+    public static final long REBASE_IN_PROGRESS = Long.MIN_VALUE + 1;
 
     // TODO: there's a theoretical possibility of a bug; when we have several consecutive epochs without
     // change in LTS, current implementation will return the latest epoch instead of the earliest one.
@@ -151,11 +151,7 @@ public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
             throw new IllegalStateException("No thread should have changed LTS during rebase. " + lts.get());
     }
 
-    public long currentLts()
-    {
-        return lts.get();
-    }
-
+    @Override
     public long nextLts()
     {
         long current = lts.get();
@@ -184,7 +180,7 @@ public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
         }
     }
 
-    public long maxLts()
+    public long peek()
     {
         while (true)
         {
@@ -236,7 +232,7 @@ public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
     // TODO: binary search instead
     public long rts(final long lts)
     {
-        assert lts <= maxLts() : String.format("Queried for LTS we haven't yet issued %d. Max is %d.", lts, maxLts());
+        assert lts <= peek() : String.format("Queried for LTS we haven't yet issued %d. Max is %d.", lts, peek());
 
         final int historyIdx = idx - 1;
         for (int i = 0; i < historySize - 1 && historyIdx - i >= 0; i++)
diff --git a/harry-core/src/harry/model/clock/OffsetClock.java b/harry-core/src/harry/model/clock/OffsetClock.java
index 6040125..6e8f891 100644
--- a/harry-core/src/harry/model/clock/OffsetClock.java
+++ b/harry-core/src/harry/model/clock/OffsetClock.java
@@ -28,7 +28,7 @@ import harry.model.OpSelectors;
 
 public class OffsetClock implements OpSelectors.MonotonicClock
 {
-    final AtomicLong lts = new AtomicLong(0);
+    final AtomicLong lts = new AtomicLong(ApproximateMonotonicClock.START_VALUE);
 
     private final long base;
 
@@ -47,17 +47,12 @@ public class OffsetClock implements OpSelectors.MonotonicClock
         return rts - base;
     }
 
-    public long currentLts()
-    {
-        return lts.get();
-    }
-
     public long nextLts()
     {
         return lts.getAndIncrement();
     }
 
-    public long maxLts()
+    public long peek()
     {
         return lts.get();
     }
diff --git a/harry-core/src/harry/reconciler/Reconciler.java b/harry-core/src/harry/reconciler/Reconciler.java
index 275b7c7..45649be 100644
--- a/harry-core/src/harry/reconciler/Reconciler.java
+++ b/harry-core/src/harry/reconciler/Reconciler.java
@@ -35,12 +35,12 @@ import harry.core.Run;
 import harry.ddl.ColumnSpec;
 import harry.ddl.SchemaSpec;
 import harry.model.OpSelectors;
-import harry.visitors.GeneratingVisitor;
-import harry.visitors.Visitor;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
 import harry.util.BitSet;
 import harry.util.Ranges;
+import harry.visitors.GeneratingVisitor;
+import harry.visitors.LtsVisitor;
 import harry.visitors.ReplayingVisitor;
 import harry.visitors.VisitExecutor;
 
@@ -67,7 +67,7 @@ public class Reconciler
     private final QueryGenerator rangeSelector;
     private final SchemaSpec schema;
 
-    private final Function<VisitExecutor, Visitor> visitorFactory;
+    private final Function<VisitExecutor, LtsVisitor> visitorFactory;
 
     public Reconciler(Run run)
     {
@@ -76,13 +76,13 @@ public class Reconciler
     }
 
     public Reconciler(Run run,
-                      Function<VisitExecutor, Visitor> visitorFactory)
+                      Function<VisitExecutor, LtsVisitor> ltsVisitorFactory)
     {
         this.descriptorSelector = run.descriptorSelector;
         this.pdSelector = run.pdSelector;
         this.schema = run.schemaSpec;
         this.rangeSelector = run.rangeSelector;
-        this.visitorFactory = visitorFactory;
+        this.visitorFactory = ltsVisitorFactory;
     }
 
     private final long debugCd = Long.getLong("harry.reconciler.debug_cd", -1L);
@@ -91,7 +91,7 @@ public class Reconciler
     {
         PartitionState partitionState = new PartitionState();
 
-        class Processor implements VisitExecutor
+        class Processor extends VisitExecutor
         {
             // Whether or not a partition deletion was encountered on this LTS.
             private boolean hadPartitionDeletion = false;
@@ -100,7 +100,7 @@ public class Reconciler
             private final List<ReplayingVisitor.Operation> columnDeletes = new ArrayList<>();
 
             @Override
-            public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
+            protected void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
             {
                 if (hadPartitionDeletion)
                     return;
@@ -151,7 +151,7 @@ public class Reconciler
             }
 
             @Override
-            public void beforeLts(long lts, long pd)
+            protected void beforeLts(long lts, long pd)
             {
                 rangeDeletes.clear();
                 writes.clear();
@@ -160,7 +160,7 @@ public class Reconciler
             }
 
             @Override
-            public void afterLts(long lts, long pd)
+            protected void afterLts(long lts, long pd)
             {
                 if (hadPartitionDeletion)
                     return;
@@ -248,13 +248,16 @@ public class Reconciler
             }
 
             @Override
-            public void afterBatch(long lts, long pd, long m) {}
+            protected void afterBatch(long lts, long pd, long m) {}
+
+            @Override
+            protected void beforeBatch(long lts, long pd, long m) {}
 
             @Override
-            public void beforeBatch(long lts, long pd, long m) {}
+            public void shutdown() throws InterruptedException {}
         }
 
-        Visitor visitor = visitorFactory.apply(new Processor());
+        LtsVisitor visitor = visitorFactory.apply(new Processor());
 
         long currentLts = pdSelector.minLtsFor(pd);
 
diff --git a/harry-core/src/harry/runner/DataTracker.java b/harry-core/src/harry/runner/DataTracker.java
index 9f518d1..0171e43 100644
--- a/harry-core/src/harry/runner/DataTracker.java
+++ b/harry-core/src/harry/runner/DataTracker.java
@@ -18,30 +18,62 @@
 
 package harry.runner;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.LongConsumer;
+
 import harry.core.Configuration;
 
-public interface DataTracker
+public abstract class DataTracker
 {
-    void started(long lts);
-    void finished(long lts);
+    protected List<LongConsumer> onStarted = new ArrayList<>();
+    protected List<LongConsumer> onFinished = new ArrayList<>();
+
+    public void onLtsStarted(LongConsumer onLts)
+    {
+        this.onStarted.add(onLts);
+    }
 
-    long maxStarted();
-    long maxConsecutiveFinished();
+    public void onLtsFinished(LongConsumer onLts)
+    {
+        this.onFinished.add(onLts);
+    }
 
-    public Configuration.DataTrackerConfiguration toConfig();
+    public void started(long lts)
+    {
+        startedInternal(lts);
+        for (LongConsumer consumer : onStarted)
+            consumer.accept(lts);
+    }
 
-    interface DataTrackerFactory {
+    public void finished(long lts)
+    {
+        finishedInternal(lts);
+        for (LongConsumer consumer : onFinished)
+            consumer.accept(lts);
+    }
+
+    abstract void startedInternal(long lts);
+    abstract void finishedInternal(long lts);
+
+    public abstract long maxStarted();
+    public abstract long maxConsecutiveFinished();
+
+    public abstract Configuration.DataTrackerConfiguration toConfig();
+
+    public static interface DataTrackerFactory
+    {
         DataTracker make();
     }
 
-    public static DataTracker NO_OP = new NoOpDataTracker();
+    public static final DataTracker NO_OP = new NoOpDataTracker();
 
-    class NoOpDataTracker implements DataTracker
+    public static class NoOpDataTracker extends DataTracker
     {
         private NoOpDataTracker() {}
 
-        public void started(long lts) {}
-        public void finished(long lts) {}
+        protected void startedInternal(long lts) {}
+        protected void finishedInternal(long lts) {}
         public long maxStarted() { return 0; }
         public long maxConsecutiveFinished() { return 0; }
 
diff --git a/harry-core/src/harry/runner/DefaultDataTracker.java b/harry-core/src/harry/runner/DefaultDataTracker.java
index d5cdb4b..85e2637 100644
--- a/harry-core/src/harry/runner/DefaultDataTracker.java
+++ b/harry-core/src/harry/runner/DefaultDataTracker.java
@@ -29,14 +29,14 @@ import org.slf4j.LoggerFactory;
 import harry.core.Configuration;
 import harry.core.VisibleForTesting;
 
-public class DefaultDataTracker implements DataTracker
+public class DefaultDataTracker extends DataTracker
 {
     private static final Logger logger = LoggerFactory.getLogger(DefaultDataTracker.class);
 
     private final AtomicLong maxSeenLts;
     // TODO: This is a trivial implementation that can be significantly improved upon
     // for example, we could use a bitmap that records `1`s for all lts that are after
-    // the consective, and "collapse" the bitmap state into the long as soon as we see
+    // the consecutive, and "collapse" the bitmap state into the long as soon as we see
     // consecutive `1` on the left side.
     private final AtomicLong maxCompleteLts;
     private final PriorityBlockingQueue<Long> reorderBuffer;
@@ -49,13 +49,12 @@ public class DefaultDataTracker implements DataTracker
     }
 
     // TODO: there's also some room for improvement in terms of concurrency
-    // TODO: remove pd?
-    public void started(long lts)
+    protected void startedInternal(long lts)
     {
         recordEvent(lts, false);
     }
 
-    public void finished(long lts)
+    protected void finishedInternal(long lts)
     {
         recordEvent(lts, true);
     }
@@ -63,10 +62,7 @@ public class DefaultDataTracker implements DataTracker
     private void recordEvent(long lts, boolean finished)
     {
         // all seen LTS are allowed to be "in-flight"
-        maxSeenLts.getAndUpdate((old) -> {
-            assert finished || lts > old : String.format("Attempting to reuse lts: %d. Max seen: %d", lts, old);
-            return Math.max(lts, old);
-        });
+        maxSeenLts.getAndUpdate((old) -> Math.max(lts, old));
 
         if (!finished)
             return;
@@ -116,6 +112,9 @@ public class DefaultDataTracker implements DataTracker
 
     public long maxConsecutiveFinished()
     {
+        if (!reorderBuffer.isEmpty())
+            return drainReorderQueue();
+
         return maxCompleteLts.get();
     }
 
diff --git a/harry-core/src/harry/runner/HarryRunner.java b/harry-core/src/harry/runner/HarryRunner.java
index ba911c4..30f7f86 100644
--- a/harry-core/src/harry/runner/HarryRunner.java
+++ b/harry-core/src/harry/runner/HarryRunner.java
@@ -35,9 +35,9 @@ public abstract class HarryRunner
 {
     public static final Logger logger = LoggerFactory.getLogger(HarryRunner.class);
 
-    protected CompletableFuture progress;
+    protected CompletableFuture<?> progress;
     protected ScheduledThreadPoolExecutor executor;
-    public abstract void beforeRun(Runner runner);
+    public abstract void beforeRun(Runner.TimedRunner runner);
     public void afterRun(Runner runner, Object result)
     {
         executor.shutdown();
@@ -64,7 +64,9 @@ public abstract class HarryRunner
         Run run = runner.getRun();
 
         progress = runner.initAndStartAll();
-        beforeRun(runner);
+        
+        assert runner instanceof Runner.TimedRunner : "Please use a timed runner at the top level.";
+        beforeRun((Runner.TimedRunner) runner);
 
         Object result = null;
 
@@ -76,13 +78,12 @@ public abstract class HarryRunner
                 return a;
             }).get();
             if (result instanceof Throwable)
-                logger.error("Execution failed", result);
+                logger.error("Execution failed!", (Throwable) result);
 
         }
         catch (Throwable e)
         {
-            logger.error("Failed due to exception: " + e.getMessage(),
-                         e);
+            logger.error("Failed due to exception: " + e.getMessage(), e);
             result = e;
         }
         finally
diff --git a/harry-core/src/harry/runner/Runner.java b/harry-core/src/harry/runner/Runner.java
index 3a84d3e..4f99eab 100644
--- a/harry-core/src/harry/runner/Runner.java
+++ b/harry-core/src/harry/runner/Runner.java
@@ -24,40 +24,44 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import harry.core.Configuration;
 import harry.core.Run;
-import harry.model.OpSelectors;
 import harry.visitors.Visitor;
 
-
 public abstract class Runner
 {
     private static final Logger logger = LoggerFactory.getLogger(Runner.class);
 
     protected final Run run;
     protected final Configuration config;
+    protected final ScheduledExecutorService executor;
 
     // If there's an error, there's a good chance we're going to hit it more than once
-    //  since we have multiple concurrent checkers running
+    // since we have multiple concurrent checkers running
     protected final CopyOnWriteArrayList<Throwable> errors;
 
-    public Runner(Run run, Configuration config)
+    public Runner(Run run, Configuration config, int concurrency)
     {
         this.run = run;
         this.config = config;
         this.errors = new CopyOnWriteArrayList<>();
+        this.executor = Executors.newScheduledThreadPool(concurrency);
     }
 
     public Run getRun()
@@ -69,12 +73,11 @@ public abstract class Runner
     {
         if (config.create_schema)
         {
-            // TODO: make RF configurable or make keyspace DDL configurable
-            run.sut.schemaChange("CREATE KEYSPACE " + run.schemaSpec.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            if (config.keyspace_ddl == null)
+                run.sut.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + run.schemaSpec.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            else
+                run.sut.schemaChange(config.keyspace_ddl);
 
-            run.sut.schemaChange(String.format("DROP TABLE IF EXISTS %s.%s;",
-                                          run.schemaSpec.keyspace,
-                                          run.schemaSpec.table));
             String schema = run.schemaSpec.compile().cql();
             logger.info("Creating table: " + schema);
             run.sut.schemaChange(schema);
@@ -115,13 +118,36 @@ public abstract class Runner
             dumpStateToFile(run, config, errors);
     }
 
-    public abstract CompletableFuture<?> initAndStartAll() throws InterruptedException;
+    public CompletableFuture<?> initAndStartAll()
+    {
+        init();
+        return start();
+    }
 
+    public abstract String type();
+        
     public abstract void shutdown() throws InterruptedException;
+    
+    protected CompletableFuture<?> start()
+    {
+        return start(true, () -> false);
+    }
+    
+    protected abstract void shutDownVisitors();
+
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    protected void shutDownExecutors() throws InterruptedException
+    {
+        executor.shutdownNow();
+        executor.awaitTermination(1, TimeUnit.MINUTES);
+    }
+    
+    protected abstract CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit);
 
     protected Runnable reportThrowable(Runnable runnable, CompletableFuture<?> future)
     {
-        return () -> {
+        return () ->
+        {
             try
             {
                 if (!future.isDone())
@@ -130,193 +156,313 @@ public abstract class Runner
             catch (Throwable t)
             {
                 errors.add(t);
+                
                 if (!future.isDone())
                     future.completeExceptionally(t);
             }
         };
     }
 
-    public static class SequentialRunner extends Runner
+    public interface RunnerFactory
+    {
+        Runner make(Run run, Configuration config);
+    }
+
+    public abstract static class TimedRunner extends Runner
+    {
+        public final long runtime;
+        public final TimeUnit runtimeUnit;
+        
+        protected final ScheduledExecutorService shutdownExecutor;
+
+        public TimedRunner(Run run, Configuration config, int concurrency, long runtime, TimeUnit runtimeUnit)
+        {
+            super(run, config, concurrency);
+
+            this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor();
+            this.runtime = runtime;
+            this.runtimeUnit = runtimeUnit;
+        }
+
+        protected ScheduledFuture<?> scheduleTermination(AtomicBoolean terminated)
+        {
+            return shutdownExecutor.schedule(() ->
+                                             {
+                                                 logger.info("Runner has reached configured runtime. Stopping...");
+                                                 // TODO: wait for the last full validation?
+                                                 terminated.set(true);
+                                             },
+                                             runtime,
+                                             runtimeUnit);
+        }
+
+        @Override
+        @SuppressWarnings("ResultOfMethodCallIgnored")
+        protected void shutDownExecutors() throws InterruptedException
+        {
+            shutdownExecutor.shutdownNow();
+            shutdownExecutor.awaitTermination(1, TimeUnit.MINUTES);
+            executor.shutdownNow();
+            executor.awaitTermination(1, TimeUnit.MINUTES);
+        }
+    }
+
+    public static class SingleVisitRunner extends Runner
     {
-        private final ScheduledExecutorService executor;
-        private final ScheduledExecutorService shutdownExceutor;
         private final List<Visitor> visitors;
-        private final Configuration config;
 
-        public SequentialRunner(Run run,
+        public SingleVisitRunner(Run run,
                                 Configuration config,
                                 List<? extends Visitor.VisitorFactory> visitorFactories)
         {
-            super(run, config);
-
-            this.executor = Executors.newSingleThreadScheduledExecutor();
-            this.shutdownExceutor = Executors.newSingleThreadScheduledExecutor();
-            this.config = config;
-            this.visitors = new ArrayList<>();
-            for (Visitor.VisitorFactory factory : visitorFactories)
-                visitors.add(factory.make(run));
+            super(run, config, 1);
+            this.visitors = visitorFactories.stream().map(factory -> factory.make(run)).collect(Collectors.toList());
         }
 
-        public CompletableFuture<?> initAndStartAll()
+        @Override
+        public String type()
+        {
+            return "single";
+        }
+
+        @Override
+        protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
         {
-            init();
             CompletableFuture<?> future = new CompletableFuture<>();
-            future.whenComplete((a, b) -> maybeReportErrors());
-
-            AtomicBoolean completed = new AtomicBoolean(false);
-            shutdownExceutor.schedule(() -> {
-                logger.info("Completed");
-                // TODO: wait for the last full validation?
-                completed.set(true);
-            }, config.run_time, config.run_time_unit);
-
-            executor.submit(reportThrowable(() -> {
-                                                try
-                                                {
-                                                    SequentialRunner.run(visitors, run.clock, future,
-                                                                         () -> Thread.currentThread().isInterrupted() || future.isDone() || completed.get());
-                                                }
-                                                catch (Throwable t)
-                                                {
-                                                    future.completeExceptionally(t);
-                                                }
-                                            },
-                                            future));
 
+            if (reportErrors)
+                future.whenComplete((a, b) -> maybeReportErrors());
+
+            executor.submit(reportThrowable(() -> run(visitors, future, parentExit), future));
             return future;
         }
 
-        static void run(List<Visitor> visitors,
-                        OpSelectors.MonotonicClock clock,
-                        CompletableFuture<?> future,
-                        BooleanSupplier exitCondition)
+        private void run(List<Visitor> visitors, CompletableFuture<?> future, BooleanSupplier parentExit)
         {
-            while (!exitCondition.getAsBoolean())
+            for (Visitor value: visitors)
             {
-                long lts = clock.nextLts();
+                if (parentExit.getAsBoolean())
+                    break;
+
+                value.visit();
+            }
+
+            future.complete(null);
+        }
+
+        @Override
+        public void shutdown() throws InterruptedException
+        {
+            logger.info("Shutting down...");
+            shutDownVisitors();
 
-                if (lts > 0 && lts % 10_000 == 0)
-                    logger.info("Visited {} logical timestamps", lts);
+            // we need to wait for all threads that use schema to stop before we can tear down and drop the table
+            shutDownExecutors();
 
-                for (int i = 0; i < visitors.size() && !exitCondition.getAsBoolean(); i++)
+            teardown();
+        }
+
+        @Override
+        protected void shutDownVisitors()
+        {
+            shutDownVisitors(visitors);
+        }
+    }
+
+    public static class SequentialRunner extends TimedRunner
+    {
+        protected final List<Visitor> visitors;
+
+        public SequentialRunner(Run run,
+                                Configuration config,
+                                List<? extends Visitor.VisitorFactory> visitorFactories,
+                                long runtime, TimeUnit runtimeUnit)
+        {
+            super(run, config, 1, runtime, runtimeUnit);
+
+            this.visitors = visitorFactories.stream().map(factory -> factory.make(run)).collect(Collectors.toList());
+        }
+
+        @Override
+        public String type()
+        {
+            return "sequential";
+        }
+
+        @Override
+        protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
+        {
+            CompletableFuture<?> future = new CompletableFuture<>();
+            
+            if (reportErrors)
+                future.whenComplete((a, b) -> maybeReportErrors());
+
+            AtomicBoolean terminated = new AtomicBoolean(false);
+            scheduleTermination(terminated);
+            BooleanSupplier exit = () -> Thread.currentThread().isInterrupted() || future.isDone() 
+                                         || terminated.get() || parentExit.getAsBoolean();
+
+            executor.submit(reportThrowable(() -> run(visitors, future, exit), future));
+            return future;
+        }
+
+        protected void run(List<Visitor> visitors,
+                           CompletableFuture<?> future,
+                           BooleanSupplier exit)
+        {
+            while (!exit.getAsBoolean())
+            {
+                for (Visitor visitor: visitors)
                 {
-                    try
-                    {
-                        Visitor visitor = visitors.get(i);
-                        visitor.visit(lts);
-                    }
-                    catch (Throwable t)
-                    {
-                        future.completeExceptionally(t);
-                        throw t;
-                    }
+                    if (exit.getAsBoolean())
+                        break;
+                    visitor.visit();
                 }
             }
+
             future.complete(null);
         }
 
+        @Override
         public void shutdown() throws InterruptedException
         {
             logger.info("Shutting down...");
-            shutdownExceutor.shutdownNow();
-            shutdownExceutor.awaitTermination(1, TimeUnit.MINUTES);
+            shutDownVisitors();
 
-            executor.shutdownNow();
-            executor.awaitTermination(1, TimeUnit.MINUTES);
             // we need to wait for all threads that use schema to stop before we can tear down and drop the table
+            shutDownExecutors();
+            
             teardown();
         }
-    }
 
-    public static interface RunnerFactory
-    {
-        public Runner make(Run run, Configuration config);
+        @Override
+        protected void shutDownVisitors()
+        {
+            shutDownVisitors(visitors);
+        }
     }
 
     // TODO: this requires some significant improvement
-    public static class ConcurrentRunner extends Runner
+    public static class ConcurrentRunner extends TimedRunner
     {
-        private final ScheduledExecutorService executor;
-        private final ScheduledExecutorService shutdownExecutor;
-        private final List<? extends Visitor.VisitorFactory> visitorFactories;
-        private final List<Visitor> allVisitors;
-
+        private final List<List<Visitor>> perThreadVisitors;
         private final int concurrency;
-        private final long runTime;
-        private final TimeUnit runTimeUnit;
 
         public ConcurrentRunner(Run run,
                                 Configuration config,
                                 int concurrency,
-                                List<? extends Visitor.VisitorFactory> visitorFactories)
+                                List<? extends Visitor.VisitorFactory> visitorFactories,
+                                long runtime, TimeUnit runtimeUnit)
         {
-            super(run, config);
-            this.concurrency = concurrency;
-            this.runTime = config.run_time;
-            this.runTimeUnit = config.run_time_unit;
-            // TODO: configure concurrency
-            this.executor = Executors.newScheduledThreadPool(concurrency);
-            this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor();
-            this.visitorFactories = visitorFactories;
-            this.allVisitors = new CopyOnWriteArrayList<>();
-        }
+            super(run, config, concurrency, runtime, runtimeUnit);
 
-        public CompletableFuture<?> initAndStartAll()
-        {
-            init();
-            CompletableFuture<?> future = new CompletableFuture<>();
-            future.whenComplete((a, b) -> maybeReportErrors());
-
-            shutdownExecutor.schedule(() -> {
-                logger.info("Completed");
-                // TODO: wait for the last full validation?
-                future.complete(null);
-            }, runTime, runTimeUnit);
+            this.concurrency = concurrency;
+            this.perThreadVisitors = new ArrayList<>(concurrency);
 
-            BooleanSupplier exitCondition = () -> Thread.currentThread().isInterrupted() || future.isDone();
             for (int i = 0; i < concurrency; i++)
             {
                 List<Visitor> visitors = new ArrayList<>();
-                executor.submit(reportThrowable(() -> {
-                                                    for (Visitor.VisitorFactory factory : visitorFactories)
-                                                        visitors.add(factory.make(run));
 
-                                                    allVisitors.addAll(visitors);
-                                                    run(visitors, run.clock, exitCondition);
-                                                },
-                                                future));
+                for (Visitor.VisitorFactory factory : visitorFactories)
+                    visitors.add(factory.make(run));
+
+                perThreadVisitors.add(visitors);
+            }
+        }
+
+        @Override
+        public String type()
+        {
+            return "concurrent";
+        }
 
+        @Override
+        protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
+        {
+            CompletableFuture<?> future = new CompletableFuture<>();
+            
+            if (reportErrors)
+                future.whenComplete((a, b) -> maybeReportErrors());
+
+            AtomicBoolean terminated = new AtomicBoolean(false);
+            scheduleTermination(terminated);
+            BooleanSupplier exit = () -> Thread.currentThread().isInterrupted() || future.isDone() 
+                                         || terminated.get() || parentExit.getAsBoolean();
+            
+            AtomicInteger liveCount = new AtomicInteger(0);
+            
+            for (int i = 0; i < concurrency; i++)
+            {
+                List<Visitor> visitors = perThreadVisitors.get(i);
+                executor.submit(reportThrowable(() -> run(visitors, future, exit, liveCount), future));
             }
 
             return future;
         }
 
-        void run(List<Visitor> visitors,
-                 OpSelectors.MonotonicClock clock,
-                 BooleanSupplier exitCondition)
+        private void run(List<Visitor> visitors,
+                         CompletableFuture<?> future,
+                         BooleanSupplier exit,
+                         AtomicInteger liveCount)
         {
-            while (!exitCondition.getAsBoolean())
+            liveCount.incrementAndGet();
+            
+            while (!exit.getAsBoolean())
             {
-                long lts = clock.nextLts();
                 for (Visitor visitor : visitors)
-                    visitor.visit(lts);
+                {
+                    if (exit.getAsBoolean())
+                        break;
+
+                    visitor.visit();
+                }
             }
+            
+            // If we're the last worker still running, complete the future....
+            if (liveCount.decrementAndGet() == 0)
+                future.complete(null);
         }
 
+        @Override
         public void shutdown() throws InterruptedException
         {
             logger.info("Shutting down...");
-            for (Visitor visitor : allVisitors)
-                visitor.shutdown();
-
-            shutdownExecutor.shutdownNow();
-            shutdownExecutor.awaitTermination(1, TimeUnit.MINUTES);
+            shutDownVisitors();
 
-            executor.shutdownNow();
-            executor.awaitTermination(1, TimeUnit.MINUTES);
             // we need to wait for all threads that use schema to stop before we can tear down and drop the table
+            shutDownExecutors();
+
             teardown();
         }
+
+        @Override
+        protected void shutDownVisitors()
+        {
+            shutDownVisitors(perThreadVisitors.stream().flatMap(Collection::stream).collect(Collectors.toList()));
+        }
+    }
+
+    /**
+     * Shuts down every visitor in {@code visitors}. An interruption raised by one visitor does not
+     * stop the remaining ones from being shut down: failures are collected, logged as a single
+     * warning, and the thread's interrupt status is restored before returning.
+     */
+    protected static void shutDownVisitors(List<Visitor> visitors)
+    {
+        Throwable error = null;
+
+        for (Visitor visitor : visitors)
+        {
+            try
+            {
+                visitor.shutdown();
+            }
+            catch (InterruptedException e)
+            {
+                if (error != null)
+                    error.addSuppressed(e);
+                else
+                    error = e;
+            }
+        }
+
+        if (error != null)
+        {
+            logger.warn("Failed to shut down all visitors!", error);
+            // Catching InterruptedException cleared the interrupt status; set it again so the
+            // caller can still observe the interruption once every visitor has been handled.
+            Thread.currentThread().interrupt();
+        }
     }
 
     private static void dumpExceptionToFile(BufferedWriter bw, Throwable throwable) throws IOException
@@ -361,7 +507,7 @@ public abstract class Runner
             File file = new File("run.yaml");
             Configuration.ConfigurationBuilder builder = config.unbuild();
 
-            // overrride stateful components
+            // override stateful components
             builder.setClock(run.clock.toConfig());
             builder.setDataTracker(run.tracker.toConfig());
 
@@ -373,12 +519,14 @@ public abstract class Runner
         }
         catch (Throwable e)
         {
-            logger.error("Caught an error while trying to dump to file",
-                         e);
+            logger.error("Caught an error while trying to dump to file", e);
             try
             {
                 File f = new File("tmp.dump");
-                f.createNewFile();
+                
+                if (!f.createNewFile())
+                    logger.info("File {} already exists. Appending...", f);
+                
                 BufferedWriter tmp = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
                 dumpExceptionToFile(tmp, e);
             }
diff --git a/harry-core/src/harry/runner/StagedRunner.java b/harry-core/src/harry/runner/StagedRunner.java
new file mode 100644
index 0000000..88ebcdb
--- /dev/null
+++ b/harry-core/src/harry/runner/StagedRunner.java
@@ -0,0 +1,239 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.Configuration;
+import harry.core.Run;
+
+/**
+ * Runs a list of sub-runners ("stages") one after another and starts over from the first stage
+ * until the exit condition becomes true or a stage fails.
+ */
+public class StagedRunner extends Runner.TimedRunner
+{
+    public static final String TYPE = "staged";
+
+    /** Registers {@link StagedRunnerConfig} as a configuration subtype. */
+    public static void register()
+    {
+        Configuration.registerSubtypes(StagedRunner.StagedRunnerConfig.class);
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(StagedRunner.class);
+
+    private final List<Configuration.RunnerConfiguration> runnerFactories;
+    // Filled by the task submitted in start() and read by shutdown() and collectErrors().
+    private final List<Runner> stages;
+
+    public StagedRunner(Run run,
+                        Configuration config,
+                        List<Configuration.RunnerConfiguration> runnerFactories,
+                        long runtime, TimeUnit runtimeUnit)
+    {
+        super(run, config, 1, runtime, runtimeUnit);
+        this.runnerFactories = runnerFactories;
+        this.stages = new CopyOnWriteArrayList<>();
+    }
+
+    @Override
+    public String type()
+    {
+        return TYPE;
+    }
+
+    /**
+     * Builds every configured stage and then runs the stages in order, pass after pass, on this
+     * runner's executor.
+     *
+     * @param reportErrors whether the errors of all stages are collected and reported once the
+     *                     returned future completes
+     * @param parentExit   additional exit condition supplied by the caller
+     * @return a future completed normally when the loop ends, or exceptionally with the first
+     *         failure of stage construction or execution
+     */
+    @Override
+    protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
+    {
+        CompletableFuture<?> future = new CompletableFuture<>();
+
+        if (reportErrors)
+        {
+            future.whenComplete((a, b) ->
+            {
+                collectErrors();
+                maybeReportErrors();
+            });
+        }
+
+        AtomicBoolean terminated = new AtomicBoolean(false);
+        scheduleTermination(terminated);
+
+        BooleanSupplier exit = () -> Thread.currentThread().isInterrupted() || future.isDone()
+                                     || terminated.get() || parentExit.getAsBoolean();
+
+        executor.submit(() ->
+        {
+            for (Configuration.RunnerConfiguration runnerConfig : runnerFactories)
+            {
+                try
+                {
+                    Runner runner = runnerConfig.make(run, config);
+                    if (runner instanceof StagedRunner)
+                        throw new IllegalArgumentException("StagedRunner can not be nested inside of a StagedRunner");
+                    stages.add(runner);
+                }
+                catch (Throwable t)
+                {
+                    // The run has already failed: do not build the remaining stages, none of
+                    // them would ever be started.
+                    future.completeExceptionally(t);
+                    return;
+                }
+            }
+
+            while (!exit.getAsBoolean())
+            {
+                logger.info("Starting next staged run...");
+                int stage = 1;
+                boolean passCompleted = true;
+
+                for (Runner runner : stages)
+                {
+                    if (exit.getAsBoolean())
+                    {
+                        passCompleted = false;
+                        break;
+                    }
+
+                    try
+                    {
+                        logger.info("Starting stage {}: {}...", stage++, runner.type());
+                        runner.start(false, exit).get();
+
+                        if (!terminated.get())
+                            logger.info("...stage complete.");
+
+                        // Wait for any previous runners to settle down...
+                        while (run.tracker.maxConsecutiveFinished() != run.tracker.maxStarted())
+                        {
+                            TimeUnit.SECONDS.sleep(1);
+                            logger.warn("Waiting for any previous runners to settle down: {}",
+                                        run.tracker.toString());
+                        }
+                    }
+                    catch (Throwable t)
+                    {
+                        // Keep the interrupt visible to the code that owns this thread.
+                        if (t instanceof InterruptedException)
+                            Thread.currentThread().interrupt();
+
+                        future.completeExceptionally(t);
+                        passCompleted = false;
+                        break;
+                    }
+                }
+
+                // Only announce completion when every stage of this pass actually ran.
+                if (passCompleted && !terminated.get())
+                    logger.info("All stages complete!");
+            }
+
+            future.complete(null);
+        });
+
+        return future;
+    }
+
+    /**
+     * Shuts down the visitors and executors of every stage, then this runner's own executors,
+     * and finally calls {@code teardown()}.
+     */
+    @Override
+    public void shutdown() throws InterruptedException
+    {
+        logger.info("Shutting down...");
+
+        for (Runner runner : stages)
+        {
+            runner.shutDownVisitors();
+            runner.shutDownExecutors();
+        }
+
+        shutDownExecutors();
+        teardown();
+    }
+
+    @Override
+    protected void shutDownVisitors()
+    {
+        // Visitors are shut down in the sub-runners.
+    }
+
+    /** Copies the errors recorded by every stage into this runner's own {@code errors}. */
+    private void collectErrors()
+    {
+        for (Runner runner : stages)
+        {
+            errors.addAll(runner.errors);
+        }
+    }
+
+    /** Jackson-mapped configuration that builds a {@link StagedRunner}. */
+    @JsonTypeName(TYPE)
+    public static class StagedRunnerConfig implements Configuration.RunnerConfiguration
+    {
+        @JsonProperty(value = "stages")
+        public final List<Configuration.RunnerConfiguration> runnerFactories;
+
+        public final long run_time;
+        public final TimeUnit run_time_unit;
+
+        // NOTE(review): jackson-databind treats @JsonProperty.defaultValue as metadata only, so
+        // an omitted run_time / run_time_unit is presumably not replaced by 2 / HOURS here —
+        // confirm how Configuration's mapper handles missing values.
+        @JsonCreator
+        public StagedRunnerConfig(@JsonProperty(value = "stages") List<Configuration.RunnerConfiguration> stages,
+                                  @JsonProperty(value = "run_time", defaultValue = "2") long runtime,
+                                  @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
+        {
+            this.runnerFactories = stages;
+            this.run_time = runtime;
+            this.run_time_unit = runtimeUnit;
+        }
+
+        @Override
+        public Runner make(Run run, Configuration config)
+        {
+            return new StagedRunner(run, config, runnerFactories, run_time, run_time_unit);
+        }
+    }
+}
diff --git a/harry-core/src/harry/runner/UpToLtsRunner.java b/harry-core/src/harry/runner/UpToLtsRunner.java
new file mode 100644
index 0000000..f89ecbb
--- /dev/null
+++ b/harry-core/src/harry/runner/UpToLtsRunner.java
@@ -0,0 +1,125 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.visitors.Visitor;
+
+/**
+ * A {@link Runner.SequentialRunner} that additionally stops once the logical clock has reached
+ * {@code maxLts}.
+ */
+public class UpToLtsRunner extends Runner.SequentialRunner
+{
+    public static final String TYPE = "up_to_lts";
+
+    /** Registers {@link UpToLtsRunnerConfig} as a configuration subtype. */
+    public static void register()
+    {
+        Configuration.registerSubtypes(UpToLtsRunnerConfig.class);
+    }
+
+    private final long maxLts;
+
+    public UpToLtsRunner(Run run,
+                         Configuration config,
+                         List<? extends Visitor.VisitorFactory> visitorFactories,
+                         long maxLts,
+                         long runtime, TimeUnit runtimeUnit)
+    {
+        super(run, config, visitorFactories, runtime, runtimeUnit);
+        this.maxLts = maxLts;
+    }
+
+    @Override
+    public String type()
+    {
+        return TYPE;
+    }
+
+    /**
+     * Submits the inherited visitor loop with an exit condition that also becomes true as soon
+     * as {@code run.clock.peek()} is at or past {@code maxLts}.
+     *
+     * @param reportErrors whether {@code maybeReportErrors()} is attached to the returned future
+     * @param parentExit   additional exit condition supplied by the caller
+     * @return the future handed to the visitor loop
+     */
+    @Override
+    protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
+    {
+        CompletableFuture<?> future = new CompletableFuture<>();
+
+        if (reportErrors)
+            future.whenComplete((a, b) -> maybeReportErrors());
+
+        AtomicBoolean terminated = new AtomicBoolean(false);
+        scheduleTermination(terminated);
+        BooleanSupplier exit = () -> run.clock.peek() >= maxLts
+                                     || Thread.currentThread().isInterrupted() || future.isDone()
+                                     || terminated.get() || parentExit.getAsBoolean();
+
+        executor.submit(reportThrowable(() -> run(visitors, future, exit), future));
+        return future;
+    }
+
+    /** Jackson-mapped configuration that builds an {@link UpToLtsRunner}. */
+    @JsonTypeName(TYPE)
+    public static class UpToLtsRunnerConfig implements Configuration.RunnerConfiguration
+    {
+        // Serialized under the same name the creator below reads, so a written configuration
+        // can be read back (StagedRunnerConfig maps its "stages" field the same way).
+        @JsonProperty(value = "visitors")
+        public final List<Configuration.VisitorConfiguration> visitor_factories;
+        public final long max_lts;
+        public final long run_time;
+        public final TimeUnit run_time_unit;
+
+        // NOTE(review): jackson-databind treats @JsonProperty.defaultValue as metadata only, so
+        // an omitted run_time / run_time_unit is presumably not replaced by 2 / HOURS here —
+        // confirm how Configuration's mapper handles missing values.
+        @JsonCreator
+        public UpToLtsRunnerConfig(@JsonProperty(value = "visitors") List<Configuration.VisitorConfiguration> visitors,
+                                   @JsonProperty(value = "max_lts") long maxLts,
+                                   @JsonProperty(value = "run_time", defaultValue = "2") long runtime,
+                                   @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
+        {
+            this.visitor_factories = visitors;
+            this.max_lts = maxLts;
+            this.run_time = runtime;
+            this.run_time_unit = runtimeUnit;
+        }
+
+        @Override
+        public Runner make(Run run, Configuration config)
+        {
+            return new UpToLtsRunner(run, config, visitor_factories, max_lts, run_time, run_time_unit);
+        }
+    }
+}
diff --git a/harry-core/src/harry/util/ByteUtils.java b/harry-core/src/harry/util/ByteUtils.java
new file mode 100644
index 0000000..4adf536
--- /dev/null
+++ b/harry-core/src/harry/util/ByteUtils.java
@@ -0,0 +1,263 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.util;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Static helpers that encode Java values as {@link ByteBuffer}s and assemble several buffers into
+ * length-prefixed collection and composite layouts.
+ */
+public class ByteUtils
+{
+    /** Largest length that {@link #putShortLength(ByteBuffer, int)} can encode in two bytes. */
+    private static final int MAX_UNSIGNED_SHORT = 0xFFFF;
+
+    /** Wraps the UTF-8 encoding of {@code s}. */
+    public static ByteBuffer bytes(String s)
+    {
+        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /** Wraps {@code s} encoded with the given {@code charset}. */
+    public static ByteBuffer bytes(String s, Charset charset)
+    {
+        return ByteBuffer.wrap(s.getBytes(charset));
+    }
+
+    /** Returns a 1-byte buffer holding {@code b}; the absolute put leaves the position at 0. */
+    public static ByteBuffer bytes(byte b)
+    {
+        return ByteBuffer.allocate(1).put(0, b);
+    }
+
+    /** Returns a 2-byte buffer holding {@code s}. */
+    public static ByteBuffer bytes(short s)
+    {
+        return ByteBuffer.allocate(2).putShort(0, s);
+    }
+
+    /** Returns a 4-byte buffer holding {@code i}. */
+    public static ByteBuffer bytes(int i)
+    {
+        return ByteBuffer.allocate(4).putInt(0, i);
+    }
+
+    /** Returns an 8-byte buffer holding {@code n}. */
+    public static ByteBuffer bytes(long n)
+    {
+        return ByteBuffer.allocate(8).putLong(0, n);
+    }
+
+    /** Returns a 4-byte buffer holding {@code f}. */
+    public static ByteBuffer bytes(float f)
+    {
+        return ByteBuffer.allocate(4).putFloat(0, f);
+    }
+
+    /** Returns an 8-byte buffer holding {@code d}. */
+    public static ByteBuffer bytes(double d)
+    {
+        return ByteBuffer.allocate(8).putDouble(0, d);
+    }
+
+    /** Wraps the raw bytes returned by {@link InetAddress#getAddress()}. */
+    public static ByteBuffer bytes(InetAddress address)
+    {
+        return ByteBuffer.wrap(address.getAddress());
+    }
+
+    /** Wraps the 16-byte form of {@code uuid} produced by {@link #decompose(UUID)}. */
+    public static ByteBuffer bytes(UUID uuid)
+    {
+        return ByteBuffer.wrap(decompose(uuid));
+    }
+
+    /**
+     * Splits {@code uuid} into 16 bytes: the most significant long followed by the least
+     * significant long, each written highest byte first.
+     */
+    public static byte[] decompose(UUID uuid)
+    {
+        long most = uuid.getMostSignificantBits();
+        long least = uuid.getLeastSignificantBits();
+        byte[] b = new byte[16];
+        for (int i = 0; i < 8; i++)
+        {
+            b[i] = (byte)(most >>> ((7-i) * 8));
+            b[8+i] = (byte)(least >>> ((7-i) * 8));
+        }
+        return b;
+    }
+
+    /**
+     * Converts a supported value (boxed primitive, {@link String}, {@link UUID}, {@link InetAddress}
+     * or {@link ByteBuffer}) into its byte representation. A {@link Boolean} becomes a single byte
+     * (1 or 0) and a {@link ByteBuffer} is returned as is.
+     *
+     * @throws UnsupportedOperationException for {@link List} and {@link Set} values
+     * @throws IllegalArgumentException if {@code obj} is {@code null} or of an unsupported type
+     */
+    public static ByteBuffer objectToBytes(Object obj)
+    {
+        // A null fails every instanceof test below and would otherwise blow up with a
+        // NullPointerException while the final branch formats its error message.
+        if (obj == null)
+            throw new IllegalArgumentException("Cannot convert null value");
+
+        if (obj instanceof Integer)
+            return bytes((int) obj);
+        else if (obj instanceof Byte)
+            return bytes((byte) obj);
+        else if (obj instanceof Boolean)
+            return bytes( (byte) (((boolean) obj) ? 1 : 0));
+        else if (obj instanceof Short)
+            return bytes((short) obj);
+        else if (obj instanceof Long)
+            return bytes((long) obj);
+        else if (obj instanceof Float)
+            return bytes((float) obj);
+        else if (obj instanceof Double)
+            return bytes((double) obj);
+        else if (obj instanceof UUID)
+            return bytes((UUID) obj);
+        else if (obj instanceof InetAddress)
+            return bytes((InetAddress) obj);
+        else if (obj instanceof String)
+            return bytes((String) obj);
+        else if (obj instanceof List)
+        {
+            throw new UnsupportedOperationException("Please use ByteUtils from integration package");
+        }
+        else if (obj instanceof Set)
+        {
+            throw new UnsupportedOperationException("Please use ByteUtils from integration package");
+        }
+        else if (obj instanceof ByteBuffer)
+            return (ByteBuffer) obj;
+        else
+            throw new IllegalArgumentException(String.format("Cannot convert value %s of type %s",
+                                                             obj,
+                                                             obj.getClass()));
+    }
+
+    /**
+     * Serializes {@code buffers} as a 4-byte element count followed by every buffer written with
+     * {@link #writeValue(ByteBuffer, ByteBuffer)}.
+     *
+     * @param elements count written in front of the values; it is taken from the caller, not
+     *                 derived from {@code buffers.size()}
+     */
+    public static ByteBuffer pack(Collection<ByteBuffer> buffers, int elements)
+    {
+        int size = 0;
+        for (ByteBuffer bb : buffers)
+            size += sizeOfValue(bb);
+
+        ByteBuffer result = ByteBuffer.allocate(sizeOfCollectionSize(elements) + size);
+        writeCollectionSize(result, elements);
+        for (ByteBuffer bb : buffers)
+            writeValue(result, bb);
+        result.flip();
+        return result;
+    }
+
+    /** Bytes {@link #writeValue(ByteBuffer, ByteBuffer)} needs: a 4-byte length plus the content. */
+    public static int sizeOfValue(ByteBuffer value)
+    {
+        return value == null ? 4 : 4 + value.remaining();
+    }
+
+    /** Writes the element count as a 4-byte int. */
+    protected static void writeCollectionSize(ByteBuffer output, int elements)
+    {
+        output.putInt(elements);
+    }
+
+    /**
+     * Writes {@code value} as a 4-byte length followed by its remaining bytes; {@code null} is
+     * written as length -1 with no content. {@code value}'s own position is left untouched.
+     */
+    public static void writeValue(ByteBuffer output, ByteBuffer value)
+    {
+        if (value == null)
+        {
+            output.putInt(-1);
+            return;
+        }
+
+        output.putInt(value.remaining());
+        output.put(value.duplicate());
+    }
+
+    /** Number of bytes {@link #writeCollectionSize(ByteBuffer, int)} writes. */
+    protected static int sizeOfCollectionSize(int elements)
+    {
+        return 4;
+    }
+
+    /**
+     * Joins several buffers into one: every component is written as a 2-byte length, its
+     * remaining bytes and a trailing zero byte. A single buffer is returned as is.
+     *
+     * @throws IllegalArgumentException if a component is longer than 65535 bytes
+     */
+    public static ByteBuffer compose(ByteBuffer... buffers)
+    {
+        if (buffers.length == 1)
+            return buffers[0];
+
+        int totalLength = 0;
+        for (ByteBuffer bb : buffers)
+            totalLength += 2 + bb.remaining() + 1;
+
+        ByteBuffer out = ByteBuffer.allocate(totalLength);
+        for (ByteBuffer buffer : buffers)
+        {
+            ByteBuffer bb = buffer.duplicate();
+            putShortLength(out, bb.remaining());
+            out.put(bb);
+            out.put((byte) 0);
+        }
+        out.flip();
+        return out;
+    }
+
+    /**
+     * Writes {@code length} as two bytes, high byte first.
+     *
+     * @throws IllegalArgumentException if {@code length} is outside 0..65535; such a value
+     *                                  used to be silently truncated to its low 16 bits
+     */
+    public static void putShortLength(ByteBuffer bb, int length)
+    {
+        if (length < 0 || length > MAX_UNSIGNED_SHORT)
+            throw new IllegalArgumentException(String.format("Length %d does not fit into an unsigned short", length));
+
+        bb.put((byte) ((length >> 8) & 0xFF));
+        bb.put((byte) (length & 0xFF));
+    }
+}
diff --git a/harry-core/src/harry/visitors/AllPartitionsValidator.java b/harry-core/src/harry/visitors/AllPartitionsValidator.java
index 1b67a8e..7bd5b50 100644
--- a/harry-core/src/harry/visitors/AllPartitionsValidator.java
+++ b/harry-core/src/harry/visitors/AllPartitionsValidator.java
@@ -50,6 +50,10 @@ public class AllPartitionsValidator implements Visitor
     protected final MetricReporter metricReporter;
     protected final ExecutorService executor;
     protected final SystemUnderTest sut;
+
+    protected final AtomicBoolean condition;
+    protected final AtomicLong maxPos;
+
     protected final int concurrency;
     protected final int triggerAfter;
 
@@ -67,20 +71,27 @@ public class AllPartitionsValidator implements Visitor
         this.pdSelector = run.pdSelector;
         this.concurrency = concurrency;
         this.executor = Executors.newFixedThreadPool(concurrency);
+        this.condition = new AtomicBoolean();
+        this.maxPos = new AtomicLong(-1);
+        run.tracker.onLtsStarted((lts) -> {
+            maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
+            if (triggerAfter == 0 || (triggerAfter > 0 && lts % triggerAfter == 0))
+                condition.set(true);
+        });
     }
 
     protected CompletableFuture<Void> validateAllPartitions(ExecutorService executor, int parallelism)
     {
         final long maxPos = this.maxPos.get();
-        AtomicLong counter = new AtomicLong();
-        CompletableFuture[] futures = new CompletableFuture[parallelism];
+        AtomicLong cnt = new AtomicLong();
+        CompletableFuture<?>[] futures = new CompletableFuture[parallelism];
         AtomicBoolean isDone = new AtomicBoolean(false);
 
         for (int i = 0; i < parallelism; i++)
         {
             futures[i] = CompletableFuture.supplyAsync(() -> {
                 long pos;
-                while ((pos = counter.getAndIncrement()) < maxPos && !executor.isShutdown() && !Thread.interrupted() && !isDone.get())
+                while ((pos = cnt.getAndIncrement()) < maxPos && !executor.isShutdown() && !Thread.interrupted() && !isDone.get())
                 {
                     if (pos > 0 && pos % 100 == 0)
                         logger.info(String.format("Validated %d out of %d partitions", pos, maxPos));
@@ -109,27 +120,30 @@ public class AllPartitionsValidator implements Visitor
         return CompletableFuture.allOf(futures);
     }
 
-    private final AtomicLong maxPos = new AtomicLong(-1);
-
-    public void visit(long lts)
+    public void visit()
     {
-        maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
-
-        if (triggerAfter > 0 && lts % triggerAfter == 0)
+        // TODO: this is ok for now, but if/when we bring exhaustive checker back, we need to change this:
+        // we'll probably want a low-concurrency validator somewhere in background all the time.
+        if (condition.compareAndSet(true, false))
         {
-            logger.info("Starting validations of all {} partitions", maxPos.get());
+            long lts = clock.peek();
+            logger.info("Starting validation of all partitions as of lts {}...", lts);
+
             try
             {
                 validateAllPartitions(executor, concurrency).get();
             }
             catch (Throwable e)
             {
-                throw new RuntimeException(e);
+                // TODO: Make a utility out of this.
+                throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
             }
-            logger.info("Finished validations of all partitions");
+            logger.info("...finished validating all partitions as of lts {}.", lts);
         }
     }
 
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    @Override
     public void shutdown() throws InterruptedException
     {
         executor.shutdown();
diff --git a/harry-core/src/harry/visitors/CorruptingVisitor.java b/harry-core/src/harry/visitors/CorruptingVisitor.java
index 26febeb..4677ea2 100644
--- a/harry-core/src/harry/visitors/CorruptingVisitor.java
+++ b/harry-core/src/harry/visitors/CorruptingVisitor.java
@@ -64,8 +64,9 @@ public class CorruptingVisitor implements Visitor
 
     private final AtomicLong maxPos = new AtomicLong(-1);
 
-    public void visit(long lts)
+    public void visit()
     {
+        long lts = run.clock.peek();
         maxPos.updateAndGet(current -> Math.max(run.pdSelector.positionFor(lts), current));
 
         if (lts == 0 || lts % triggerAfter != 0)
@@ -87,8 +88,4 @@ public class CorruptingVisitor implements Visitor
             logger.error("Caught an exception while trying to corrupt a partition.", t);
         }
     }
-
-    public void shutdown() throws InterruptedException
-    {
-    }
 }
diff --git a/harry-core/src/harry/visitors/DelegatingVisitor.java b/harry-core/src/harry/visitors/DelegatingVisitor.java
deleted file mode 100644
index f4994b1..0000000
--- a/harry-core/src/harry/visitors/DelegatingVisitor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package harry.visitors;
-
-import harry.model.OpSelectors;
-
-public abstract class DelegatingVisitor implements Visitor, VisitExecutor
-{
-    protected VisitExecutor delegate;
-
-    public DelegatingVisitor(VisitExecutor delegate)
-    {
-        this.delegate = delegate;
-    }
-
-    public void beforeLts(long lts, long pd)
-    {
-        delegate.beforeLts(lts, pd);
-    }
-
-    public void afterLts(long lts, long pd)
-    {
-        delegate.afterLts(lts, pd);
-    }
-
-    public void beforeBatch(long lts, long pd, long m)
-    {
-        delegate.beforeBatch(lts, pd, m);
-    }
-
-    public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
-    {
-        delegate.operation(lts, pd, cd, m, opId, opType);
-    }
-
-    public void afterBatch(long lts, long pd, long m)
-    {
-        delegate.afterBatch(lts, pd, m);
-    }
-
-    public void shutdown() throws InterruptedException
-    {
-        delegate.shutdown();
-    }
-}
diff --git a/harry-core/src/harry/visitors/GeneratingVisitor.java b/harry-core/src/harry/visitors/GeneratingVisitor.java
index 78e2b07..624330f 100644
--- a/harry-core/src/harry/visitors/GeneratingVisitor.java
+++ b/harry-core/src/harry/visitors/GeneratingVisitor.java
@@ -22,7 +22,7 @@ import harry.core.Run;
 import harry.ddl.SchemaSpec;
 import harry.model.OpSelectors;
 
-public class GeneratingVisitor extends DelegatingVisitor
+public class GeneratingVisitor extends LtsVisitor
 {
     private final OpSelectors.PdSelector pdSelector;
     private final OpSelectors.DescriptorSelector descriptorSelector;
@@ -31,7 +31,8 @@ public class GeneratingVisitor extends DelegatingVisitor
     public GeneratingVisitor(Run run,
                              VisitExecutor delegate)
     {
-        super(delegate);
+        super(delegate, run.clock::nextLts);
+
         this.pdSelector = run.pdSelector;
         this.descriptorSelector = run.descriptorSelector;
         this.schema = run.schemaSpec;
@@ -50,10 +51,10 @@ public class GeneratingVisitor extends DelegatingVisitor
         int modificationsCount = descriptorSelector.numberOfModifications(lts);
         int opsPerModification = descriptorSelector.opsPerModification(lts);
 
-        for (int m = 0; m < modificationsCount; m++)
+        for (long m = 0; m < modificationsCount; m++)
         {
             beforeBatch(lts, pd, m);
-            for (int i = 0; i < opsPerModification; i++)
+            for (long i = 0; i < opsPerModification; i++)
             {
                 long opId = m * opsPerModification + i;
                 long cd = descriptorSelector.cd(pd, lts, opId, schema);
diff --git a/harry-core/src/harry/visitors/LoggingVisitor.java b/harry-core/src/harry/visitors/LoggingVisitor.java
index 8deebde..35f9aa0 100644
--- a/harry-core/src/harry/visitors/LoggingVisitor.java
+++ b/harry-core/src/harry/visitors/LoggingVisitor.java
@@ -31,7 +31,6 @@ import harry.operations.CompiledStatement;
 
 public class LoggingVisitor extends GeneratingVisitor
 {
-
     public LoggingVisitor(Run run,
                           OperationExecutor.RowVisitorFactory rowVisitorFactory)
     {
diff --git a/harry-core/src/harry/visitors/LtsVisitor.java b/harry-core/src/harry/visitors/LtsVisitor.java
new file mode 100644
index 0000000..5b37db0
--- /dev/null
+++ b/harry-core/src/harry/visitors/LtsVisitor.java
@@ -0,0 +1,97 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.visitors;
+
+import java.util.function.LongSupplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.model.OpSelectors;
+
+/**
+ * Common class for all visitors that support visits at a specific logical timestamp.
+ *
+ * Classes inheriting from LtsVisitor have to visit every drawn LTS: each LTS that has been drawn
+ * from the LTS source _has to_ actually be visited. If this is not done, during model checking
+ * the drawn LTS will still be considered visited, which will lead to data inconsistencies.
+ *
+ * This class and its implementations, such as MutatingVisitor, are NOT thread safe. If you'd like
+ * to have several threads generating data, use a separate visitor instance for each thread.
+ */
+public abstract class LtsVisitor extends VisitExecutor implements Visitor
+{
+    private static final Logger logger = LoggerFactory.getLogger(LtsVisitor.class);
+    protected final VisitExecutor delegate;
+    private final LongSupplier ltsSource;
+
+    public LtsVisitor(VisitExecutor delegate,
+                      LongSupplier ltsSource)
+    {
+        this.delegate = delegate;
+        this.ltsSource = ltsSource;
+    }
+
+    public final void visit()
+    {
+        long lts = ltsSource.getAsLong();
+        if (lts > 0 && lts % 10_000 == 0)
+            logger.info("Visiting lts {}...", lts);
+        visit(lts);
+    }
+
+    public abstract void visit(long lts);
+
+    @Override
+    protected void beforeLts(long lts, long pd)
+    {
+        delegate.beforeLts(lts, pd);
+    }
+
+    @Override
+    protected void afterLts(long lts, long pd)
+    {
+        delegate.afterLts(lts, pd);
+    }
+
+    @Override
+    protected void beforeBatch(long lts, long pd, long m)
+    {
+        delegate.beforeBatch(lts, pd, m);
+    }
+
+    @Override
+    protected void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
+    {
+        delegate.operation(lts, pd, cd, m, opId, opType);
+    }
+
+    @Override
+    protected void afterBatch(long lts, long pd, long m)
+    {
+        delegate.afterBatch(lts, pd, m);
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException
+    {
+        delegate.shutdown();
+    }
+
+}
diff --git a/harry-core/src/harry/visitors/MutatingVisitor.java b/harry-core/src/harry/visitors/MutatingVisitor.java
index 26c7417..d6eb617 100644
--- a/harry-core/src/harry/visitors/MutatingVisitor.java
+++ b/harry-core/src/harry/visitors/MutatingVisitor.java
@@ -19,6 +19,7 @@
 package harry.visitors;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -41,10 +42,16 @@ public class MutatingVisitor extends GeneratingVisitor
     public MutatingVisitor(Run run,
                            OperationExecutor.RowVisitorFactory rowVisitorFactory)
     {
-        super(run, new MutatingVisitExecutor(run, rowVisitorFactory.make(run)));
+        this(run, new MutatingVisitExecutor(run, rowVisitorFactory.make(run)));
     }
 
-    public static class MutatingVisitExecutor implements VisitExecutor
+    public MutatingVisitor(Run run,
+                           VisitExecutor visitExecutor)
+    {
+        super(run, visitExecutor);
+    }
+
+    public static class MutatingVisitExecutor extends VisitExecutor
     {
         private final List<String> statements = new ArrayList<>();
         private final List<Object> bindings = new ArrayList<>();
@@ -104,8 +111,7 @@ public class MutatingVisitor extends GeneratingVisitor
             CompiledStatement statement = operationInternal(lts, pd, cd, m, opId, opType);
 
             statements.add(statement.cql());
-            for (Object binding : statement.bindings())
-                bindings.add(binding);
+            Collections.addAll(bindings, statement.bindings());
         }
 
         protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
@@ -140,10 +146,10 @@ public class MutatingVisitor extends GeneratingVisitor
 
         protected void executeAsyncWithRetries(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement)
         {
-            executeAsyncWithRetries(lts, pd, future, statement, 0);
+            executeAsyncWithRetries(future, statement, 0);
         }
 
-        private void executeAsyncWithRetries(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement, int retries)
+        private void executeAsyncWithRetries(CompletableFuture<Object[][]> future, CompiledStatement statement, int retries)
         {
             if (sut.isShutdown())
                 throw new IllegalStateException("System under test is shut down");
@@ -154,8 +160,10 @@ public class MutatingVisitor extends GeneratingVisitor
             sut.executeAsync(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings())
                .whenComplete((res, t) -> {
                    if (t != null)
-                       executor.schedule(() -> executeAsyncWithRetries(lts, pd, future, statement, retries + 1), 1, TimeUnit.SECONDS);
-                   else
+                   {
+                       logger.error("Caught message while trying to execute " +  statement, t);
+                       executor.schedule(() -> executeAsyncWithRetries(future, statement, retries + 1), 1, TimeUnit.SECONDS);
+                   }else
                        future.complete(res);
                });
         }
diff --git a/harry-core/src/harry/visitors/ParallelValidator.java b/harry-core/src/harry/visitors/ParallelValidator.java
index 4772b40..e96bd39 100644
--- a/harry-core/src/harry/visitors/ParallelValidator.java
+++ b/harry-core/src/harry/visitors/ParallelValidator.java
@@ -86,8 +86,9 @@ public abstract class ParallelValidator<T extends ParallelValidator.State> imple
         }
     }
 
-    public void visit(long lts)
+    public void visit()
     {
+        long lts = run.clock.peek();
         maxPos.updateAndGet(current -> Math.max(run.pdSelector.positionFor(lts), current));
 
         if (triggerAfter > 0 && lts % triggerAfter == 0)
@@ -104,6 +105,7 @@ public abstract class ParallelValidator<T extends ParallelValidator.State> imple
         }
     }
 
+    @Override
     public void shutdown() throws InterruptedException
     {
         executor.shutdown();
diff --git a/harry-core/src/harry/visitors/RecentValidator.java b/harry-core/src/harry/visitors/RecentValidator.java
index d0d09fa..a563a30 100644
--- a/harry-core/src/harry/visitors/RecentValidator.java
+++ b/harry-core/src/harry/visitors/RecentValidator.java
@@ -24,6 +24,9 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
@@ -46,8 +49,12 @@ public class RecentValidator implements Visitor
     private final OpSelectors.PdSelector pdSelector;
     private final QueryGenerator.TypedQueryGenerator querySelector;
     private final MetricReporter metricReporter;
+    private final OpSelectors.MonotonicClock clock;
+
+    private final AtomicBoolean condition;
+    private final AtomicLong maxPos;
+
     private final int partitionCount;
-    private final int triggerAfter;
     private final int queries;
 
     public RecentValidator(int partitionCount,
@@ -57,11 +64,19 @@ public class RecentValidator implements Visitor
                            Model.ModelFactory modelFactory)
     {
         this.partitionCount = partitionCount;
-        this.triggerAfter = triggerAfter;
         this.queries = queries;
         this.metricReporter = run.metricReporter;
         this.pdSelector = run.pdSelector;
+        this.clock = run.clock;
 
+        this.condition = new AtomicBoolean();
+        this.maxPos = new AtomicLong(-1);
+
+        run.tracker.onLtsStarted((lts) -> {
+            maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
+            if (triggerAfter == 0 || (triggerAfter > 0 && lts % triggerAfter == 0))
+                condition.set(true);
+        });
         this.querySelector = new QueryGenerator.TypedQueryGenerator(run.rng,
                                                                     // TODO: make query kind configurable
                                                                     Surjections.enumValues(Query.QueryKind.class),
@@ -78,10 +93,8 @@ public class RecentValidator implements Visitor
         }
     }
 
-    private final AtomicLong maxPos = new AtomicLong(-1);
-
     // TODO: expose metric, how many times validated recent partitions
-    private void validateRecentPartitions(long lts)
+    private int validateRecentPartitions()
     {
         long pos = maxPos.get();
 
@@ -94,35 +107,34 @@ public class RecentValidator implements Visitor
                 metricReporter.validateRandomQuery();
                 Query query = querySelector.inflate(visitLts, i);
                 // TODO: add pd skipping from shrinker here, too
-                log(lts, i, query);
+                log(i, query);
                 model.validate(query);
             }
 
             pos--;
             maxPartitions--;
         }
+        
+        return partitionCount - maxPartitions;
     }
 
-    public void visit(long lts)
+    @Override
+    public void visit()
     {
-        maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
-
-        if (triggerAfter > 0 && lts % triggerAfter == 0)
+        if (condition.compareAndSet(true, false))
         {
-            logger.info("Validating {} recent partitions", partitionCount);
-            validateRecentPartitions(lts);
+            long lts = clock.peek();
+            logger.info("Validating (up to) {} recent partitions as of lts {}...", partitionCount, lts);
+            int count = validateRecentPartitions();
+            logger.info("...finished validating {} recent partitions as of lts {}.", count, lts);
         }
     }
 
-    public void shutdown() throws InterruptedException
-    {
-    }
-
-    private void log(long lts, int modifier, Query query)
+    private void log(int modifier, Query query)
     {
         try
         {
-            validationLog.write("LTS: " + lts + ". Modifier: " + modifier + ". PD: " + query.pd);
+            validationLog.write(String.format("PD: %d. Modifier: %d.", query.pd, modifier));
             validationLog.write("\t");
             validationLog.write(query.toSelectStatement().toString());
             validationLog.write("\n");
diff --git a/harry-core/src/harry/visitors/ReplayingVisitor.java b/harry-core/src/harry/visitors/ReplayingVisitor.java
index 1979664..bd3e1c2 100644
--- a/harry-core/src/harry/visitors/ReplayingVisitor.java
+++ b/harry-core/src/harry/visitors/ReplayingVisitor.java
@@ -19,17 +19,19 @@
 package harry.visitors;
 
 import java.util.Arrays;
+import java.util.function.LongSupplier;
 
 import harry.core.Run;
 import harry.model.OpSelectors;
 
-public abstract class ReplayingVisitor extends DelegatingVisitor
+public abstract class ReplayingVisitor extends LtsVisitor
 {
-    public ReplayingVisitor(VisitExecutor delegate)
+    public ReplayingVisitor(VisitExecutor delegate, LongSupplier ltsSource)
     {
-        super(delegate);
+        super(delegate, ltsSource);
     }
 
+    @Override
     public void visit(long lts)
     {
         replay(getVisit(lts));
@@ -37,7 +39,8 @@ public abstract class ReplayingVisitor extends DelegatingVisitor
 
     public abstract Visit getVisit(long lts);
 
-    public abstract void replayAll(Run run);
+    public abstract void replayAll();
+
     private void replay(Visit visit)
     {
         beforeLts(visit.lts, visit.pd);
diff --git a/harry-core/src/harry/visitors/Sampler.java b/harry-core/src/harry/visitors/Sampler.java
index 20c9a8c..fe73089 100644
--- a/harry-core/src/harry/visitors/Sampler.java
+++ b/harry-core/src/harry/visitors/Sampler.java
@@ -43,6 +43,7 @@ public class Sampler implements Visitor
     // TODO: move maxPos to data tracker since we seem to use quite a lot?
     private final AtomicLong maxPos = new AtomicLong(-1);
     private final OpSelectors.PdSelector pdSelector;
+    private final OpSelectors.MonotonicClock clock;
     private final SchemaSpec schema;
     private final int triggerAfter;
     private final int samplePartitions;
@@ -52,16 +53,18 @@ public class Sampler implements Visitor
     {
         this.sut = run.sut;
         this.pdSelector = run.pdSelector;
+        this.clock = run.clock;
         this.schema = run.schemaSpec;
         this.triggerAfter = triggerAfter;
         this.samplePartitions = samplePartitions;
     }
 
-    public void visit(long lts)
+    public void visit()
     {
+        long lts = clock.peek();
         maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
 
-        if (triggerAfter > 0 && lts % triggerAfter == 0)
+        if (triggerAfter == 0 || triggerAfter > 0 && lts % triggerAfter == 0)
         {
             long max = maxPos.get();
             DescriptiveStatistics ds = new DescriptiveStatistics();
@@ -84,10 +87,6 @@ public class Sampler implements Visitor
         }
     }
 
-    public void shutdown() throws InterruptedException
-    {
-    }
-
     @JsonTypeName("sampler")
     public static class SamplerConfiguration implements Configuration.VisitorConfiguration
     {
diff --git a/harry-core/src/harry/visitors/SingleValidator.java b/harry-core/src/harry/visitors/SingleValidator.java
index 0993461..85a378a 100644
--- a/harry-core/src/harry/visitors/SingleValidator.java
+++ b/harry-core/src/harry/visitors/SingleValidator.java
@@ -29,6 +29,7 @@ public class SingleValidator implements Visitor
     protected final Model model;
     protected final QueryGenerator queryGenerator;
     protected final Run run;
+
     public SingleValidator(int iterations,
                            Run run,
                            Model.ModelFactory modelFactory)
@@ -39,9 +40,10 @@ public class SingleValidator implements Visitor
         this.run = run;
     }
 
-    public void shutdown() throws InterruptedException
+    @Override
+    public void visit()
     {
-
+        visit(run.clock.peek());
     }
 
     public void visit(long lts)
diff --git a/harry-core/src/harry/visitors/VisitExecutor.java b/harry-core/src/harry/visitors/VisitExecutor.java
index 94aa1fc..63efba0 100644
--- a/harry-core/src/harry/visitors/VisitExecutor.java
+++ b/harry-core/src/harry/visitors/VisitExecutor.java
@@ -20,17 +20,17 @@ package harry.visitors;
 
 import harry.model.OpSelectors;
 
-public interface VisitExecutor
+public abstract class VisitExecutor
 {
-    public void beforeLts(long lts, long pd);
+    protected abstract void beforeLts(long lts, long pd);
 
-    public void afterLts(long lts, long pd);
+    protected abstract void afterLts(long lts, long pd);
 
-    public void beforeBatch(long lts, long pd, long m);
+    protected abstract void beforeBatch(long lts, long pd, long m);
 
-    public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind kind);
+    protected abstract void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind kind);
 
-    public void afterBatch(long lts, long pd, long m);
+    protected abstract void afterBatch(long lts, long pd, long m);
 
-    public default void shutdown() throws InterruptedException {}
+    public abstract void shutdown() throws InterruptedException;
 }
diff --git a/harry-core/src/harry/visitors/Visitor.java b/harry-core/src/harry/visitors/Visitor.java
index 7cb6111..b8baf10 100644
--- a/harry-core/src/harry/visitors/Visitor.java
+++ b/harry-core/src/harry/visitors/Visitor.java
@@ -22,12 +22,12 @@ import harry.core.Run;
 
 public interface Visitor
 {
-    void visit(long lts);
+    void visit();
 
-    public default void shutdown() throws InterruptedException {}
+    default void shutdown() throws InterruptedException {}
 
-    public interface VisitorFactory
+    interface VisitorFactory
     {
-        public Visitor make(Run run);
+        Visitor make(Run run);
     }
 }
\ No newline at end of file
diff --git a/harry-core/test/harry/model/OpSelectorsTest.java b/harry-core/test/harry/model/OpSelectorsTest.java
index 0f474f0..e2ebd82 100644
--- a/harry-core/test/harry/model/OpSelectorsTest.java
+++ b/harry-core/test/harry/model/OpSelectorsTest.java
@@ -44,8 +44,8 @@ import harry.model.clock.OffsetClock;
 import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
 import harry.runner.DataTracker;
+import harry.visitors.LtsVisitor;
 import harry.visitors.MutatingVisitor;
-import harry.visitors.Visitor;
 import harry.visitors.OperationExecutor;
 import harry.util.BitSet;
 
@@ -205,82 +205,80 @@ public class OpSelectorsTest
         };
 
         Run run = new Run(rng,
-                new OffsetClock(0),
-                pdSelector,
-                ckSelector,
-                schema,
-                DataTracker.NO_OP,
-                SystemUnderTest.NO_OP,
-                MetricReporter.NO_OP);
-
-        Visitor visitor = new MutatingVisitor(run,
-                                              (r) -> new OperationExecutor()
-                                                                         {
-                                                                             public CompiledStatement insert(long lts, long pd, long cd, long m)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement update(long lts, long pd, long cd, long opId)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement deleteColumn(long lts, long pd, long cd, long m)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement deleteColumnWithStatics(long lts, long pd, long cd, long opId)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement deleteRow(long lts, long pd, long cd, long m)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement deletePartition(long lts, long pd, long opId)
-                                                                             {
-                                                                                 // ignore
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement insertWithStatics(long lts, long pd, long cd, long opId)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement updateWithStatics(long lts, long pd, long cd, long opId)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement deleteRange(long lts, long pd, long opId)
-                                                                             {
-                                                                                 // ignore
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement deleteSlice(long lts, long pd, long opId)
-                                                                             {
-                                                                                 // ignore
-                                                                                 return compiledStatement;
-                                                                             }
-                                                                         });
+                          new OffsetClock(0),
+                          pdSelector,
+                          ckSelector,
+                          schema,
+                          DataTracker.NO_OP,
+                          SystemUnderTest.NO_OP,
+                          MetricReporter.NO_OP);
+
+        LtsVisitor visitor = new MutatingVisitor(run,
+                                                 (r) -> new OperationExecutor()
+                                                        {
+                                                            public CompiledStatement insert(long lts, long pd, long cd, long m)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement update(long lts, long pd, long cd, long opId)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement deleteColumn(long lts, long pd, long cd, long m)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement deleteColumnWithStatics(long lts, long pd, long cd, long opId)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement deleteRow(long lts, long pd, long cd, long m)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement deletePartition(long lts, long pd, long opId)
+                                                            {
+                                                                // ignore
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement insertWithStatics(long lts, long pd, long cd, long opId)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement updateWithStatics(long lts, long pd, long cd, long opId)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement deleteRange(long lts, long pd, long opId)
+                                                            {
+                                                                // ignore
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement deleteSlice(long lts, long pd, long opId)
+                                                            {
+                                                                // ignore
+                                                                return compiledStatement;
+                                                            }
+                                                        });
 
         for (int lts = 0; lts < 1000; lts++)
-        {
-            visitor.visit(lts);
-        }
+            visitor.visit();
 
         for (Collection<Long> value : partitionMap.values())
             Assert.assertEquals(10, value.size());
diff --git a/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java b/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
index 702163e..621d579 100644
--- a/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
+++ b/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
@@ -38,7 +38,7 @@ public class HarryRunnerExternal extends HarryRunner {
     }
 
     @Override
-    public void beforeRun(Runner runner) {
+    public void beforeRun(Runner.TimedRunner runner) {
 
     }
 }
diff --git a/harry-integration/src/harry/model/sut/ByteUtils.java b/harry-integration/src/harry/model/sut/ByteUtils.java
new file mode 100644
index 0000000..ee7721c
--- /dev/null
+++ b/harry-integration/src/harry/model/sut/ByteUtils.java
@@ -0,0 +1,144 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.model.sut;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BooleanType;
+import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.db.marshal.DoubleType;
+import org.apache.cassandra.db.marshal.FloatType;
+import org.apache.cassandra.db.marshal.InetAddressType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.ShortType;
+import org.apache.cassandra.db.marshal.UUIDType;
+
+/**
+ * Conversions from plain Java values to serialized Cassandra values, built on the {@code bytes(...)} and
+ * {@code pack(...)} helpers this class uses from {@link harry.util.ByteUtils}.
+ */
+public class ByteUtils extends harry.util.ByteUtils
+{
+    /**
+     * Serializes a single Java value to a {@link ByteBuffer}.
+     * <p>
+     * Boxed primitives, {@link UUID}, {@link InetAddress} and {@link String} go through the matching
+     * {@code bytes(...)} overload (a boolean is first turned into the byte 1 or 0). A non-empty list or set
+     * is serialized by a {@link ListType} / {@link SetType} whose element type is taken from its first
+     * element via {@link #getType(Object)}; an empty one is packed as an empty collection. A
+     * {@link ByteBuffer} is returned as-is.
+     *
+     * @param obj value to serialize
+     * @return serialized form of {@code obj}
+     * @throws IllegalArgumentException if {@code obj} is {@code null} or of an unsupported type
+     */
+    public static ByteBuffer objectToBytes(Object obj)
+    {
+        if (obj instanceof Integer)
+            return bytes((int) obj);
+        else if (obj instanceof Byte)
+            return bytes((byte) obj);
+        else if (obj instanceof Boolean)
+            return bytes((byte) (((boolean) obj) ? 1 : 0));
+        else if (obj instanceof Short)
+            return bytes((short) obj);
+        else if (obj instanceof Long)
+            return bytes((long) obj);
+        else if (obj instanceof Float)
+            return bytes((float) obj);
+        else if (obj instanceof Double)
+            return bytes((double) obj);
+        else if (obj instanceof UUID)
+            return bytes((UUID) obj);
+        else if (obj instanceof InetAddress)
+            return bytes((InetAddress) obj);
+        else if (obj instanceof String)
+            return bytes((String) obj);
+        else if (obj instanceof List)
+        {
+            List l = (List) obj; // raw on purpose: the element type is only known at runtime
+            if (l.isEmpty())
+                return pack(Collections.emptyList(), 0);
+            return ListType.getInstance(getType(l.get(0)), true).decompose(l);
+        }
+        else if (obj instanceof Set)
+        {
+            Set l = (Set) obj; // raw on purpose: the element type is only known at runtime
+            if (l.isEmpty())
+                return pack(Collections.emptyList(), 0);
+            return SetType.getInstance(getType(l.iterator().next()), true).decompose(l);
+        }
+        else if (obj instanceof ByteBuffer)
+            return (ByteBuffer) obj;
+        else
+            throw unsupported(obj);
+    }
+
+    /**
+     * Picks the Cassandra type used to serialize collection elements of the given value's Java class.
+     *
+     * @param obj sample value whose runtime class selects the type
+     * @return the matching {@link AbstractType} instance
+     * @throws IllegalArgumentException if {@code obj} is {@code null} or of an unsupported type
+     */
+    public static AbstractType<?> getType(Object obj)
+    {
+        if (obj instanceof Integer)
+            return Int32Type.instance; // not IntegerType: that one is varint and serializes BigInteger
+        else if (obj instanceof Byte)
+            return ByteType.instance;
+        else if (obj instanceof Boolean)
+            return BooleanType.instance;
+        else if (obj instanceof Short)
+            return ShortType.instance;
+        else if (obj instanceof Long)
+            return LongType.instance; // bigint; IntegerType would reject a java.lang.Long element
+        else if (obj instanceof Float)
+            return FloatType.instance;
+        else if (obj instanceof Double)
+            return DoubleType.instance;
+        else if (obj instanceof UUID)
+            return UUIDType.instance;
+        else if (obj instanceof InetAddress)
+            return InetAddressType.instance;
+        else if (obj instanceof String)
+            return AsciiType.instance;
+        else
+            throw unsupported(obj);
+    }
+
+    // Builds the shared "cannot convert" error; null-safe so a null argument is reported instead of raising an NPE.
+    private static IllegalArgumentException unsupported(Object obj)
+    {
+        return new IllegalArgumentException(String.format("Cannot convert value %s of type %s",
+                                                          obj,
+                                                          obj == null ? null : obj.getClass()));
+    }
+}
diff --git a/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java b/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java
index ff4ad86..6bc0705 100644
--- a/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java
+++ b/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import harry.core.Configuration;
 import harry.core.Run;
 import harry.ddl.SchemaSpec;
 import harry.operations.CompiledStatement;
@@ -38,7 +37,7 @@ public class InJVMTokenAwareVisitExecutor extends LoggingVisitor.LoggingVisitorE
 {
     public static void init()
     {
-        Configuration.registerSubtypes(Configuation.class);
+        harry.core.Configuration.registerSubtypes(Configuration.class);
     }
 
     private final InJvmSut sut;
@@ -92,14 +91,14 @@ public class InJVMTokenAwareVisitExecutor extends LoggingVisitor.LoggingVisitorE
     }
 
     @JsonTypeName("in_jvm_token_aware")
-    public static class Configuation implements Configuration.VisitorConfiguration
+    public static class Configuration implements harry.core.Configuration.VisitorConfiguration
     {
-        public final Configuration.RowVisitorConfiguration row_visitor;
+        public final harry.core.Configuration.RowVisitorConfiguration row_visitor;
         public final SystemUnderTest.ConsistencyLevel consistency_level;
 
         @JsonCreator
-        public Configuation(@JsonProperty("row_visitor") Configuration.RowVisitorConfiguration rowVisitor,
-                            @JsonProperty("consistency_level") SystemUnderTest.ConsistencyLevel consistencyLevel)
+        public Configuration(@JsonProperty("row_visitor") harry.core.Configuration.RowVisitorConfiguration rowVisitor,
+                             @JsonProperty("consistency_level") SystemUnderTest.ConsistencyLevel consistencyLevel)
         {
             this.row_visitor = rowVisitor;
             this.consistency_level = consistencyLevel;
diff --git a/harry-integration/src/harry/model/sut/InJvmSut.java b/harry-integration/src/harry/model/sut/InJvmSut.java
index f758ca5..687a5b3 100644
--- a/harry-integration/src/harry/model/sut/InJvmSut.java
+++ b/harry-integration/src/harry/model/sut/InJvmSut.java
@@ -20,27 +20,19 @@ package harry.model.sut;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import harry.core.Configuration;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
-import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
 {
@@ -49,8 +41,6 @@ public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
         Configuration.registerSubtypes(InJvmSutConfiguration.class);
     }
 
-    private static final Logger logger = LoggerFactory.getLogger(InJvmSut.class);
-
     public InJvmSut(Cluster cluster)
     {
         super(cluster, 10);
@@ -97,7 +87,7 @@ public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
     {
         return cluster.get(1).appliesOnInstance((Object[] pk, String ks) ->
                                                 {
-                                                    String pkString = Arrays.asList(pk).stream().map(Object::toString).collect(Collectors.joining(":"));
+                                                    String pkString = Arrays.stream(pk).map(Object::toString).collect(Collectors.joining(":"));
                                                     EndpointsForToken endpoints = StorageService.instance.getNaturalReplicasForToken(ks, table, pkString);
                                                     int[] nodes = new int[endpoints.size()];
                                                     for (int i = 0; i < endpoints.size(); i++)
@@ -105,4 +95,4 @@ public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
                                                     return nodes;
                                                 }).apply(partitionKey, keyspace);
     }
-}
\ No newline at end of file
+}
diff --git a/harry-integration/src/harry/runner/HarryRunnerJvm.java b/harry-integration/src/harry/runner/HarryRunnerJvm.java
index fbb30b9..ed4ea46 100644
--- a/harry-integration/src/harry/runner/HarryRunnerJvm.java
+++ b/harry-integration/src/harry/runner/HarryRunnerJvm.java
@@ -37,7 +37,7 @@ public class HarryRunnerJvm extends HarryRunner {
 
 
     @Override
-    public void beforeRun(Runner runner) {
+    public void beforeRun(Runner.TimedRunner runner) {
 
     }
 }
diff --git a/harry-integration/src/harry/runner/RepairingLocalStateValidator.java b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
index fe811e5..71c1535 100644
--- a/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
+++ b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
@@ -19,7 +19,6 @@
 package harry.runner;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -29,6 +28,7 @@ import harry.core.Configuration;
 import harry.core.Run;
 import harry.data.ResultSetRow;
 import harry.model.Model;
+import harry.model.OpSelectors;
 import harry.model.QuiescentChecker;
 import harry.model.sut.InJvmSut;
 import harry.model.sut.SystemUnderTest;
@@ -47,27 +47,26 @@ public class RepairingLocalStateValidator extends AllPartitionsValidator
                                        QuiescentCheckerConfig.class);
     }
 
-    public final InJvmSut inJvmSut;
-
+    private final InJvmSut inJvmSut;
+    private final OpSelectors.MonotonicClock clock;
     public RepairingLocalStateValidator(int concurrency, int triggerAfter, Run run, Model.ModelFactory modelFactory)
     {
         super(concurrency, triggerAfter, run, modelFactory);
-
         this.inJvmSut = (InJvmSut) run.sut;
+        this.clock = run.clock;
     }
 
-    @Override
-    public void visit(long lts)
+    public void visit()
     {
+        long lts = clock.peek();
         if (lts > 0 && lts % triggerAfter == 0)
         {
             System.out.println("Starting repair...");
 
-            inJvmSut.cluster().stream().forEach((instance) -> {
-                instance.nodetool("repair", "--full");
-            });
+            inJvmSut.cluster().stream().forEach((instance) -> instance.nodetool("repair", "--full"));
+
             System.out.println("Validating partitions...");
-            super.visit(lts);
+            super.visit();
         }
     }
 
@@ -110,7 +109,7 @@ public class RepairingLocalStateValidator extends AllPartitionsValidator
         public void validate(Query query)
         {
             CompiledStatement compiled = query.toSelectStatement();
-            int[] replicas = inJvmSut.getReplicasFor(schemaSpec.inflatePartitionKey(query.pd), schemaSpec.keyspace, schemaSpec.table);
+            int[] replicas = inJvmSut.getReplicasFor(schema.inflatePartitionKey(query.pd), schema.keyspace, schema.table);
             for (int node : replicas)
             {
                 validate(() -> {
diff --git a/harry-integration/src/harry/runner/TrivialShrinker.java b/harry-integration/src/harry/runner/TrivialShrinker.java
index 2879e0e..b598b85 100644
--- a/harry-integration/src/harry/runner/TrivialShrinker.java
+++ b/harry-integration/src/harry/runner/TrivialShrinker.java
@@ -24,13 +24,14 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
 
 import harry.core.Configuration;
 import harry.core.Run;
-import harry.visitors.DelegatingVisitor;
-import harry.visitors.Visitor;
+import harry.visitors.LtsVisitor;
 import harry.visitors.SkippingVisitor;
+import harry.visitors.Visitor;
 
 /**
  * A most trivial imaginable shrinker: attempts to skip partitions and/or logical timestamps to see if the
@@ -63,15 +64,17 @@ public class TrivialShrinker
             Run run = configuration.createRun();
             Configuration.SequentialRunnerConfig config = (Configuration.SequentialRunnerConfig) configuration.runner;
             List<Visitor> visitors = new ArrayList<>();
-            for (Configuration.VisitorConfiguration factory : config.visitor_factories)
+            for (Configuration.VisitorConfiguration factory : config.visitorFactories)
             {
                 Visitor visitor = factory.make(run);
-                if (visitor instanceof DelegatingVisitor)
+                if (visitor instanceof LtsVisitor)
                 {
-                    visitors.add(new SkippingVisitor(visitor,
+                    AtomicLong counter = new AtomicLong();
+                    visitors.add(new SkippingVisitor((LtsVisitor) visitor,
+                                                     counter::getAndIncrement,
                                                      (lts) -> run.pdSelector.pd(lts, run.schemaSpec),
                                                      ltsToSkip,
-                                                     pdsToSkip));
+                                                     pdsToSkip)) ;
                 }
                 else
                 {
@@ -93,7 +96,7 @@ public class TrivialShrinker
 
                 try
                 {
-                    runOnce(run, visitors, maxLts);
+                    runOnce(visitors, maxLts);
                     System.out.println("Can not skip " + pdToCheck + "\nCan only skip these: " + toString(pdsToSkip));
                     pdsToSkip.remove(pdToCheck);
                 }
@@ -126,7 +129,7 @@ public class TrivialShrinker
 
                 try
                 {
-                    runOnce(run, visitors, maxLts);
+                    runOnce(visitors, maxLts);
                     System.out.println("Can not skip " + ltsToCheck + "\nCan only skip these: " + toString(ltsToSkip));
                     ltsToSkip.remove(ltsToCheck);
                 }
@@ -160,13 +163,13 @@ public class TrivialShrinker
         }
     }
 
-    public static void runOnce(Run run, List<Visitor> visitors, long maxLts)
+    public static void runOnce(List<Visitor> visitors, long maxLts)
     {
         for (long lts = 0; lts <= maxLts; lts++)
         {
             for (Visitor visitor : visitors)
             {
-                visitor.visit(lts);
+                visitor.visit();
             }
         }
     }
@@ -183,4 +186,4 @@ public class TrivialShrinker
         }
         return s.substring(0, s.length() - 1);
     }
-}
\ No newline at end of file
+}
diff --git a/harry-integration/src/harry/visitors/SkippingVisitor.java b/harry-integration/src/harry/visitors/SkippingVisitor.java
index 8da0a89..67c6488 100644
--- a/harry-integration/src/harry/visitors/SkippingVisitor.java
+++ b/harry-integration/src/harry/visitors/SkippingVisitor.java
@@ -19,20 +19,24 @@
 package harry.visitors;
 
 import java.util.Set;
+import java.util.function.LongSupplier;
 
-public class SkippingVisitor implements Visitor
+public class SkippingVisitor extends LtsVisitor
 {
     private final Set<Long> ltsToSkip;
     private final Set<Long> pdsToSkip;
     private final LtsToPd ltsToPd;
-    private final Visitor delegate;
+    // Use DelegatingVisitor class instead of VisitExecutor available via protected field; set once in the constructor
+    private final LtsVisitor delegateShadow;
 
-    public SkippingVisitor(Visitor delegate,
+    public SkippingVisitor(LtsVisitor delegate,
+                           LongSupplier ltsSupplier,
                            LtsToPd ltsToPd,
                            Set<Long> ltsToSkip,
                            Set<Long> pdsToSkip)
     {
-        this.delegate = delegate;
+        super(delegate, ltsSupplier);
+        this.delegateShadow = delegate;
         this.ltsToSkip = ltsToSkip;
         this.pdsToSkip = pdsToSkip;
         this.ltsToPd = ltsToPd;
@@ -43,7 +47,7 @@ public class SkippingVisitor implements Visitor
         if (ltsToSkip.contains(lts) || pdsToSkip.contains(ltsToPd.convert(lts)))
             return;
 
-        delegate.visit(lts);
+        delegateShadow.visit(lts);
     }
 
     public static interface LtsToPd
diff --git a/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java b/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
index 33463b8..0da7743 100644
--- a/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
+++ b/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
@@ -16,9 +16,9 @@ import harry.model.OpSelectors;
 import harry.model.sut.SystemUnderTest;
 import harry.visitors.MutatingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Visitor;
 import harry.visitors.SingleValidator;
 import harry.util.TestRunner;
+import harry.visitors.Visitor;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.distributed.impl.RowUtil;
@@ -94,12 +94,12 @@ public class DataGeneratorsIntegrationTest extends CQLTester
 
                                 Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
                                 for (int lts = 0; lts < 100; lts++)
-                                    visitor.visit(lts);
+                                    visitor.visit();
                             }
 
                             Run run = builder.build()
                                              .createRun();
-                            Visitor visitor = new SingleValidator(100, run, NoOpChecker::new);
+                            SingleValidator visitor = new SingleValidator(100, run, NoOpChecker::new);
                             for (int lts = 0; lts < 100; lts++)
                                 visitor.visit(lts);
 
diff --git a/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java b/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
index 6c0ee24..31cd9f1 100644
--- a/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
+++ b/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
@@ -23,6 +23,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.function.Supplier;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import harry.core.Configuration;
@@ -65,6 +66,9 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase
             HistoryBuilder history = new HistoryBuilder(run);
 
             Set<Long> pds = new HashSet<>();
+            run.tracker.onLtsStarted((long lts) -> {
+                pds.add(run.pdSelector.pd(lts, run.schemaSpec));
+            });
 
             for (int j = 0; j < 5; j++)
             {
@@ -97,28 +101,19 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase
                            .partitionDelete()
                            .finish();
 
-                ReplayingVisitor visitor = history.visitor(new MutatingVisitor.MutatingVisitExecutor(run,
-                                                                                                     new MutatingRowVisitor(run)
-                                                                                                     {
-                                                                                                         public CompiledStatement perform(OpSelectors.OperationKind op, long lts, long pd, long cd, long opId)
-                                                                                                         {
-                                                                                                             pds.add(pd);
-                                                                                                             return super.perform(op, lts, pd, cd, opId);
-                                                                                                         }
-                                                                                                     }));
+                ReplayingVisitor visitor = history.visitor(run);
 
-                visitor.replayAll(run);
+                visitor.replayAll();
 
                 Model model = new QuiescentChecker(run, new Reconciler(run,
                                                                        history::visitor));
                 QueryGenerator.TypedQueryGenerator queryGenerator = new QueryGenerator.TypedQueryGenerator(run);
+                Assert.assertFalse(pds.isEmpty());
                 for (Long pd : pds)
                 {
-                    model.validate(Query.selectPartition(run.schemaSpec,
-                                                         pd,
-                                                         false));
+                    model.validate(Query.selectPartition(run.schemaSpec, pd,false));
 
-                    int lts = new Random().nextInt((int) run.clock.maxLts());
+                    int lts = new Random().nextInt((int) run.clock.peek());
                     for (int k = 0; k < 3; k++)
                         queryGenerator.inflate(lts, k);
                 }
@@ -169,7 +164,7 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase
                                                                                                                     }
                                                                                                                 }));
 
-                                visitor.replayAll(run);
+                                visitor.replayAll();
 
                                 Model model = new QuiescentChecker(run, new Reconciler(run,
                                                                                        sut::visitor));
@@ -182,7 +177,7 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase
                                     model.validate(Query.selectPartition(run.schemaSpec,
                                                                          pd,
                                                                          true));
-                                    int lts = new Random().nextInt((int) run.clock.maxLts());
+                                    int lts = new Random().nextInt((int) run.clock.peek());
                                     for (int k = 0; k < 3; k++)
                                         queryGenerator.inflate(lts, k);
                                 }
diff --git a/harry-integration/test/harry/model/HistoryBuilderTest.java b/harry-integration/test/harry/model/HistoryBuilderTest.java
index 5ccca60..e7f3413 100644
--- a/harry-integration/test/harry/model/HistoryBuilderTest.java
+++ b/harry-integration/test/harry/model/HistoryBuilderTest.java
@@ -129,8 +129,9 @@ public class HistoryBuilderTest
                                 public void afterLts(long lts, long pd){}
                                 public void beforeBatch(long lts, long pd, long m){}
                                 public void afterBatch(long lts, long pd, long m){}
+                                public void shutdown() {}
                             });
-                            visitor.replayAll(run);
+                            visitor.replayAll();
                             for (Counts counts : model)
                                 Assert.assertTrue(counts.toString(), counts.allDone());
                         });
diff --git a/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java b/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java
index 0f28005..c84c789 100644
--- a/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java
+++ b/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java
@@ -32,7 +32,7 @@ import harry.model.sut.InJVMTokenAwareVisitExecutor;
 import harry.model.sut.InJvmSut;
 import harry.model.sut.SystemUnderTest;
 import harry.runner.RepairingLocalStateValidator;
-import harry.visitors.GeneratingVisitor;
+import harry.visitors.MutatingVisitor;
 import harry.visitors.Visitor;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.Feature;
@@ -70,21 +70,20 @@ public class InJVMTokenAwareExecutorTest extends IntegrationTestBase
             Run run = configuration.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
 
-            Visitor visitor = new GeneratingVisitor(run, new InJVMTokenAwareVisitExecutor(run,
-                                                                                          new Configuration.MutatingRowVisitorConfiguration(),
-                                                                                          SystemUnderTest.ConsistencyLevel.NODE_LOCAL));
+            Visitor visitor = new MutatingVisitor(run, new InJVMTokenAwareVisitExecutor(run,
+                                                                                        new Configuration.MutatingRowVisitorConfiguration(),
+                                                                                        SystemUnderTest.ConsistencyLevel.NODE_LOCAL));
 
             OpSelectors.MonotonicClock clock = run.clock;
             long maxPd = 0;
             for (int i = 0; i < 10000; i++)
             {
-                long lts = clock.nextLts();
-                visitor.visit(lts);
-                maxPd = Math.max(maxPd, run.pdSelector.positionFor(lts));
+                visitor.visit();
+                maxPd = Math.max(maxPd, run.pdSelector.positionFor(clock.peek()));
             }
 
             RepairingLocalStateValidator validator = new RepairingLocalStateValidator(5, 1, run, new Configuration.QuiescentCheckerConfig());
-            validator.visit(clock.maxLts());
+            validator.visit();
         }
 
     }
diff --git a/harry-integration/test/harry/model/IntegrationTestBase.java b/harry-integration/test/harry/model/IntegrationTestBase.java
index a6a9697..6d8b4ef 100644
--- a/harry-integration/test/harry/model/IntegrationTestBase.java
+++ b/harry-integration/test/harry/model/IntegrationTestBase.java
@@ -95,9 +95,9 @@ public class IntegrationTestBase extends TestBaseImpl
                                                        .setCreateSchema(true)
                                                        .setTruncateTable(false)
                                                        .setDropSchema(true)
-                                                       .setSchemaProvider(seed1 -> schema)
+                                                       .setSchemaProvider((seed1, sut) -> schema)
                                                        .setClusteringDescriptorSelector(sharedCDSelectorConfiguration().build())
                                                        .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(1, 200))
                                                        .setSUT(() -> sut);
     }
-}
+}
\ No newline at end of file
diff --git a/harry-integration/test/harry/model/ModelTestBase.java b/harry-integration/test/harry/model/ModelTestBase.java
index 576a291..3381137 100644
--- a/harry-integration/test/harry/model/ModelTestBase.java
+++ b/harry-integration/test/harry/model/ModelTestBase.java
@@ -18,7 +18,6 @@
 
 package harry.model;
 
-import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
@@ -30,9 +29,9 @@ import harry.ddl.SchemaGenerators;
 import harry.ddl.SchemaSpec;
 import harry.visitors.LoggingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Visitor;
 import harry.runner.Runner;
 import harry.visitors.SingleValidator;
+import harry.visitors.Visitor;
 
 public abstract class ModelTestBase extends IntegrationTestBase
 {
@@ -46,33 +45,30 @@ public abstract class ModelTestBase extends IntegrationTestBase
         }
     }
 
-    void negativeIntegrationTest(Model.ModelFactory factory) throws Throwable
+    void negativeIntegrationTest(Configuration.RunnerConfiguration runnerConfig) throws Throwable
     {
         Supplier<SchemaSpec> supplier = SchemaGenerators.progression(1);
         for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
         {
             SchemaSpec schema = supplier.get();
             Configuration.ConfigurationBuilder builder = configuration(i, schema);
+
             builder.setClock(new Configuration.ApproximateMonotonicClockConfiguration((int) TimeUnit.MINUTES.toMillis(10),
                                                                                       1, TimeUnit.SECONDS))
-                   .setRunTime(1, TimeUnit.MINUTES)
                    .setCreateSchema(false)
                    .setDropSchema(false)
-                   .setRunner(new Configuration.SequentialRunnerConfig(Arrays.asList(new Configuration.LoggingVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration()),
-                                                                                     new Configuration.RecentPartitionsValidatorConfiguration(10, 10, 1, factory::make),
-                                                                                     new Configuration.AllPartitionsValidatorConfiguration(10, 10, factory::make))));
-            Runner runner = builder.build().createRunner();
+                   .setRunner(runnerConfig);
+
+            Configuration config = builder.build();
+            Runner runner = config.createRunner();
+            
             try
             {
                 Run run = runner.getRun();
                 beforeEach();
                 run.sut.schemaChange(run.schemaSpec.compile().cql());
 
-                runner.initAndStartAll().get(2, TimeUnit.MINUTES);
-            }
-            catch (Throwable t)
-            {
-                throw t;
+                runner.initAndStartAll().get(4, TimeUnit.MINUTES);
             }
             finally
             {
@@ -83,7 +79,7 @@ public abstract class ModelTestBase extends IntegrationTestBase
 
     abstract Configuration.ModelConfiguration modelConfiguration();
 
-    protected Visitor validator(Run run)
+    protected SingleValidator validator(Run run)
     {
         return new SingleValidator(100, run , modelConfiguration());
     }
@@ -101,17 +97,13 @@ public abstract class ModelTestBase extends IntegrationTestBase
         beforeEach();
         run.sut.schemaChange(run.schemaSpec.compile().cql());
         System.out.println(run.schemaSpec.compile().cql());
-        OpSelectors.MonotonicClock clock = run.clock;
 
-        Visitor validator = validator(run);
         Visitor visitor = new LoggingVisitor(run, MutatingRowVisitor::new);
 
         for (int i = 0; i < 20000; i++)
-        {
-            long lts = clock.nextLts();
-            visitor.visit(lts);
-        }
+            visitor.visit();
 
+        SingleValidator validator = validator(run);
         validator.visit(0);
 
         if (!corrupt.apply(run))
diff --git a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
index 92498dd..13ae548 100644
--- a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
@@ -41,9 +41,9 @@ import harry.corruptor.ShowValueCorruptor;
 import harry.ddl.SchemaGenerators;
 import harry.visitors.MutatingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Visitor;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
+import harry.visitors.Visitor;
 
 import static harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
 
@@ -117,10 +117,7 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
             QueryResponseCorruptor corruptor = this.corruptorFactory.create(run);
 
             for (int i = 0; i < CYCLES; i++)
-            {
-                long lts = clock.nextLts();
-                visitor.visit(lts);
-            }
+                visitor.visit();
 
             while (true)
             {
diff --git a/harry-integration/test/harry/model/QuerySelectorTest.java b/harry-integration/test/harry/model/QuerySelectorTest.java
index 6d8dc5c..099adf1 100644
--- a/harry-integration/test/harry/model/QuerySelectorTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorTest.java
@@ -33,9 +33,9 @@ import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
 import harry.visitors.MutatingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Visitor;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
+import harry.visitors.Visitor;
 
 import static harry.generators.DataGenerators.NIL_DESCR;
 
@@ -70,15 +70,11 @@ public class QuerySelectorTest extends IntegrationTestBase
 
             Run run = config.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
-            OpSelectors.MonotonicClock clock = run.clock;
 
             Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
 
             for (int i = 0; i < CYCLES; i++)
-            {
-                long lts = clock.nextLts();
-                visitor.visit(lts);
-            }
+                visitor.visit();
 
             QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
 
@@ -145,14 +141,10 @@ public class QuerySelectorTest extends IntegrationTestBase
                                    .build();
             Run run = config.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
-            OpSelectors.MonotonicClock clock = run.clock;
             Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
 
             for (int i = 0; i < CYCLES; i++)
-            {
-                long lts = clock.nextLts();
-                visitor.visit(lts);
-            }
+                visitor.visit();
 
             QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
             Model model = new QuiescentChecker(run);
diff --git a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
index 945dce2..97a554a 100644
--- a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
+++ b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
@@ -18,9 +18,19 @@
 
 package harry.model;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
 import org.junit.Test;
 
 import harry.core.Configuration;
+import harry.core.Configuration.AllPartitionsValidatorConfiguration;
+import harry.core.Configuration.ConcurrentRunnerConfig;
+import harry.core.Configuration.LoggingVisitorConfiguration;
+import harry.core.Configuration.RecentPartitionsValidatorConfiguration;
+import harry.core.Configuration.SequentialRunnerConfig;
+import harry.core.Configuration.SingleVisitRunnerConfig;
 import harry.core.Run;
 import harry.corruptor.AddExtraRowCorruptor;
 import harry.corruptor.ChangeValueCorruptor;
@@ -29,14 +39,14 @@ import harry.corruptor.HideValueCorruptor;
 import harry.corruptor.QueryResponseCorruptor;
 import harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
 import harry.ddl.SchemaSpec;
-import harry.visitors.Visitor;
 import harry.operations.Query;
+import harry.runner.StagedRunner.StagedRunnerConfig;
 import harry.visitors.SingleValidator;
 
 public class QuiescentCheckerIntegrationTest extends ModelTestBase
 {
     @Override
-    protected Visitor validator(Run run)
+    protected SingleValidator validator(Run run)
     {
         return new SingleValidator(100, run, modelConfiguration());
     }
@@ -56,7 +66,30 @@ public class QuiescentCheckerIntegrationTest extends ModelTestBase
     @Test
     public void normalConditionIntegrationTest() throws Throwable
     {
-        negativeIntegrationTest(modelConfiguration()::make);
+        Model.ModelFactory factory = modelConfiguration();
+                
+        SequentialRunnerConfig sequential = 
+                new SequentialRunnerConfig(Arrays.asList(new LoggingVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration()),
+                                                         new RecentPartitionsValidatorConfiguration(10, 10, 1, factory::make),
+                                                         new AllPartitionsValidatorConfiguration(10, 10, factory::make)),
+                                           1, TimeUnit.MINUTES);
+        negativeIntegrationTest(sequential);
+    }
+
+    @Test
+    public void normalConditionStagedIntegrationTest() throws Throwable
+    {
+        Model.ModelFactory factory = modelConfiguration();
+
+        ConcurrentRunnerConfig concurrent = 
+                new ConcurrentRunnerConfig(4, Collections.singletonList(new LoggingVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration())),
+                                           30, TimeUnit.SECONDS);
+        SingleVisitRunnerConfig sequential = 
+                new SingleVisitRunnerConfig(Collections.singletonList(new RecentPartitionsValidatorConfiguration(1024, 0, 1, factory::make)));
+        
+        StagedRunnerConfig staged = new StagedRunnerConfig(Arrays.asList(concurrent, sequential), 2, TimeUnit.MINUTES);
+
+        negativeIntegrationTest(staged);
     }
 
     @Test
diff --git a/harry-integration/test/resources/single_partition_test.yml b/harry-integration/test/resources/single_partition_test.yml
index 8ec4c4e..3f2017f 100644
--- a/harry-integration/test/resources/single_partition_test.yml
+++ b/harry-integration/test/resources/single_partition_test.yml
@@ -12,9 +12,6 @@ clock:
   offset:
     offset: 1000
 
-run_time: 10
-run_time_unit: "MINUTES"
-
 system_under_test:
   println: {}
 
@@ -49,6 +46,8 @@ data_tracker:
 
 runner:
   sequential:
+    run_time: 10
+    run_time_unit: "MINUTES"
     visitors: []
 
 metric_reporter:

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org