Posted to commits@cassandra.apache.org by if...@apache.org on 2022/01/10 10:46:13 UTC

[cassandra-harry] branch trunk updated (5aea1cf -> 275f188)

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git.


 discard 5aea1cf  Features:   * Implement lookbehind via tracker callbacks   * Improve DSL   * Rename maxLts to peek   * Split lts visitors from visitors   * Allow create table if not exists   * Allow sampler to be triggered at every LTS   * Allow local state validator to always run   * Add Staged Runner   * Add wait for token ranges   * Make keyspace DDL configurable   * Rename PartitionVisitor to Visitor
     new 275f188  Features:   * Implement lookbehind via tracker callbacks   * Improve DSL   * Rename maxLts to peek   * Split lts visitors from visitors   * Allow create table if not exists   * Allow sampler to be triggered at every LTS   * Allow local state validator to always run   * Add Staged Runner   * Add wait for token ranges   * Make keyspace DDL configurable   * Rename PartitionVisitor to Visitor

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (5aea1cf)
            \
             N -- N -- N   refs/heads/trunk (275f188)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../test/harry/model/HistoryBuilderIntegrationTest.java             | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)



[cassandra-harry] 01/01: Features: * Implement lookbehind via tracker callbacks * Improve DSL * Rename maxLts to peek * Split lts visitors from visitors * Allow create table if not exists * Allow sampler to be triggered at every LTS * Allow local state validator to always run * Add Staged Runner * Add wait for token ranges * Make keyspace DDL configurable * Rename PartitionVisitor to Visitor

Posted by if...@apache.org.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git

commit 275f188660b66743bf3f055c8d7da438ad826061
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Thu Nov 25 08:45:00 2021 +0100

    Features:
      * Implement lookbehind via tracker callbacks
      * Improve DSL
      * Rename maxLts to peek
      * Split lts visitors from visitors
      * Allow create table if not exists
      * Allow sampler to be triggered at every LTS
      * Allow local state validator to always run
      * Add Staged Runner
      * Add wait for token ranges
      * Make keyspace DDL configurable
      * Rename PartitionVisitor to Visitor
    
    Bugfixes:
      * Fix for queue draining
      * Fix distribution of the single-op values
      * Fix bug in schema helper: static columns are listed as duplicates
    
    Patch by Alex Petrov for CASSANDRA-16262
    
    Co-authored-by: Caleb Rackliffe <ca...@gmail.com>
---
 harry-core/src/harry/core/Configuration.java       | 145 ++++----
 harry-core/src/harry/core/Run.java                 |   2 -
 .../src/harry/corruptor/HideRowCorruptor.java      |   2 +-
 .../src/harry/corruptor/HideValueCorruptor.java    |   4 +-
 .../src/harry/corruptor/ShowValueCorruptor.java    |   2 +-
 harry-core/src/harry/ddl/ColumnSpec.java           |  16 +
 harry-core/src/harry/ddl/SchemaSpec.java           |   5 +-
 harry-core/src/harry/dsl/HistoryBuilder.java       |  24 +-
 harry-core/src/harry/generators/RngUtils.java      |  16 +
 harry-core/src/harry/model/OpSelectors.java        |  38 +-
 harry-core/src/harry/model/QuiescentChecker.java   |  38 +-
 .../model/clock/ApproximateMonotonicClock.java     |  16 +-
 harry-core/src/harry/model/clock/OffsetClock.java  |   9 +-
 harry-core/src/harry/reconciler/Reconciler.java    |  27 +-
 harry-core/src/harry/runner/DataTracker.java       |  54 ++-
 .../src/harry/runner/DefaultDataTracker.java       |  17 +-
 harry-core/src/harry/runner/HarryRunner.java       |  13 +-
 harry-core/src/harry/runner/Runner.java            | 406 ++++++++++++++-------
 harry-core/src/harry/runner/StagedRunner.java      | 201 ++++++++++
 harry-core/src/harry/runner/UpToLtsRunner.java     | 105 ++++++
 harry-core/src/harry/util/ByteUtils.java           | 196 ++++++++++
 .../src/harry/visitors/AllPartitionsValidator.java |  38 +-
 .../src/harry/visitors/CorruptingVisitor.java      |   7 +-
 .../src/harry/visitors/DelegatingVisitor.java      |  61 ----
 .../src/harry/visitors/GeneratingVisitor.java      |   9 +-
 harry-core/src/harry/visitors/LoggingVisitor.java  |   1 -
 harry-core/src/harry/visitors/LtsVisitor.java      |  97 +++++
 harry-core/src/harry/visitors/MutatingVisitor.java |  24 +-
 .../src/harry/visitors/ParallelValidator.java      |   4 +-
 harry-core/src/harry/visitors/RecentValidator.java |  48 ++-
 .../src/harry/visitors/ReplayingVisitor.java       |  11 +-
 harry-core/src/harry/visitors/Sampler.java         |  11 +-
 harry-core/src/harry/visitors/SingleValidator.java |   6 +-
 harry-core/src/harry/visitors/VisitExecutor.java   |  14 +-
 harry-core/src/harry/visitors/Visitor.java         |   8 +-
 harry-core/test/harry/model/OpSelectorsTest.java   | 148 ++++----
 .../harry/runner/external/HarryRunnerExternal.java |   2 +-
 .../src/harry/model/sut/ByteUtils.java             | 115 ++++++
 .../model/sut/InJVMTokenAwareVisitExecutor.java    |  11 +-
 .../src/harry/model/sut/InJvmSut.java              |  14 +-
 .../src/harry/runner/HarryRunnerJvm.java           |   2 +-
 .../harry/runner/RepairingLocalStateValidator.java |  21 +-
 .../src/harry/runner/TrivialShrinker.java          |  25 +-
 .../src/harry/visitors/SkippingVisitor.java        |  13 +-
 .../generators/DataGeneratorsIntegrationTest.java  |   6 +-
 .../harry/model/HistoryBuilderIntegrationTest.java |  27 +-
 .../test/harry/model/HistoryBuilderTest.java       |   3 +-
 .../harry/model/InJVMTokenAwareExecutorTest.java   |  15 +-
 .../test/harry/model/IntegrationTestBase.java      |   4 +-
 .../test/harry/model/ModelTestBase.java            |  32 +-
 .../harry/model/QuerySelectorNegativeTest.java     |   7 +-
 .../test/harry/model/QuerySelectorTest.java        |  14 +-
 .../model/QuiescentCheckerIntegrationTest.java     |  39 +-
 .../test/resources/single_partition_test.yml       |   5 +-
 54 files changed, 1550 insertions(+), 628 deletions(-)

diff --git a/harry-core/src/harry/core/Configuration.java b/harry-core/src/harry/core/Configuration.java
index c6fecc7..3545e18 100644
--- a/harry-core/src/harry/core/Configuration.java
+++ b/harry-core/src/harry/core/Configuration.java
@@ -76,6 +76,7 @@ public class Configuration
         mapper.registerSubtypes(Configuration.ApproximateMonotonicClockConfiguration.class);
         mapper.registerSubtypes(Configuration.ConcurrentRunnerConfig.class);
         mapper.registerSubtypes(Configuration.SequentialRunnerConfig.class);
+        mapper.registerSubtypes(Configuration.SingleVisitRunnerConfig.class);
         mapper.registerSubtypes(Configuration.DefaultDataTrackerConfiguration.class);
         mapper.registerSubtypes(Configuration.NoOpDataTrackerConfiguration.class);
 
@@ -106,6 +107,7 @@ public class Configuration
     public final SchemaProviderConfiguration schema_provider;
 
     public final boolean drop_schema;
+    public final String keyspace_ddl;
     public final boolean create_schema;
     public final boolean truncate_table;
 
@@ -117,13 +119,11 @@ public class Configuration
     public final PDSelectorConfiguration partition_descriptor_selector;
     public final CDSelectorConfiguration clustering_descriptor_selector;
 
-    public final long run_time;
-    public final TimeUnit run_time_unit;
-
     @JsonCreator
     public Configuration(@JsonProperty("seed") long seed,
                          @JsonProperty("schema_provider") SchemaProviderConfiguration schema_provider,
                          @JsonProperty("drop_schema") boolean drop_schema,
+                         @JsonProperty("create_keyspace") String keyspace_ddl,
                          @JsonProperty("create_schema") boolean create_schema,
                          @JsonProperty("truncate_schema") boolean truncate_table,
                          @JsonProperty("metric_reporter") MetricReporterConfiguration metric_reporter,
@@ -132,12 +132,11 @@ public class Configuration
                          @JsonProperty("system_under_test") SutConfiguration system_under_test,
                          @JsonProperty("data_tracker") DataTrackerConfiguration data_tracker,
                          @JsonProperty("partition_descriptor_selector") PDSelectorConfiguration partition_descriptor_selector,
-                         @JsonProperty("clustering_descriptor_selector") CDSelectorConfiguration clustering_descriptor_selector,
-                         @JsonProperty(value = "run_time", defaultValue = "2") long run_time,
-                         @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit run_time_unit)
+                         @JsonProperty("clustering_descriptor_selector") CDSelectorConfiguration clustering_descriptor_selector)
     {
         this.seed = seed;
         this.schema_provider = schema_provider;
+        this.keyspace_ddl = keyspace_ddl;
         this.drop_schema = drop_schema;
         this.create_schema = create_schema;
         this.truncate_table = truncate_table;
@@ -147,8 +146,6 @@ public class Configuration
         this.data_tracker = data_tracker;
         this.partition_descriptor_selector = partition_descriptor_selector;
         this.clustering_descriptor_selector = clustering_descriptor_selector;
-        this.run_time = run_time;
-        this.run_time_unit = run_time_unit;
         this.runner = runner;
     }
 
@@ -233,14 +230,15 @@ public class Configuration
         OpSelectors.MonotonicClock clock = snapshot.clock.make();
 
         MetricReporter metricReporter = snapshot.metric_reporter.make();
-        // TODO: parse schema
-        SchemaSpec schemaSpec = snapshot.schema_provider.make(seed);
+
+        // TODO: validate that operation kind is compatible with schema, due to statics etc
+        SystemUnderTest sut = snapshot.system_under_test.make();
+
+        SchemaSpec schemaSpec = snapshot.schema_provider.make(seed, sut);
         schemaSpec.validate();
 
         OpSelectors.PdSelector pdSelector = snapshot.partition_descriptor_selector.make(rng);
         OpSelectors.DescriptorSelector descriptorSelector = snapshot.clustering_descriptor_selector.make(rng, schemaSpec);
-        // TODO: validate that operation kind is compactible with schema, due to statics etc
-        SystemUnderTest sut = snapshot.system_under_test.make();
 
         return new Run(rng,
                        clock,
@@ -263,6 +261,7 @@ public class Configuration
         long seed;
         SchemaProviderConfiguration schema_provider = new DefaultSchemaProviderConfiguration();
 
+        String keyspace_ddl;
         boolean drop_schema;
         boolean create_schema;
         boolean truncate_table;
@@ -275,9 +274,6 @@ public class Configuration
         PDSelectorConfiguration partition_descriptor_selector = new Configuration.DefaultPDSelectorConfiguration(10, 100);
         CDSelectorConfiguration clustering_descriptor_selector; // TODO: sensible default value
 
-        long run_time = 2;
-        TimeUnit run_time_unit = TimeUnit.HOURS;
-
         public ConfigurationBuilder setSeed(long seed)
         {
             this.seed = seed;
@@ -290,20 +286,19 @@ public class Configuration
             return this;
         }
 
-        public ConfigurationBuilder setRunTime(long runTime, TimeUnit runTimeUnit)
+        public ConfigurationBuilder setDataTracker(DataTrackerConfiguration tracker)
         {
-            this.run_time_unit = Objects.requireNonNull(runTimeUnit, "unit");
-            this.run_time = runTime;
+            this.data_tracker = tracker;
             return this;
         }
 
-
-        public ConfigurationBuilder setDataTracker(DataTrackerConfiguration tracker)
+        public ConfigurationBuilder setKeyspaceDdl(String keyspace_ddl)
         {
-            this.data_tracker = tracker;
+            this.keyspace_ddl = keyspace_ddl;
             return this;
         }
 
+
         public ConfigurationBuilder setClock(ClockConfiguration clock)
         {
             this.clock = clock;
@@ -370,20 +365,16 @@ public class Configuration
             return new Configuration(seed,
                                      schema_provider,
                                      drop_schema,
+                                     keyspace_ddl,
                                      create_schema,
                                      truncate_table,
                                      metric_reporter,
                                      clock,
-
                                      runner,
                                      system_under_test,
                                      data_tracker,
-
                                      partition_descriptor_selector,
-                                     clustering_descriptor_selector,
-
-                                     run_time,
-                                     run_time_unit);
+                                     clustering_descriptor_selector);
         }
     }
 
@@ -403,8 +394,6 @@ public class Configuration
         builder.partition_descriptor_selector = partition_descriptor_selector;
         builder.clustering_descriptor_selector = clustering_descriptor_selector;
 
-        builder.run_time = run_time;
-        builder.run_time_unit = run_time_unit;
         return builder;
     }
 
@@ -425,36 +414,10 @@ public class Configuration
 
         public DataTracker make()
         {
-            return new DataTracker()
-            {
-                public void started(long lts)
-                {
-                }
-
-                public void finished(long lts)
-                {
-
-                }
-
-                public long maxStarted()
-                {
-                    return 0;
-                }
-
-                public long maxConsecutiveFinished()
-                {
-                    return 0;
-                }
-
-                public DataTrackerConfiguration toConfig()
-                {
-                    return null;
-                }
-            };
+            return DataTracker.NO_OP;
         }
     }
 
-
     @JsonTypeName("default")
     public static class DefaultDataTrackerConfiguration implements DataTrackerConfiguration
     {
@@ -562,38 +525,74 @@ public class Configuration
     public static class ConcurrentRunnerConfig implements RunnerConfiguration
     {
         public final int concurrency;
-        public final List<VisitorConfiguration> visitor_factories;
+
+        @JsonProperty(value = "visitors")
+        public final List<VisitorConfiguration> visitorFactories;
+
+        public final long run_time;
+        public final TimeUnit run_time_unit;
 
         @JsonCreator
-        public ConcurrentRunnerConfig(@JsonProperty(value = "concurrency", defaultValue = "2") int concurrency,
-                                      @JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
+        public ConcurrentRunnerConfig(@JsonProperty(value = "concurrency", defaultValue = "4") int concurrency,
+                                      @JsonProperty(value = "visitors") List<VisitorConfiguration> visitors,
+                                      @JsonProperty(value = "run_time", defaultValue = "2") long runtime,
+                                      @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
         {
             this.concurrency = concurrency;
-            this.visitor_factories = visitors;
+            this.visitorFactories = visitors;
+            this.run_time = runtime;
+            this.run_time_unit = runtimeUnit;
         }
 
         @Override
         public Runner make(Run run, Configuration config)
         {
-            return new Runner.ConcurrentRunner(run, config, concurrency, visitor_factories);
+            return new Runner.ConcurrentRunner(run, config, concurrency, visitorFactories, run_time, run_time_unit);
         }
     }
 
     @JsonTypeName("sequential")
     public static class SequentialRunnerConfig implements RunnerConfiguration
     {
-        public final List<VisitorConfiguration> visitor_factories;
+        @JsonProperty(value = "visitors")
+        public final List<VisitorConfiguration> visitorFactories;
+
+        public final long run_time;
+        public final TimeUnit run_time_unit;
+
+        @JsonCreator
+        public SequentialRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors,
+                                      @JsonProperty(value = "run_time", defaultValue = "2") long runtime,
+                                      @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
+        {
+            this.visitorFactories = visitors;
+            this.run_time = runtime;
+            this.run_time_unit = runtimeUnit;
+        }
+
+        @Override
+        public Runner make(Run run, Configuration config)
+        {
+            return new Runner.SequentialRunner(run, config, visitorFactories, run_time, run_time_unit);
+        }
+    }
+
+    @JsonTypeName("single")
+    public static class SingleVisitRunnerConfig implements RunnerConfiguration
+    {
+        @JsonProperty(value = "visitors")
+        public final List<VisitorConfiguration> visitorFactories;
 
         @JsonCreator
-        public SequentialRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
+        public SingleVisitRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
         {
-            this.visitor_factories = visitors;
+            this.visitorFactories = visitors;
         }
 
         @Override
         public Runner make(Run run, Configuration config)
         {
-            return new Runner.SequentialRunner(run, config, visitor_factories);
+            return new Runner.SingleVisitRunner(run, config, visitorFactories);
         }
     }
 
@@ -675,7 +674,7 @@ public class Configuration
             operation_kind_weights = new HashMap<>();
         }
 
-        public WeightedSelectorBuilder addWeight(T v, int weight)
+        public WeightedSelectorBuilder<T> addWeight(T v, int weight)
         {
             operation_kind_weights.put(v, weight);
             return this;
@@ -852,7 +851,7 @@ public class Configuration
         }
     }
 
-    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
     public interface DistributionConfig extends Distribution.DistributionFactory
     {
     }
@@ -942,7 +941,7 @@ public class Configuration
         @Override
         public Visitor make(Run run)
         {
-            return new MutatingVisitor(run, row_visitor);
+            return new MutatingVisitor(run, row_visitor::make);
         }
     }
 
@@ -960,7 +959,7 @@ public class Configuration
         @Override
         public Visitor make(Run run)
         {
-            return new LoggingVisitor(run, row_visitor);
+            return new LoggingVisitor(run, row_visitor::make);
         }
     }
 
@@ -969,6 +968,8 @@ public class Configuration
     {
         public final int concurrency;
         public final int trigger_after;
+
+        @JsonProperty("model")
         public final Configuration.ModelConfiguration modelConfiguration;
 
         @JsonCreator
@@ -1055,7 +1056,7 @@ public class Configuration
     @JsonTypeName("default")
     public static class DefaultSchemaProviderConfiguration implements SchemaProviderConfiguration
     {
-        public SchemaSpec make(long seed)
+        public SchemaSpec make(long seed, SystemUnderTest sut)
         {
             return SchemaGenerators.defaultSchemaSpecGen("harry", "table0")
                                    .inflate(seed);
@@ -1103,7 +1104,7 @@ public class Configuration
             this.regular_columns = regulars;
             this.static_keys = statics;
         }
-        public SchemaSpec make(long seed)
+        public SchemaSpec make(long seed, SystemUnderTest sut)
         {
             return schemaSpec;
         }
@@ -1122,6 +1123,4 @@ public class Configuration
             return MetricReporter.NO_OP;
         }
     }
-
-    // TODO: schema provider by DDL
 }
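
Note on the DataTracker.NO_OP change above: the hunk replaces a per-call anonymous class with a shared constant. A minimal sketch of that pattern follows. TrackerSketch and NoOpTrackerFactorySketch are hypothetical names; the method list is copied from the removed anonymous class (minus toConfig) and may not match the DataTracker interface as it stands after this commit.

```java
// Hypothetical stand-in for a tracker interface. Method names come from the
// anonymous class removed in the hunk above, not from DataTracker itself.
interface TrackerSketch
{
    void started(long lts);
    void finished(long lts);
    long maxStarted();
    long maxConsecutiveFinished();

    // One shared, stateless instance instead of a new anonymous object per make() call.
    TrackerSketch NO_OP = new TrackerSketch()
    {
        public void started(long lts) {}
        public void finished(long lts) {}
        public long maxStarted() { return 0; }
        public long maxConsecutiveFinished() { return 0; }
    };
}

// Hypothetical factory mirroring the shape of the configuration's make() method.
final class NoOpTrackerFactorySketch
{
    static TrackerSketch make()
    {
        return TrackerSketch.NO_OP;
    }
}
```

Because the no-op tracker holds no state, every caller can safely share the same instance.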
diff --git a/harry-core/src/harry/core/Run.java b/harry-core/src/harry/core/Run.java
index b0c8afc..e3bc203 100644
--- a/harry-core/src/harry/core/Run.java
+++ b/harry-core/src/harry/core/Run.java
@@ -43,7 +43,6 @@ public class Run
                OpSelectors.MonotonicClock clock,
                OpSelectors.PdSelector pdSelector,
                OpSelectors.DescriptorSelector descriptorSelector,
-
                SchemaSpec schemaSpec,
                DataTracker tracker,
                SystemUnderTest sut,
@@ -59,7 +58,6 @@ public class Run
                 OpSelectors.PdSelector pdSelector,
                 OpSelectors.DescriptorSelector descriptorSelector,
                 QueryGenerator rangeSelector,
-
                 SchemaSpec schemaSpec,
                 DataTracker tracker,
                 SystemUnderTest sut,
diff --git a/harry-core/src/harry/corruptor/HideRowCorruptor.java b/harry-core/src/harry/corruptor/HideRowCorruptor.java
index a2a2648..ca16c2e 100644
--- a/harry-core/src/harry/corruptor/HideRowCorruptor.java
+++ b/harry-core/src/harry/corruptor/HideRowCorruptor.java
@@ -44,6 +44,6 @@ public class HideRowCorruptor implements RowCorruptor
 
     public CompiledStatement corrupt(ResultSetRow row)
     {
-        return DeleteHelper.deleteRow(schema, row.pd, row.cd, clock.rts(clock.maxLts()) + 1);
+        return DeleteHelper.deleteRow(schema, row.pd, row.cd, clock.rts(clock.peek()));
     }
 }
diff --git a/harry-core/src/harry/corruptor/HideValueCorruptor.java b/harry-core/src/harry/corruptor/HideValueCorruptor.java
index 3cedcda..c8617ff 100644
--- a/harry-core/src/harry/corruptor/HideValueCorruptor.java
+++ b/harry-core/src/harry/corruptor/HideValueCorruptor.java
@@ -77,7 +77,7 @@ public class HideValueCorruptor implements RowCorruptor
                                                  row.pd,
                                                  mask,
                                                  schema.regularAndStaticColumnsMask(),
-                                                 clock.rts(clock.maxLts()) + 1);
+                                                 clock.rts(clock.peek()));
             }
         }
 
@@ -96,6 +96,6 @@ public class HideValueCorruptor implements RowCorruptor
                                          row.cd,
                                          mask,
                                          schema.regularAndStaticColumnsMask(),
-                                         clock.rts(clock.maxLts()) + 1);
+                                         clock.rts(clock.peek()));
     }
 }
diff --git a/harry-core/src/harry/corruptor/ShowValueCorruptor.java b/harry-core/src/harry/corruptor/ShowValueCorruptor.java
index 01372ce..236709e 100644
--- a/harry-core/src/harry/corruptor/ShowValueCorruptor.java
+++ b/harry-core/src/harry/corruptor/ShowValueCorruptor.java
@@ -72,6 +72,6 @@ public class ShowValueCorruptor implements RowCorruptor
         // We do not know LTS of the deleted row. We could try inferring it, but that
         // still won't help since we can't use it anyways, since collisions between a
         // written value and tombstone are resolved in favour of tombstone.
-        return WriteHelper.inflateInsert(schema, row.pd, row.cd, corruptedVds, null, clock.rts(clock.maxLts()) + 1);
+        return WriteHelper.inflateInsert(schema, row.pd, row.cd, corruptedVds, null, clock.rts(clock.peek()));
     }
 }
diff --git a/harry-core/src/harry/ddl/ColumnSpec.java b/harry-core/src/harry/ddl/ColumnSpec.java
index 6d652c4..9b52805 100644
--- a/harry-core/src/harry/ddl/ColumnSpec.java
+++ b/harry-core/src/harry/ddl/ColumnSpec.java
@@ -264,6 +264,21 @@ public class ColumnSpec<T>
         }
     };
 
+    public static final DataType<String> textType = new DataType<String>("text")
+    {
+        private final Bijections.Bijection<String> gen = new StringBijection();
+
+        public Bijections.Bijection<String> generator()
+        {
+            return gen;
+        }
+
+        public int compareLexicographically(long l, long r)
+        {
+            return Long.compare(l, r);
+        }
+    };
+
     public static DataType<String> asciiType(int nibbleSize, int maxRandomBytes)
     {
         Bijections.Bijection<String> gen = new StringBijection(nibbleSize, maxRandomBytes);
@@ -325,6 +340,7 @@ public class ColumnSpec<T>
     ColumnSpec.floatType,
     ColumnSpec.doubleType,
     ColumnSpec.asciiType,
+    ColumnSpec.textType,
     ColumnSpec.uuidType,
     ColumnSpec.timestampType));
 
diff --git a/harry-core/src/harry/ddl/SchemaSpec.java b/harry-core/src/harry/ddl/SchemaSpec.java
index eba87c6..8d8a62f 100644
--- a/harry-core/src/harry/ddl/SchemaSpec.java
+++ b/harry-core/src/harry/ddl/SchemaSpec.java
@@ -26,6 +26,7 @@ import java.util.Objects;
 import java.util.function.Consumer;
 
 import harry.generators.DataGenerators;
+import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
 import harry.operations.Relation;
 import harry.util.BitSet;
@@ -36,7 +37,7 @@ public class SchemaSpec
 {
     public interface SchemaSpecFactory
     {
-        public SchemaSpec make(long seed);
+        public SchemaSpec make(long seed, SystemUnderTest sut);
     }
 
     public final DataGenerators.KeyGenerator pkGenerator;
@@ -244,7 +245,7 @@ public class SchemaSpec
     {
         StringBuilder sb = new StringBuilder();
 
-        sb.append("CREATE TABLE ");
+        sb.append("CREATE TABLE IF NOT EXISTS ");
         sb.append(keyspace)
           .append(".")
           .append(table)
diff --git a/harry-core/src/harry/dsl/HistoryBuilder.java b/harry-core/src/harry/dsl/HistoryBuilder.java
index 5e1ff6f..fa26277 100644
--- a/harry-core/src/harry/dsl/HistoryBuilder.java
+++ b/harry-core/src/harry/dsl/HistoryBuilder.java
@@ -31,8 +31,11 @@ import java.util.TreeSet;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 
+import harry.core.Configuration;
 import harry.core.Run;
 import harry.model.OpSelectors;
+import harry.visitors.MutatingRowVisitor;
+import harry.visitors.MutatingVisitor;
 import harry.visitors.ReplayingVisitor;
 import harry.visitors.VisitExecutor;
 
@@ -598,9 +601,19 @@ public class HistoryBuilder implements Iterable<ReplayingVisitor.Visit>
         return new ReplayingVisitor.Operation(cd, opId, opType);
     }
 
+    public void replayAll(Run run)
+    {
+        visitor(run).replayAll();
+    }
+
+    public ReplayingVisitor visitor(Run run)
+    {
+        return visitor(new MutatingVisitor.MutatingVisitExecutor(run, new MutatingRowVisitor(run)));
+    }
+
     public ReplayingVisitor visitor(VisitExecutor executor)
     {
-        return new ReplayingVisitor(executor)
+        return new ReplayingVisitor(executor, run.clock::nextLts)
         {
             public Visit getVisit(long lts)
             {
@@ -608,16 +621,15 @@ public class HistoryBuilder implements Iterable<ReplayingVisitor.Visit>
                 return log.get((int) lts);
             }
 
-            public void replayAll(Run run)
+            public void replayAll()
             {
                 long maxLts = HistoryBuilder.this.lts;
+
                 while (true)
                 {
-                    long lts = run.clock.currentLts();
-                    if (lts >= maxLts)
+                    if (run.clock.peek() >= maxLts)
                         return;
-                    visit(lts);
-                    run.clock.nextLts();
+                    visit();
                 }
             }
         };
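
Note on the replayAll() loop above: it now checks clock.peek() and leaves advancing the clock to the visitor, which receives run.clock::nextLts. The sketch below shows one way such a pair could behave. It assumes that peek() returns the LTS the next nextLts() call will hand out, without advancing. The clock implementations are not part of this hunk, so that is an assumption, and PeekClockSketch is a hypothetical class.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical clock, used only to illustrate the peek()/nextLts() split.
final class PeekClockSketch
{
    private final AtomicLong next = new AtomicLong(0);

    // Assumed semantics: look at the upcoming LTS without consuming it.
    long peek()
    {
        return next.get();
    }

    // Hand out the upcoming LTS and advance the clock by one.
    long nextLts()
    {
        return next.getAndIncrement();
    }

    // Same loop shape as replayAll(): stop once the upcoming LTS reaches maxLts.
    static int replayAll(PeekClockSketch clock, long maxLts)
    {
        LongSupplier ltsSource = clock::nextLts; // what the visitor would be given
        int visits = 0;
        while (clock.peek() < maxLts)
        {
            ltsSource.getAsLong(); // each visit consumes exactly one LTS
            visits++;
        }
        return visits;
    }
}
```

With this split the loop no longer has to pair a currentLts() read with a separate nextLts() call, so a visit cannot run without the clock advancing.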
diff --git a/harry-core/src/harry/generators/RngUtils.java b/harry-core/src/harry/generators/RngUtils.java
index 894204f..c974476 100644
--- a/harry-core/src/harry/generators/RngUtils.java
+++ b/harry-core/src/harry/generators/RngUtils.java
@@ -20,8 +20,15 @@ package harry.generators;
 
 import java.util.function.LongSupplier;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.visitors.AllPartitionsValidator;
+
 public class RngUtils
 {
+    private static final Logger logger = LoggerFactory.getLogger(RngUtils.class);
+
     private static final long CONSTANT = 0x2545F4914F6CDD1DL;
     public static long next(long input)
     {
@@ -131,8 +138,15 @@ public class RngUtils
         long min = 0;
         long max = ~0L;
         int n = 0;
+        int steps = 0;
         while (n != bits)
         {
+            if (steps > 10_000)
+            {
+                throw new RuntimeException(String.format("Could not generate bits after 10K tries. " +
+                                          "Inputs: bits=%d, length=%d", bits, length));
+            }
+
             long x = rng.getAsLong() & mask;
             x = min | (x & max);
             n = Long.bitCount(x);
@@ -140,6 +154,8 @@ public class RngUtils
                 max = x;
             else
                 min = x;
+
+            steps++;
         }
         return min;
     }
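
Note on the step limit added above: the loop narrows a [min, max] pair of bit patterns until the candidate has exactly the requested number of set bits, and it can never finish if more bits are requested than the mask allows. The self-contained sketch below follows the loop visible in the hunk. The class name, the int parameter types, the mask computation and the exception type are assumptions, since the surrounding method is not shown here.

```java
import java.util.function.LongSupplier;

// Hypothetical, standalone rendition of the bounded narrowing loop.
final class BoundedBitSamplerSketch
{
    static final int MAX_STEPS = 10_000;

    // Returns a value with exactly `bits` bits set among the lowest `length` bits.
    static long randomBits(int bits, int length, LongSupplier rng)
    {
        // Assumed mask: the lowest `length` bits are eligible.
        long mask = length >= 64 ? ~0L : (1L << length) - 1;
        long min = 0;   // bits that are definitely set
        long max = ~0L; // bits that may still be set
        int n = 0;
        int steps = 0;
        while (n != bits)
        {
            if (steps > MAX_STEPS)
            {
                throw new IllegalStateException(String.format("Could not generate bits after %d tries. " +
                                                              "Inputs: bits=%d, length=%d",
                                                              MAX_STEPS, bits, length));
            }

            long x = rng.getAsLong() & mask;
            x = min | (x & max); // keep everything in min, add only bits allowed by max
            n = Long.bitCount(x);
            if (n > bits)
                max = x; // too many bits: shrink the upper bound
            else
                min = x; // too few, or exactly right: raise the lower bound

            steps++;
        }
        return min;
    }
}
```

Requesting 5 bits out of a 3-bit mask, for example, can never succeed, and with the guard it fails fast instead of spinning forever.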
diff --git a/harry-core/src/harry/model/OpSelectors.java b/harry-core/src/harry/model/OpSelectors.java
index 7e5fc17..7727828 100644
--- a/harry-core/src/harry/model/OpSelectors.java
+++ b/harry-core/src/harry/model/OpSelectors.java
@@ -18,9 +18,11 @@
 
 package harry.model;
 
+import java.util.ArrayList;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.LongConsumer;
 
 import harry.core.Configuration;
 import harry.core.VisibleForTesting;
@@ -76,19 +78,15 @@ public interface OpSelectors
      * be taken to map a real-time timestamp from the value retrieved from the database in order
      * to map it back to the logical timestamp of the operation that wrote this value.
      */
-    public static interface MonotonicClock
+    public interface MonotonicClock
     {
         long rts(long lts);
-
         long lts(long rts);
 
-        long currentLts();
-
         long nextLts();
+        long peek();
 
-        long maxLts();
-
-        public Configuration.ClockConfiguration toConfig();
+        Configuration.ClockConfiguration toConfig();
     }
 
     public static interface MonotonicClockFactory
@@ -304,6 +302,7 @@ public interface OpSelectors
             return minLtsAt(position);
         }
 
+        // TODO: add maxPosition to make it easier/more accessible for the components like sampler, etc
         public long positionFor(long lts)
         {
             long windowStart = lts / switchAfter;
@@ -682,6 +681,14 @@ public interface OpSelectors
         public BitSet partitionLevelOperationsMask(long pd, long lts)
         {
             int totalOps = opsPerModification(lts) * numberOfModifications(lts);
+            if (totalOps > 64)
+            {
+                throw new IllegalArgumentException("RngUtils#randomBits currently supports only up to 64 bits of entropy, so we cannot " +
+                                                   "split partition and row level operations for more than 64 operations at the moment. " +
+                                                   "Set modifications_per_lts to a number that is lower than 64 and use rows_per_modification " +
+                                                   "to have more operations per LTS instead");
+            }
+
             long seed = rng.randomNumber(pd, lts);
 
             int partitionLevelOps = (int) Math.ceil(operationSelector.partitionLevelThreshold * totalOps);
@@ -692,8 +699,17 @@ public interface OpSelectors
 
         private OperationKind operationType(long pd, long lts, long opId, BitSet partitionLevelOperationsMask)
         {
-            long descriptor = rng.randomNumber(pd ^ lts ^ opId, BITSET_IDX_STREAM);
-            return operationSelector.inflate(descriptor, partitionLevelOperationsMask.isSet((int) opId));
+            try
+            {
+                long descriptor = rng.randomNumber(pd ^ lts ^ opId, BITSET_IDX_STREAM);
+                return operationSelector.inflate(descriptor, partitionLevelOperationsMask.isSet((int) opId));
+            }
+            catch (Throwable t)
+            {
+                throw new RuntimeException(String.format("Can not generate a random number with the following inputs: " +
+                                                         "pd=%d lts=%d opId=%d partitionLevelOperationsMask=%s",
+                                                         pd, lts, opId, partitionLevelOperationsMask), t);
+            }
         }
 
         public BitSet columnMask(long pd, long lts, long opId, OperationKind opType)
@@ -704,12 +720,12 @@ public interface OpSelectors
 
         public long vd(long pd, long cd, long lts, long opId, int col)
         {
-            return rng.randomNumber(opId, pd ^ cd ^ lts ^ col);
+            return rng.randomNumber(opId + 1, pd ^ cd ^ lts ^ col);
         }
 
         public long modificationId(long pd, long cd, long lts, long vd, int col)
         {
-            return rng.sequenceNumber(vd, pd ^ cd ^ lts ^ col);
+            return rng.sequenceNumber(vd, pd ^ cd ^ lts ^ col) - 1;
         }
     }
 
diff --git a/harry-core/src/harry/model/QuiescentChecker.java b/harry-core/src/harry/model/QuiescentChecker.java
index a630df2..0246e98 100644
--- a/harry-core/src/harry/model/QuiescentChecker.java
+++ b/harry-core/src/harry/model/QuiescentChecker.java
@@ -41,7 +41,7 @@ public class QuiescentChecker implements Model
     protected final DataTracker tracker;
     protected final SystemUnderTest sut;
     protected final Reconciler reconciler;
-    protected final SchemaSpec schemaSpec;
+    protected final SchemaSpec schema;
 
     public QuiescentChecker(Run run)
     {
@@ -54,7 +54,7 @@ public class QuiescentChecker implements Model
         this.sut = run.sut;
         this.reconciler = reconciler;
         this.tracker = run.tracker;
-        this.schemaSpec = run.schemaSpec;
+        this.schema = run.schemaSpec;
     }
 
     public void validate(Query query)
@@ -84,8 +84,8 @@ public class QuiescentChecker implements Model
         {
             ResultSetRow actualRowState = actual.next();
             if (actualRowState.cd != partitionState.staticRow().cd)
-                throw new ValidationException(partitionState.toString(schemaSpec),
-                                              toString(actualRows, schemaSpec),
+                throw new ValidationException(partitionState.toString(schema),
+                                              toString(actualRows, schema),
                                               "Found a row while model predicts statics only:" +
                                               "\nExpected: %s" +
                                               "\nActual: %s" +
@@ -96,8 +96,8 @@ public class QuiescentChecker implements Model
             for (int i = 0; i < actualRowState.vds.length; i++)
             {
                 if (actualRowState.vds[i] != NIL_DESCR || actualRowState.lts[i] != NO_TIMESTAMP)
-                    throw new ValidationException(partitionState.toString(schemaSpec),
-                                                  toString(actualRows, schemaSpec),
+                    throw new ValidationException(partitionState.toString(schema),
+                                                  toString(actualRows, schema),
                                                   "Found a row while model predicts statics only:" +
                                                   "\nActual: %s" +
                                                   "\nQuery: %s" +
@@ -105,7 +105,7 @@ public class QuiescentChecker implements Model
                                                   actualRowState, query.toSelectStatement());
             }
 
-            assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schemaSpec);
+            assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schema);
         }
 
         while (actual.hasNext() && expected.hasNext())
@@ -114,45 +114,45 @@ public class QuiescentChecker implements Model
             Reconciler.RowState expectedRowState = expected.next();
             // TODO: this is not necessarily true. It can also be that ordering is incorrect.
             if (actualRowState.cd != expectedRowState.cd)
-                throw new ValidationException(partitionState.toString(schemaSpec),
-                                              toString(actualRows, schemaSpec),
+                throw new ValidationException(partitionState.toString(schema),
+                                              toString(actualRows, schema),
                                               "Found a row in the model that is not present in the resultset:" +
                                               "\nExpected: %s" +
                                               "\nActual: %s" +
                                               "\nQuery: %s",
-                                              expectedRowState.toString(schemaSpec),
+                                              expectedRowState.toString(schema),
                                               actualRowState, query.toSelectStatement());
 
             if (!Arrays.equals(actualRowState.vds, expectedRowState.vds))
-                throw new ValidationException(partitionState.toString(schemaSpec),
-                                              toString(actualRows, schemaSpec),
+                throw new ValidationException(partitionState.toString(schema),
+                                              toString(actualRows, schema),
                                               "Returned row state doesn't match the one predicted by the model:" +
                                               "\nExpected: %s (%s)" +
                                               "\nActual:   %s (%s)." +
                                               "\nQuery: %s",
-                                              Arrays.toString(expectedRowState.vds), expectedRowState.toString(schemaSpec),
+                                              Arrays.toString(expectedRowState.vds), expectedRowState.toString(schema),
                                               Arrays.toString(actualRowState.vds), actualRowState,
                                               query.toSelectStatement());
 
             if (!Arrays.equals(actualRowState.lts, expectedRowState.lts))
-                throw new ValidationException(partitionState.toString(schemaSpec),
-                                              toString(actualRows, schemaSpec),
+                throw new ValidationException(partitionState.toString(schema),
+                                              toString(actualRows, schema),
                                               "Timestamps in the row state don't match ones predicted by the model:" +
                                               "\nExpected: %s (%s)" +
                                               "\nActual:   %s (%s)." +
                                               "\nQuery: %s",
-                                              Arrays.toString(expectedRowState.lts), expectedRowState.toString(schemaSpec),
+                                              Arrays.toString(expectedRowState.lts), expectedRowState.toString(schema),
                                               Arrays.toString(actualRowState.lts), actualRowState,
                                               query.toSelectStatement());
 
             if (partitionState.staticRow() != null || actualRowState.sds != null || actualRowState.slts != null)
-                assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schemaSpec);
+                assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schema);
         }
 
         if (actual.hasNext() || expected.hasNext())
         {
-            throw new ValidationException(partitionState.toString(schemaSpec),
-                                          toString(actualRows, schemaSpec),
+            throw new ValidationException(partitionState.toString(schema),
+                                          toString(actualRows, schema),
                                           "Expected results to have the same number of results, but %s result iterator has more results." +
                                           "\nExpected: %s" +
                                           "\nActual:   %s" +
diff --git a/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java b/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java
index 186a618..6c6f27f 100644
--- a/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java
+++ b/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java
@@ -48,9 +48,9 @@ import harry.model.OpSelectors;
 // TODO: shut down
 public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
 {
-    private static long START_VALUE = 0;
-    private static long DEFUNCT = Long.MIN_VALUE;
-    private static long REBASE_IN_PROGRESS = Long.MIN_VALUE + 1;
+    public static final long START_VALUE = 0;
+    public static final long DEFUNCT = Long.MIN_VALUE;
+    public static final long REBASE_IN_PROGRESS = Long.MIN_VALUE + 1;
 
     // TODO: there's a theoretical possibility of a bug; when we have several consecutive epochs without
     // change in LTS, current implementation will return the latest epoch instead of the earliest one.
@@ -151,11 +151,7 @@ public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
             throw new IllegalStateException("No thread should have changed LTS during rebase. " + lts.get());
     }
 
-    public long currentLts()
-    {
-        return lts.get();
-    }
-
+    @Override
     public long nextLts()
     {
         long current = lts.get();
@@ -184,7 +180,7 @@ public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
         }
     }
 
-    public long maxLts()
+    public long peek()
     {
         while (true)
         {
@@ -236,7 +232,7 @@ public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
     // TODO: binary search instead
     public long rts(final long lts)
     {
-        assert lts <= maxLts() : String.format("Queried for LTS we haven't yet issued %d. Max is %d.", lts, maxLts());
+        assert lts <= peek() : String.format("Queried for LTS we haven't yet issued %d. Max is %d.", lts, peek());
 
         final int historyIdx = idx - 1;
         for (int i = 0; i < historySize - 1 && historyIdx - i >= 0; i++)
diff --git a/harry-core/src/harry/model/clock/OffsetClock.java b/harry-core/src/harry/model/clock/OffsetClock.java
index 6040125..6e8f891 100644
--- a/harry-core/src/harry/model/clock/OffsetClock.java
+++ b/harry-core/src/harry/model/clock/OffsetClock.java
@@ -28,7 +28,7 @@ import harry.model.OpSelectors;
 
 public class OffsetClock implements OpSelectors.MonotonicClock
 {
-    final AtomicLong lts = new AtomicLong(0);
+    final AtomicLong lts = new AtomicLong(ApproximateMonotonicClock.START_VALUE);
 
     private final long base;
 
@@ -47,17 +47,12 @@ public class OffsetClock implements OpSelectors.MonotonicClock
         return rts - base;
     }
 
-    public long currentLts()
-    {
-        return lts.get();
-    }
-
     public long nextLts()
     {
         return lts.getAndIncrement();
     }
 
-    public long maxLts()
+    public long peek()
     {
         return lts.get();
     }
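Note on the OffsetClock hunks above (illustration only, not part of the commit): after the rename, `peek()` in OffsetClock reads the counter without advancing it, while `nextLts()` returns the current value and then increments. The sketch below reduces the class to those two methods. The class name is hypothetical, and the `base`/`rts` logic is left out. ApproximateMonotonicClock's `peek()` contains additional logic that these hunks do not show, so this models OffsetClock only.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reduction of OffsetClock to the two methods discussed here.
public class PeekSketch
{
    // The patch initialises this from ApproximateMonotonicClock.START_VALUE, which is 0.
    private final AtomicLong lts = new AtomicLong(0);

    // Hands out the current LTS and advances the counter.
    public long nextLts()
    {
        return lts.getAndIncrement();
    }

    // Reads the counter without advancing it.
    public long peek()
    {
        return lts.get();
    }

    public static void main(String[] args)
    {
        PeekSketch clock = new PeekSketch();
        System.out.println(clock.peek());    // 0: nothing issued yet
        System.out.println(clock.nextLts()); // 0: first LTS handed out
        System.out.println(clock.peek());    // 1: the value the next nextLts() call would return
    }
}
```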
diff --git a/harry-core/src/harry/reconciler/Reconciler.java b/harry-core/src/harry/reconciler/Reconciler.java
index 275b7c7..45649be 100644
--- a/harry-core/src/harry/reconciler/Reconciler.java
+++ b/harry-core/src/harry/reconciler/Reconciler.java
@@ -35,12 +35,12 @@ import harry.core.Run;
 import harry.ddl.ColumnSpec;
 import harry.ddl.SchemaSpec;
 import harry.model.OpSelectors;
-import harry.visitors.GeneratingVisitor;
-import harry.visitors.Visitor;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
 import harry.util.BitSet;
 import harry.util.Ranges;
+import harry.visitors.GeneratingVisitor;
+import harry.visitors.LtsVisitor;
 import harry.visitors.ReplayingVisitor;
 import harry.visitors.VisitExecutor;
 
@@ -67,7 +67,7 @@ public class Reconciler
     private final QueryGenerator rangeSelector;
     private final SchemaSpec schema;
 
-    private final Function<VisitExecutor, Visitor> visitorFactory;
+    private final Function<VisitExecutor, LtsVisitor> visitorFactory;
 
     public Reconciler(Run run)
     {
@@ -76,13 +76,13 @@ public class Reconciler
     }
 
     public Reconciler(Run run,
-                      Function<VisitExecutor, Visitor> visitorFactory)
+                      Function<VisitExecutor, LtsVisitor> ltsVisitorFactory)
     {
         this.descriptorSelector = run.descriptorSelector;
         this.pdSelector = run.pdSelector;
         this.schema = run.schemaSpec;
         this.rangeSelector = run.rangeSelector;
-        this.visitorFactory = visitorFactory;
+        this.visitorFactory = ltsVisitorFactory;
     }
 
     private final long debugCd = Long.getLong("harry.reconciler.debug_cd", -1L);
@@ -91,7 +91,7 @@ public class Reconciler
     {
         PartitionState partitionState = new PartitionState();
 
-        class Processor implements VisitExecutor
+        class Processor extends VisitExecutor
         {
             // Whether or not a partition deletion was encountered on this LTS.
             private boolean hadPartitionDeletion = false;
@@ -100,7 +100,7 @@ public class Reconciler
             private final List<ReplayingVisitor.Operation> columnDeletes = new ArrayList<>();
 
             @Override
-            public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
+            protected void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
             {
                 if (hadPartitionDeletion)
                     return;
@@ -151,7 +151,7 @@ public class Reconciler
             }
 
             @Override
-            public void beforeLts(long lts, long pd)
+            protected void beforeLts(long lts, long pd)
             {
                 rangeDeletes.clear();
                 writes.clear();
@@ -160,7 +160,7 @@ public class Reconciler
             }
 
             @Override
-            public void afterLts(long lts, long pd)
+            protected void afterLts(long lts, long pd)
             {
                 if (hadPartitionDeletion)
                     return;
@@ -248,13 +248,16 @@ public class Reconciler
             }
 
             @Override
-            public void afterBatch(long lts, long pd, long m) {}
+            protected void afterBatch(long lts, long pd, long m) {}
+
+            @Override
+            protected void beforeBatch(long lts, long pd, long m) {}
 
             @Override
-            public void beforeBatch(long lts, long pd, long m) {}
+            public void shutdown() throws InterruptedException {}
         }
 
-        Visitor visitor = visitorFactory.apply(new Processor());
+        LtsVisitor visitor = visitorFactory.apply(new Processor());
 
         long currentLts = pdSelector.minLtsFor(pd);
 
diff --git a/harry-core/src/harry/runner/DataTracker.java b/harry-core/src/harry/runner/DataTracker.java
index 9f518d1..0171e43 100644
--- a/harry-core/src/harry/runner/DataTracker.java
+++ b/harry-core/src/harry/runner/DataTracker.java
@@ -18,30 +18,62 @@
 
 package harry.runner;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.LongConsumer;
+
 import harry.core.Configuration;
 
-public interface DataTracker
+public abstract class DataTracker
 {
-    void started(long lts);
-    void finished(long lts);
+    protected List<LongConsumer> onStarted = new ArrayList<>();
+    protected List<LongConsumer> onFinished = new ArrayList<>();
+
+    public void onLtsStarted(LongConsumer onLts)
+    {
+        this.onStarted.add(onLts);
+    }
 
-    long maxStarted();
-    long maxConsecutiveFinished();
+    public void onLtsFinished(LongConsumer onLts)
+    {
+        this.onFinished.add(onLts);
+    }
 
-    public Configuration.DataTrackerConfiguration toConfig();
+    public void started(long lts)
+    {
+        startedInternal(lts);
+        for (LongConsumer consumer : onStarted)
+            consumer.accept(lts);
+    }
 
-    interface DataTrackerFactory {
+    public void finished(long lts)
+    {
+        finishedInternal(lts);
+        for (LongConsumer consumer : onFinished)
+            consumer.accept(lts);
+    }
+
+    abstract void startedInternal(long lts);
+    abstract void finishedInternal(long lts);
+
+    public abstract long maxStarted();
+    public abstract long maxConsecutiveFinished();
+
+    public abstract Configuration.DataTrackerConfiguration toConfig();
+
+    public static interface DataTrackerFactory
+    {
         DataTracker make();
     }
 
-    public static DataTracker NO_OP = new NoOpDataTracker();
+    public static final DataTracker NO_OP = new NoOpDataTracker();
 
-    class NoOpDataTracker implements DataTracker
+    public static class NoOpDataTracker extends DataTracker
     {
         private NoOpDataTracker() {}
 
-        public void started(long lts) {}
-        public void finished(long lts) {}
+        protected void startedInternal(long lts) {}
+        protected void finishedInternal(long lts) {}
         public long maxStarted() { return 0; }
         public long maxConsecutiveFinished() { return 0; }
 
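Note on the DataTracker hunk above (illustration only, not part of the commit): turning DataTracker into an abstract class lets `started` and `finished` do their own bookkeeping through `startedInternal`/`finishedInternal` and then notify any registered `LongConsumer`. The sketch below is a simplified stand-in with the same shape. `Tracker`, `CountingTracker`, and `observeFinished` are hypothetical names, and `maxStarted`, `maxConsecutiveFinished`, and `toConfig` are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

public class TrackerCallbackSketch
{
    // Simplified stand-in for DataTracker: bookkeeping first, then callbacks.
    abstract static class Tracker
    {
        private final List<LongConsumer> onStarted = new ArrayList<>();
        private final List<LongConsumer> onFinished = new ArrayList<>();

        void onLtsStarted(LongConsumer onLts) { onStarted.add(onLts); }
        void onLtsFinished(LongConsumer onLts) { onFinished.add(onLts); }

        void started(long lts)
        {
            startedInternal(lts);
            for (LongConsumer consumer : onStarted)
                consumer.accept(lts);
        }

        void finished(long lts)
        {
            finishedInternal(lts);
            for (LongConsumer consumer : onFinished)
                consumer.accept(lts);
        }

        abstract void startedInternal(long lts);
        abstract void finishedInternal(long lts);
    }

    // Hypothetical implementation that only counts events.
    static class CountingTracker extends Tracker
    {
        int startedCount = 0;
        int finishedCount = 0;

        void startedInternal(long lts) { startedCount++; }
        void finishedInternal(long lts) { finishedCount++; }
    }

    // Registers a callback, reports LTS 0..count-1, and returns what the callback saw.
    static List<Long> observeFinished(int count)
    {
        List<Long> seen = new ArrayList<>();
        Tracker tracker = new CountingTracker();
        tracker.onLtsFinished(lts -> seen.add(lts));
        for (long lts = 0; lts < count; lts++)
        {
            tracker.started(lts);
            tracker.finished(lts);
        }
        return seen;
    }

    public static void main(String[] args)
    {
        System.out.println(observeFinished(3)); // [0, 1, 2]
    }
}
```

The callback lists here are plain ArrayLists, as in the patch, so the sketch registers its callback before any LTS is reported rather than concurrently with `started`/`finished`.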
diff --git a/harry-core/src/harry/runner/DefaultDataTracker.java b/harry-core/src/harry/runner/DefaultDataTracker.java
index d5cdb4b..85e2637 100644
--- a/harry-core/src/harry/runner/DefaultDataTracker.java
+++ b/harry-core/src/harry/runner/DefaultDataTracker.java
@@ -29,14 +29,14 @@ import org.slf4j.LoggerFactory;
 import harry.core.Configuration;
 import harry.core.VisibleForTesting;
 
-public class DefaultDataTracker implements DataTracker
+public class DefaultDataTracker extends DataTracker
 {
     private static final Logger logger = LoggerFactory.getLogger(DefaultDataTracker.class);
 
     private final AtomicLong maxSeenLts;
     // TODO: This is a trivial implementation that can be significantly improved upon
     // for example, we could use a bitmap that records `1`s for all lts that are after
-    // the consective, and "collapse" the bitmap state into the long as soon as we see
+    // the consecutive, and "collapse" the bitmap state into the long as soon as we see
     // consecutive `1` on the left side.
     private final AtomicLong maxCompleteLts;
     private final PriorityBlockingQueue<Long> reorderBuffer;
@@ -49,13 +49,12 @@ public class DefaultDataTracker implements DataTracker
     }
 
     // TODO: there's also some room for improvement in terms of concurrency
-    // TODO: remove pd?
-    public void started(long lts)
+    protected void startedInternal(long lts)
     {
         recordEvent(lts, false);
     }
 
-    public void finished(long lts)
+    protected void finishedInternal(long lts)
     {
         recordEvent(lts, true);
     }
@@ -63,10 +62,7 @@ public class DefaultDataTracker implements DataTracker
     private void recordEvent(long lts, boolean finished)
     {
         // all seen LTS are allowed to be "in-flight"
-        maxSeenLts.getAndUpdate((old) -> {
-            assert finished || lts > old : String.format("Attempting to reuse lts: %d. Max seen: %d", lts, old);
-            return Math.max(lts, old);
-        });
+        maxSeenLts.getAndUpdate((old) -> Math.max(lts, old));
 
         if (!finished)
             return;
@@ -116,6 +112,9 @@ public class DefaultDataTracker implements DataTracker
 
     public long maxConsecutiveFinished()
     {
+        if (!reorderBuffer.isEmpty())
+            return drainReorderQueue();
+
         return maxCompleteLts.get();
     }
 
diff --git a/harry-core/src/harry/runner/HarryRunner.java b/harry-core/src/harry/runner/HarryRunner.java
index ba911c4..30f7f86 100644
--- a/harry-core/src/harry/runner/HarryRunner.java
+++ b/harry-core/src/harry/runner/HarryRunner.java
@@ -35,9 +35,9 @@ public abstract class HarryRunner
 {
     public static final Logger logger = LoggerFactory.getLogger(HarryRunner.class);
 
-    protected CompletableFuture progress;
+    protected CompletableFuture<?> progress;
     protected ScheduledThreadPoolExecutor executor;
-    public abstract void beforeRun(Runner runner);
+    public abstract void beforeRun(Runner.TimedRunner runner);
     public void afterRun(Runner runner, Object result)
     {
         executor.shutdown();
@@ -64,7 +64,9 @@ public abstract class HarryRunner
         Run run = runner.getRun();
 
         progress = runner.initAndStartAll();
-        beforeRun(runner);
+        
+        assert runner instanceof Runner.TimedRunner : "Please use a timed runner at the top level.";
+        beforeRun((Runner.TimedRunner) runner);
 
         Object result = null;
 
@@ -76,13 +78,12 @@ public abstract class HarryRunner
                 return a;
             }).get();
             if (result instanceof Throwable)
-                logger.error("Execution failed", result);
+                logger.error("Execution failed!", (Throwable) result);
 
         }
         catch (Throwable e)
         {
-            logger.error("Failed due to exception: " + e.getMessage(),
-                         e);
+            logger.error("Failed due to exception: " + e.getMessage(), e);
             result = e;
         }
         finally
diff --git a/harry-core/src/harry/runner/Runner.java b/harry-core/src/harry/runner/Runner.java
index 3a84d3e..4f99eab 100644
--- a/harry-core/src/harry/runner/Runner.java
+++ b/harry-core/src/harry/runner/Runner.java
@@ -24,40 +24,44 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import harry.core.Configuration;
 import harry.core.Run;
-import harry.model.OpSelectors;
 import harry.visitors.Visitor;
 
-
 public abstract class Runner
 {
     private static final Logger logger = LoggerFactory.getLogger(Runner.class);
 
     protected final Run run;
     protected final Configuration config;
+    protected final ScheduledExecutorService executor;
 
     // If there's an error, there's a good chance we're going to hit it more than once
-    //  since we have multiple concurrent checkers running
+    // since we have multiple concurrent checkers running
     protected final CopyOnWriteArrayList<Throwable> errors;
 
-    public Runner(Run run, Configuration config)
+    public Runner(Run run, Configuration config, int concurrency)
     {
         this.run = run;
         this.config = config;
         this.errors = new CopyOnWriteArrayList<>();
+        this.executor = Executors.newScheduledThreadPool(concurrency);
     }
 
     public Run getRun()
@@ -69,12 +73,11 @@ public abstract class Runner
     {
         if (config.create_schema)
         {
-            // TODO: make RF configurable or make keyspace DDL configurable
-            run.sut.schemaChange("CREATE KEYSPACE " + run.schemaSpec.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            if (config.keyspace_ddl == null)
+                run.sut.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + run.schemaSpec.keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+            else
+                run.sut.schemaChange(config.keyspace_ddl);
 
-            run.sut.schemaChange(String.format("DROP TABLE IF EXISTS %s.%s;",
-                                          run.schemaSpec.keyspace,
-                                          run.schemaSpec.table));
             String schema = run.schemaSpec.compile().cql();
             logger.info("Creating table: " + schema);
             run.sut.schemaChange(schema);
@@ -115,13 +118,36 @@ public abstract class Runner
             dumpStateToFile(run, config, errors);
     }
 
-    public abstract CompletableFuture<?> initAndStartAll() throws InterruptedException;
+    public CompletableFuture<?> initAndStartAll()
+    {
+        init();
+        return start();
+    }
 
+    public abstract String type();
+        
     public abstract void shutdown() throws InterruptedException;
+    
+    protected CompletableFuture<?> start()
+    {
+        return start(true, () -> false);
+    }
+    
+    protected abstract void shutDownVisitors();
+
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    protected void shutDownExecutors() throws InterruptedException
+    {
+        executor.shutdownNow();
+        executor.awaitTermination(1, TimeUnit.MINUTES);
+    }
+    
+    protected abstract CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit);
 
     protected Runnable reportThrowable(Runnable runnable, CompletableFuture<?> future)
     {
-        return () -> {
+        return () ->
+        {
             try
             {
                 if (!future.isDone())
@@ -130,193 +156,313 @@ public abstract class Runner
             catch (Throwable t)
             {
                 errors.add(t);
+                
                 if (!future.isDone())
                     future.completeExceptionally(t);
             }
         };
     }
 
-    public static class SequentialRunner extends Runner
+    public interface RunnerFactory
+    {
+        Runner make(Run run, Configuration config);
+    }
+
+    public abstract static class TimedRunner extends Runner
+    {
+        public final long runtime;
+        public final TimeUnit runtimeUnit;
+        
+        protected final ScheduledExecutorService shutdownExecutor;
+
+        public TimedRunner(Run run, Configuration config, int concurrency, long runtime, TimeUnit runtimeUnit)
+        {
+            super(run, config, concurrency);
+
+            this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor();
+            this.runtime = runtime;
+            this.runtimeUnit = runtimeUnit;
+        }
+
+        protected ScheduledFuture<?> scheduleTermination(AtomicBoolean terminated)
+        {
+            return shutdownExecutor.schedule(() ->
+                                             {
+                                                 logger.info("Runner has reached configured runtime. Stopping...");
+                                                 // TODO: wait for the last full validation?
+                                                 terminated.set(true);
+                                             },
+                                             runtime,
+                                             runtimeUnit);
+        }
+
+        @Override
+        @SuppressWarnings("ResultOfMethodCallIgnored")
+        protected void shutDownExecutors() throws InterruptedException
+        {
+            shutdownExecutor.shutdownNow();
+            shutdownExecutor.awaitTermination(1, TimeUnit.MINUTES);
+            executor.shutdownNow();
+            executor.awaitTermination(1, TimeUnit.MINUTES);
+        }
+    }
+
+    public static class SingleVisitRunner extends Runner
     {
-        private final ScheduledExecutorService executor;
-        private final ScheduledExecutorService shutdownExceutor;
         private final List<Visitor> visitors;
-        private final Configuration config;
 
-        public SequentialRunner(Run run,
+        public SingleVisitRunner(Run run,
                                 Configuration config,
                                 List<? extends Visitor.VisitorFactory> visitorFactories)
         {
-            super(run, config);
-
-            this.executor = Executors.newSingleThreadScheduledExecutor();
-            this.shutdownExceutor = Executors.newSingleThreadScheduledExecutor();
-            this.config = config;
-            this.visitors = new ArrayList<>();
-            for (Visitor.VisitorFactory factory : visitorFactories)
-                visitors.add(factory.make(run));
+            super(run, config, 1);
+            this.visitors = visitorFactories.stream().map(factory -> factory.make(run)).collect(Collectors.toList());
         }
 
-        public CompletableFuture<?> initAndStartAll()
+        @Override
+        public String type()
+        {
+            return "single";
+        }
+
+        @Override
+        protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
         {
-            init();
             CompletableFuture<?> future = new CompletableFuture<>();
-            future.whenComplete((a, b) -> maybeReportErrors());
-
-            AtomicBoolean completed = new AtomicBoolean(false);
-            shutdownExceutor.schedule(() -> {
-                logger.info("Completed");
-                // TODO: wait for the last full validation?
-                completed.set(true);
-            }, config.run_time, config.run_time_unit);
-
-            executor.submit(reportThrowable(() -> {
-                                                try
-                                                {
-                                                    SequentialRunner.run(visitors, run.clock, future,
-                                                                         () -> Thread.currentThread().isInterrupted() || future.isDone() || completed.get());
-                                                }
-                                                catch (Throwable t)
-                                                {
-                                                    future.completeExceptionally(t);
-                                                }
-                                            },
-                                            future));
 
+            if (reportErrors)
+                future.whenComplete((a, b) -> maybeReportErrors());
+
+            executor.submit(reportThrowable(() -> run(visitors, future, parentExit), future));
             return future;
         }
 
-        static void run(List<Visitor> visitors,
-                        OpSelectors.MonotonicClock clock,
-                        CompletableFuture<?> future,
-                        BooleanSupplier exitCondition)
+        private void run(List<Visitor> visitors, CompletableFuture<?> future, BooleanSupplier parentExit)
         {
-            while (!exitCondition.getAsBoolean())
+            for (Visitor value: visitors)
             {
-                long lts = clock.nextLts();
+                if (parentExit.getAsBoolean())
+                    break;
+
+                value.visit();
+            }
+
+            future.complete(null);
+        }
+
+        @Override
+        public void shutdown() throws InterruptedException
+        {
+            logger.info("Shutting down...");
+            shutDownVisitors();
 
-                if (lts > 0 && lts % 10_000 == 0)
-                    logger.info("Visited {} logical timestamps", lts);
+            // we need to wait for all threads that use schema to stop before we can tear down and drop the table
+            shutDownExecutors();
 
-                for (int i = 0; i < visitors.size() && !exitCondition.getAsBoolean(); i++)
+            teardown();
+        }
+
+        @Override
+        protected void shutDownVisitors()
+        {
+            shutDownVisitors(visitors);
+        }
+    }
+
+    public static class SequentialRunner extends TimedRunner
+    {
+        protected final List<Visitor> visitors;
+
+        public SequentialRunner(Run run,
+                                Configuration config,
+                                List<? extends Visitor.VisitorFactory> visitorFactories,
+                                long runtime, TimeUnit runtimeUnit)
+        {
+            super(run, config, 1, runtime, runtimeUnit);
+
+            this.visitors = visitorFactories.stream().map(factory -> factory.make(run)).collect(Collectors.toList());
+        }
+
+        @Override
+        public String type()
+        {
+            return "sequential";
+        }
+
+        @Override
+        protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
+        {
+            CompletableFuture<?> future = new CompletableFuture<>();
+            
+            if (reportErrors)
+                future.whenComplete((a, b) -> maybeReportErrors());
+
+            AtomicBoolean terminated = new AtomicBoolean(false);
+            scheduleTermination(terminated);
+            BooleanSupplier exit = () -> Thread.currentThread().isInterrupted() || future.isDone() 
+                                         || terminated.get() || parentExit.getAsBoolean();
+
+            executor.submit(reportThrowable(() -> run(visitors, future, exit), future));
+            return future;
+        }
+
+        protected void run(List<Visitor> visitors,
+                           CompletableFuture<?> future,
+                           BooleanSupplier exit)
+        {
+            while (!exit.getAsBoolean())
+            {
+                for (Visitor visitor: visitors)
                 {
-                    try
-                    {
-                        Visitor visitor = visitors.get(i);
-                        visitor.visit(lts);
-                    }
-                    catch (Throwable t)
-                    {
-                        future.completeExceptionally(t);
-                        throw t;
-                    }
+                    if (exit.getAsBoolean())
+                        break;
+                    visitor.visit();
                 }
             }
+
             future.complete(null);
         }
 
+        @Override
         public void shutdown() throws InterruptedException
         {
             logger.info("Shutting down...");
-            shutdownExceutor.shutdownNow();
-            shutdownExceutor.awaitTermination(1, TimeUnit.MINUTES);
+            shutDownVisitors();
 
-            executor.shutdownNow();
-            executor.awaitTermination(1, TimeUnit.MINUTES);
             // we need to wait for all threads that use schema to stop before we can tear down and drop the table
+            shutDownExecutors();
+            
             teardown();
         }
-    }
 
-    public static interface RunnerFactory
-    {
-        public Runner make(Run run, Configuration config);
+        @Override
+        protected void shutDownVisitors()
+        {
+            shutDownVisitors(visitors);
+        }
     }
 
     // TODO: this requires some significant improvement
-    public static class ConcurrentRunner extends Runner
+    public static class ConcurrentRunner extends TimedRunner
     {
-        private final ScheduledExecutorService executor;
-        private final ScheduledExecutorService shutdownExecutor;
-        private final List<? extends Visitor.VisitorFactory> visitorFactories;
-        private final List<Visitor> allVisitors;
-
+        private final List<List<Visitor>> perThreadVisitors;
         private final int concurrency;
-        private final long runTime;
-        private final TimeUnit runTimeUnit;
 
         public ConcurrentRunner(Run run,
                                 Configuration config,
                                 int concurrency,
-                                List<? extends Visitor.VisitorFactory> visitorFactories)
+                                List<? extends Visitor.VisitorFactory> visitorFactories,
+                                long runtime, TimeUnit runtimeUnit)
         {
-            super(run, config);
-            this.concurrency = concurrency;
-            this.runTime = config.run_time;
-            this.runTimeUnit = config.run_time_unit;
-            // TODO: configure concurrency
-            this.executor = Executors.newScheduledThreadPool(concurrency);
-            this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor();
-            this.visitorFactories = visitorFactories;
-            this.allVisitors = new CopyOnWriteArrayList<>();
-        }
+            super(run, config, concurrency, runtime, runtimeUnit);
 
-        public CompletableFuture<?> initAndStartAll()
-        {
-            init();
-            CompletableFuture<?> future = new CompletableFuture<>();
-            future.whenComplete((a, b) -> maybeReportErrors());
-
-            shutdownExecutor.schedule(() -> {
-                logger.info("Completed");
-                // TODO: wait for the last full validation?
-                future.complete(null);
-            }, runTime, runTimeUnit);
+            this.concurrency = concurrency;
+            this.perThreadVisitors = new ArrayList<>(concurrency);
 
-            BooleanSupplier exitCondition = () -> Thread.currentThread().isInterrupted() || future.isDone();
             for (int i = 0; i < concurrency; i++)
             {
                 List<Visitor> visitors = new ArrayList<>();
-                executor.submit(reportThrowable(() -> {
-                                                    for (Visitor.VisitorFactory factory : visitorFactories)
-                                                        visitors.add(factory.make(run));
 
-                                                    allVisitors.addAll(visitors);
-                                                    run(visitors, run.clock, exitCondition);
-                                                },
-                                                future));
+                for (Visitor.VisitorFactory factory : visitorFactories)
+                    visitors.add(factory.make(run));
+
+                perThreadVisitors.add(visitors);
+            }
+        }
+
+        @Override
+        public String type()
+        {
+            return "concurrent";
+        }
 
+        @Override
+        protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
+        {
+            CompletableFuture<?> future = new CompletableFuture<>();
+            
+            if (reportErrors)
+                future.whenComplete((a, b) -> maybeReportErrors());
+
+            AtomicBoolean terminated = new AtomicBoolean(false);
+            scheduleTermination(terminated);
+            BooleanSupplier exit = () -> Thread.currentThread().isInterrupted() || future.isDone() 
+                                         || terminated.get() || parentExit.getAsBoolean();
+            
+            AtomicInteger liveCount = new AtomicInteger(0);
+            
+            for (int i = 0; i < concurrency; i++)
+            {
+                List<Visitor> visitors = perThreadVisitors.get(i);
+                executor.submit(reportThrowable(() -> run(visitors, future, exit, liveCount), future));
             }
 
             return future;
         }
 
-        void run(List<Visitor> visitors,
-                 OpSelectors.MonotonicClock clock,
-                 BooleanSupplier exitCondition)
+        private void run(List<Visitor> visitors,
+                         CompletableFuture<?> future,
+                         BooleanSupplier exit,
+                         AtomicInteger liveCount)
         {
-            while (!exitCondition.getAsBoolean())
+            liveCount.incrementAndGet();
+            
+            while (!exit.getAsBoolean())
             {
-                long lts = clock.nextLts();
                 for (Visitor visitor : visitors)
-                    visitor.visit(lts);
+                {
+                    if (exit.getAsBoolean())
+                        break;
+
+                    visitor.visit();
+                }
             }
+            
+            // If we're the last worker still running, complete the future.
+            if (liveCount.decrementAndGet() == 0)
+                future.complete(null);
         }
 
+        @Override
         public void shutdown() throws InterruptedException
         {
             logger.info("Shutting down...");
-            for (Visitor visitor : allVisitors)
-                visitor.shutdown();
-
-            shutdownExecutor.shutdownNow();
-            shutdownExecutor.awaitTermination(1, TimeUnit.MINUTES);
+            shutDownVisitors();
 
-            executor.shutdownNow();
-            executor.awaitTermination(1, TimeUnit.MINUTES);
             // we need to wait for all threads that use schema to stop before we can tear down and drop the table
+            shutDownExecutors();
+
             teardown();
         }
+
+        @Override
+        protected void shutDownVisitors()
+        {
+            shutDownVisitors(perThreadVisitors.stream().flatMap(Collection::stream).collect(Collectors.toList()));
+        }
+    }
+
+    protected static void shutDownVisitors(List<Visitor> visitors)
+    {
+        Throwable error = null;
+        
+        for (Visitor visitor : visitors)
+        {
+            try
+            {
+                visitor.shutdown();
+            }
+            catch (InterruptedException e)
+            {
+                if (error != null)
+                    error.addSuppressed(e);
+                else
+                    error = e;
+            }
+        }
+        
+        if (error != null)
+            logger.warn("Failed to shut down all visitors!", error);
     }
 
     private static void dumpExceptionToFile(BufferedWriter bw, Throwable throwable) throws IOException
@@ -361,7 +507,7 @@ public abstract class Runner
             File file = new File("run.yaml");
             Configuration.ConfigurationBuilder builder = config.unbuild();
 
-            // overrride stateful components
+            // override stateful components
             builder.setClock(run.clock.toConfig());
             builder.setDataTracker(run.tracker.toConfig());
 
@@ -373,12 +519,14 @@ public abstract class Runner
         }
         catch (Throwable e)
         {
-            logger.error("Caught an error while trying to dump to file",
-                         e);
+            logger.error("Caught an error while trying to dump to file", e);
             try
             {
                 File f = new File("tmp.dump");
-                f.createNewFile();
+                
+                if (!f.createNewFile())
+                    logger.info("File {} already exists. Overwriting...", f);
+                
                 BufferedWriter tmp = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
                 dumpExceptionToFile(tmp, e);
             }
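Note on the ConcurrentRunner change above: each worker bumps liveCount when it enters run(), and the worker that brings the count back to zero completes the future. The following is a minimal, self-contained sketch of that "last worker completes" idea, not the Harry implementation. LastWorkerCompletes, runAll and the visits counter are hypothetical names, and the sketch deliberately presets the counter to the worker count (an assumption that differs from the diff) so that a worker finishing before the others have started cannot observe zero.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LastWorkerCompletes
{
    /** Submits {@code workers} tasks; the last one to finish completes the future with the visit total. */
    public static CompletableFuture<Integer> runAll(int workers)
    {
        ExecutorService executor = Executors.newFixedThreadPool(workers);
        CompletableFuture<Integer> future = new CompletableFuture<>();
        // Assumption: preset to the worker count, so an early finisher cannot see zero
        AtomicInteger liveCount = new AtomicInteger(workers);
        AtomicInteger visits = new AtomicInteger();

        for (int i = 0; i < workers; i++)
        {
            executor.submit(() -> {
                visits.incrementAndGet(); // hypothetical stand-in for visitor.visit()

                // Only the worker that brings the count to zero completes the future
                if (liveCount.decrementAndGet() == 0)
                    future.complete(visits.get());
            });
        }

        executor.shutdown(); // tasks that were already submitted still run
        return future;
    }
}
```

Calling LastWorkerCompletes.runAll(4).join() blocks until all four workers have finished and returns the number of visits they performed.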
diff --git a/harry-core/src/harry/runner/StagedRunner.java b/harry-core/src/harry/runner/StagedRunner.java
new file mode 100644
index 0000000..88ebcdb
--- /dev/null
+++ b/harry-core/src/harry/runner/StagedRunner.java
@@ -0,0 +1,201 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.Configuration;
+import harry.core.Run;
+
+public class StagedRunner extends Runner.TimedRunner
+{
+    public static final String TYPE = "staged";
+
+    public static void register()
+    {
+        Configuration.registerSubtypes(StagedRunner.StagedRunnerConfig.class);
+    }
+    
+    private static final Logger logger = LoggerFactory.getLogger(StagedRunner.class);
+    
+    private final List<Configuration.RunnerConfiguration> runnerFactories;
+    private final List<Runner> stages;
+
+    public StagedRunner(Run run,
+                        Configuration config,
+                        List<Configuration.RunnerConfiguration> runnerFactories,
+                        long runtime, TimeUnit runtimeUnit)
+    {
+        super(run, config, 1, runtime, runtimeUnit);
+        this.runnerFactories = runnerFactories;
+        this.stages = new CopyOnWriteArrayList<>();
+    }
+
+    @Override
+    public String type() {
+        return TYPE;
+    }
+
+    @Override
+    protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
+    {
+        CompletableFuture<?> future = new CompletableFuture<>();
+
+        if (reportErrors)
+        {
+            future.whenComplete((a, b) ->
+            {
+                collectErrors();
+                maybeReportErrors();
+            });
+        }
+
+        AtomicBoolean terminated = new AtomicBoolean(false);
+        scheduleTermination(terminated);
+        
+        BooleanSupplier exit = () -> Thread.currentThread().isInterrupted() || future.isDone()
+                                     || terminated.get() || parentExit.getAsBoolean();
+
+        executor.submit(() ->
+        {
+            for (Configuration.RunnerConfiguration runnerConfig : runnerFactories)
+            {
+                try
+                {
+                    Runner runner = runnerConfig.make(run, config);
+                    if (runner instanceof StagedRunner)
+                        throw new IllegalArgumentException("StagedRunner cannot be nested inside another StagedRunner");
+                    stages.add(runner);
+                }
+                catch (Throwable t)
+                {
+                    future.completeExceptionally(t);
+                }
+            }
+
+            while (!exit.getAsBoolean())
+            {
+                logger.info("Starting next staged run...");
+                int stage = 1;
+
+                for (Runner runner : stages)
+                {
+                    if (exit.getAsBoolean())
+                        break;
+
+                    try
+                    {
+                        logger.info("Starting stage {}: {}...", stage++, runner.type());
+                        runner.start(false, exit).get();
+
+                        if (!terminated.get())
+                            logger.info("...stage complete.");
+
+                        // Wait for any previous runners to settle down...
+                        while (run.tracker.maxConsecutiveFinished() != run.tracker.maxStarted())
+                        {
+                            TimeUnit.SECONDS.sleep(1);
+                            logger.warn("Waiting for any previous runners to settle down: {}",
+                                        run.tracker.toString());
+                        }
+
+                    }
+                    catch (Throwable t)
+                    {
+                        future.completeExceptionally(t);
+                        break;
+                    }
+                }
+
+                if (!terminated.get())
+                    logger.info("All stages complete!");
+            }
+
+            future.complete(null);
+        });
+
+        return future;
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException
+    {
+        logger.info("Shutting down...");
+
+        for (Runner runner : stages)
+        {
+            runner.shutDownVisitors();
+            runner.shutDownExecutors();
+        }
+
+        shutDownExecutors();
+        teardown();
+    }
+
+    @Override
+    protected void shutDownVisitors()
+    {
+        // Visitors are shut down in the sub-runners.    
+    }
+
+    private void collectErrors()
+    {
+        for (Runner runner : stages)
+        {
+            errors.addAll(runner.errors);
+        }
+    }
+
+    @JsonTypeName(TYPE)
+    public static class StagedRunnerConfig implements Configuration.RunnerConfiguration
+    {
+        @JsonProperty(value = "stages")
+        public final List<Configuration.RunnerConfiguration> runnerFactories;
+
+        public final long run_time;
+        public final TimeUnit run_time_unit;
+
+        @JsonCreator
+        public StagedRunnerConfig(@JsonProperty(value = "stages") List<Configuration.RunnerConfiguration> stages,
+                                  @JsonProperty(value = "run_time", defaultValue = "2") long runtime,
+                                  @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
+        {
+            this.runnerFactories = stages;
+            this.run_time = runtime;
+            this.run_time_unit = runtimeUnit;
+        }
+
+        @Override
+        public Runner make(Run run, Configuration config)
+        {
+            return new StagedRunner(run, config, runnerFactories, run_time, run_time_unit);
+        }
+    }
+}
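Note on StagedRunner above: the stages run one after another, the whole list is repeated until the exit condition fires, and the same exit condition is handed to every stage. A rough sketch of that control flow follows, with several assumptions: StagedLoopSketch, runStages and demo are hypothetical names, a stage is modelled as a function from the exit condition to a future, and a cycle counter stands in for the timed termination flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;
import java.util.function.Function;

public class StagedLoopSketch
{
    /** Runs every stage in order, repeating the whole list until the exit condition holds. */
    public static List<String> runStages(List<Function<BooleanSupplier, CompletableFuture<String>>> stages,
                                         int maxCycles)
    {
        List<String> log = new ArrayList<>();
        int[] cycles = { 0 };
        // Assumption: a cycle counter replaces the scheduled "terminated" flag
        BooleanSupplier exit = () -> cycles[0] >= maxCycles;

        while (!exit.getAsBoolean())
        {
            for (Function<BooleanSupplier, CompletableFuture<String>> stage : stages)
            {
                if (exit.getAsBoolean())
                    break;

                // Block until this stage is done before starting the next one
                log.add(stage.apply(exit).join());
            }
            cycles[0]++;
        }
        return log;
    }

    /** Two hypothetical stages, run for two full cycles. */
    public static List<String> demo()
    {
        List<Function<BooleanSupplier, CompletableFuture<String>>> stages = Arrays.asList(
            exit -> CompletableFuture.completedFuture("write"),
            exit -> CompletableFuture.completedFuture("validate"));
        return runStages(stages, 2);
    }
}
```

With two stages and two cycles, demo() yields write, validate, write, validate, which mirrors how the staged run alternates between its configured sub-runners.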
diff --git a/harry-core/src/harry/runner/UpToLtsRunner.java b/harry-core/src/harry/runner/UpToLtsRunner.java
new file mode 100644
index 0000000..f89ecbb
--- /dev/null
+++ b/harry-core/src/harry/runner/UpToLtsRunner.java
@@ -0,0 +1,105 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.runner;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BooleanSupplier;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.visitors.Visitor;
+
+public class UpToLtsRunner extends Runner.SequentialRunner
+{
+    public static final String TYPE = "up_to_lts";
+
+    public static void register()
+    {
+        Configuration.registerSubtypes(UpToLtsRunnerConfig.class);
+    }
+
+    private final long maxLts;
+
+    public UpToLtsRunner(Run run,
+                         Configuration config, 
+                         List<? extends Visitor.VisitorFactory> visitorFactories, 
+                         long maxLts,
+                         long runtime, TimeUnit runtimeUnit)
+    {
+        super(run, config, visitorFactories, runtime, runtimeUnit);
+        this.maxLts = maxLts;
+    }
+
+    @Override
+    public String type()
+    {
+        return TYPE;
+    }
+
+    @Override
+    protected CompletableFuture<?> start(boolean reportErrors, BooleanSupplier parentExit)
+    {
+        CompletableFuture<?> future = new CompletableFuture<>();
+
+        if (reportErrors)
+            future.whenComplete((a, b) -> maybeReportErrors());
+
+        AtomicBoolean terminated = new AtomicBoolean(false);
+        scheduleTermination(terminated);
+        BooleanSupplier exit = () -> run.clock.peek() >= maxLts
+                                     || Thread.currentThread().isInterrupted() || future.isDone()
+                                     || terminated.get() || parentExit.getAsBoolean();
+
+        executor.submit(reportThrowable(() -> run(visitors, future, exit), future));
+        return future;
+    }
+
+    @JsonTypeName(TYPE)
+    public static class UpToLtsRunnerConfig implements Configuration.RunnerConfiguration
+    {
+        public final List<Configuration.VisitorConfiguration> visitor_factories;
+        public final long max_lts;
+        public final long run_time;
+        public final TimeUnit run_time_unit;
+
+        @JsonCreator
+        public UpToLtsRunnerConfig(@JsonProperty(value = "visitors") List<Configuration.VisitorConfiguration> visitors,
+                                   @JsonProperty(value = "max_lts") long maxLts,
+                                   @JsonProperty(value = "run_time", defaultValue = "2") long runtime,
+                                   @JsonProperty(value = "run_time_unit", defaultValue = "HOURS") TimeUnit runtimeUnit)
+        {
+            this.visitor_factories = visitors;
+            this.max_lts = maxLts;
+            this.run_time = runtime;
+            this.run_time_unit = runtimeUnit;
+        }
+
+        @Override
+        public Runner make(Run run, Configuration config)
+        {
+            return new UpToLtsRunner(run, config, visitor_factories, max_lts, run_time, run_time_unit);
+        }
+    }
+}
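Note on UpToLtsRunner above: it reuses the sequential run loop and only adds one more clause to the exit condition, namely that the clock has reached max_lts. The sketch below illustrates the effect of checking the condition before every visit. It is an illustration under assumptions, not the runner itself: UpToLtsSketch and visitUpTo are hypothetical names, an AtomicLong stands in for run.clock, and every visit is assumed to claim exactly one logical timestamp, which is not true of every real visitor.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

public class UpToLtsSketch
{
    /** Cycles through {@code visitorCount} visitors until the clock reaches {@code maxLts}. */
    public static long visitUpTo(long maxLts, int visitorCount)
    {
        AtomicLong clock = new AtomicLong(); // hypothetical stand-in for run.clock
        BooleanSupplier exit = () -> clock.get() >= maxLts
                                     || Thread.currentThread().isInterrupted();
        long visits = 0;

        while (!exit.getAsBoolean())
        {
            for (int i = 0; i < visitorCount; i++)
            {
                // Re-check before every visit so a cycle can stop part-way through
                if (exit.getAsBoolean())
                    break;

                clock.incrementAndGet(); // assumption: each visit claims one lts
                visits++;
            }
        }
        return visits;
    }
}
```

For example, visitUpTo(5, 3) stops in the middle of the second cycle after exactly five visits, rather than finishing the cycle and overshooting the limit.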
diff --git a/harry-core/src/harry/util/ByteUtils.java b/harry-core/src/harry/util/ByteUtils.java
new file mode 100644
index 0000000..4adf536
--- /dev/null
+++ b/harry-core/src/harry/util/ByteUtils.java
@@ -0,0 +1,196 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.util;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+public class ByteUtils
+{
+    public static ByteBuffer bytes(String s)
+    {
+        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
+    }
+
+    public static ByteBuffer bytes(String s, Charset charset)
+    {
+        return ByteBuffer.wrap(s.getBytes(charset));
+    }
+
+    public static ByteBuffer bytes(byte b)
+    {
+        return ByteBuffer.allocate(1).put(0, b);
+    }
+
+    public static ByteBuffer bytes(short s)
+    {
+        return ByteBuffer.allocate(2).putShort(0, s);
+    }
+
+    public static ByteBuffer bytes(int i)
+    {
+        return ByteBuffer.allocate(4).putInt(0, i);
+    }
+
+    public static ByteBuffer bytes(long n)
+    {
+        return ByteBuffer.allocate(8).putLong(0, n);
+    }
+
+    public static ByteBuffer bytes(float f)
+    {
+        return ByteBuffer.allocate(4).putFloat(0, f);
+    }
+
+    public static ByteBuffer bytes(double d)
+    {
+        return ByteBuffer.allocate(8).putDouble(0, d);
+    }
+
+    public static ByteBuffer bytes(InetAddress address)
+    {
+        return ByteBuffer.wrap(address.getAddress());
+    }
+
+    public static ByteBuffer bytes(UUID uuid)
+    {
+        return ByteBuffer.wrap(decompose(uuid));
+    }
+
+    public static byte[] decompose(UUID uuid)
+    {
+        long most = uuid.getMostSignificantBits();
+        long least = uuid.getLeastSignificantBits();
+        byte[] b = new byte[16];
+        for (int i = 0; i < 8; i++)
+        {
+            b[i] = (byte)(most >>> ((7-i) * 8));
+            b[8+i] = (byte)(least >>> ((7-i) * 8));
+        }
+        return b;
+    }
+
+
+    public static ByteBuffer objectToBytes(Object obj)
+    {
+        if (obj instanceof Integer)
+            return bytes((int) obj);
+        else if (obj instanceof Byte)
+            return bytes((byte) obj);
+        else if (obj instanceof Boolean)
+            return bytes( (byte) (((boolean) obj) ? 1 : 0));
+        else if (obj instanceof Short)
+            return bytes((short) obj);
+        else if (obj instanceof Long)
+            return bytes((long) obj);
+        else if (obj instanceof Float)
+            return bytes((float) obj);
+        else if (obj instanceof Double)
+            return bytes((double) obj);
+        else if (obj instanceof UUID)
+            return bytes((UUID) obj);
+        else if (obj instanceof InetAddress)
+            return bytes((InetAddress) obj);
+        else if (obj instanceof String)
+            return bytes((String) obj);
+        else if (obj instanceof List)
+        {
+            throw new UnsupportedOperationException("Please use ByteUtils from the integration package");
+        }
+        else if (obj instanceof Set)
+        {
+            throw new UnsupportedOperationException("Please use ByteUtils from the integration package");
+        }
+        else if (obj instanceof ByteBuffer)
+            return (ByteBuffer) obj;
+        else
+            throw new IllegalArgumentException(String.format("Cannot convert value %s of type %s",
+                                                             obj,
+                                                             obj.getClass()));
+    }
+
+    public static ByteBuffer pack(Collection<ByteBuffer> buffers, int elements)
+    {
+        int size = 0;
+        for (ByteBuffer bb : buffers)
+            size += sizeOfValue(bb);
+
+        ByteBuffer result = ByteBuffer.allocate(sizeOfCollectionSize(elements) + size);
+        writeCollectionSize(result, elements);
+        for (ByteBuffer bb : buffers)
+            writeValue(result, bb);
+        result.flip();
+        return result;
+    }
+
+    public static int sizeOfValue(ByteBuffer value)
+    {
+        return value == null ? 4 : 4 + value.remaining();
+    }
+
+    protected static void writeCollectionSize(ByteBuffer output, int elements)
+    {
+        output.putInt(elements);
+    }
+    public static void writeValue(ByteBuffer output, ByteBuffer value)
+    {
+        if (value == null)
+        {
+            output.putInt(-1);
+            return;
+        }
+
+        output.putInt(value.remaining());
+        output.put(value.duplicate());
+    }
+
+    protected static int sizeOfCollectionSize(int elements)
+    {
+        return 4;
+    }
+
+    public static ByteBuffer compose(ByteBuffer... buffers) {
+        if (buffers.length == 1) return buffers[0];
+
+        int totalLength = 0;
+        for (ByteBuffer bb : buffers) totalLength += 2 + bb.remaining() + 1;
+
+        ByteBuffer out = ByteBuffer.allocate(totalLength);
+        for (ByteBuffer buffer : buffers) {
+            ByteBuffer bb = buffer.duplicate();
+            putShortLength(out, bb.remaining());
+            out.put(bb);
+            out.put((byte) 0);
+        }
+        out.flip();
+        return out;
+    }
+
+    public static void putShortLength(ByteBuffer bb, int length) {
+        bb.put((byte) ((length >> 8) & 0xFF));
+        bb.put((byte) (length & 0xFF));
+    }
+
+}
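Note on ByteUtils.pack above: the packed form is a 4-byte element count followed, for each element, by a 4-byte length and the value bytes, with a length of -1 marking a null value. The sketch below writes that layout for strings and reads it back, to make the framing concrete. PackLayoutSketch, packStrings and unpack are hypothetical helpers written for this note; they are not part of the commit.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PackLayoutSketch
{
    /** Writes [count][len][bytes]... with len = -1 for null, following the layout of pack(). */
    public static ByteBuffer packStrings(String... values)
    {
        int size = 4; // the element count
        for (String v : values)
            size += 4 + (v == null ? 0 : v.getBytes(StandardCharsets.UTF_8).length);

        ByteBuffer out = ByteBuffer.allocate(size);
        out.putInt(values.length);
        for (String v : values)
        {
            if (v == null)
            {
                out.putInt(-1); // null marker, no payload
                continue;
            }
            byte[] bytes = v.getBytes(StandardCharsets.UTF_8);
            out.putInt(bytes.length);
            out.put(bytes);
        }
        out.flip();
        return out;
    }

    /** Reads the layout back without disturbing the caller's buffer position. */
    public static List<String> unpack(ByteBuffer packed)
    {
        ByteBuffer in = packed.duplicate();
        int elements = in.getInt();
        List<String> result = new ArrayList<>();
        for (int i = 0; i < elements; i++)
        {
            int length = in.getInt();
            if (length < 0)
            {
                result.add(null);
                continue;
            }
            byte[] bytes = new byte[length];
            in.get(bytes);
            result.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return result;
    }
}
```

Packing "a", null and "bc" takes 4 + (4 + 1) + 4 + (4 + 2) = 19 bytes, and unpack returns the same three values in order.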
diff --git a/harry-core/src/harry/visitors/AllPartitionsValidator.java b/harry-core/src/harry/visitors/AllPartitionsValidator.java
index 1b67a8e..7bd5b50 100644
--- a/harry-core/src/harry/visitors/AllPartitionsValidator.java
+++ b/harry-core/src/harry/visitors/AllPartitionsValidator.java
@@ -50,6 +50,10 @@ public class AllPartitionsValidator implements Visitor
     protected final MetricReporter metricReporter;
     protected final ExecutorService executor;
     protected final SystemUnderTest sut;
+
+    protected final AtomicBoolean condition;
+    protected final AtomicLong maxPos;
+
     protected final int concurrency;
     protected final int triggerAfter;
 
@@ -67,20 +71,27 @@ public class AllPartitionsValidator implements Visitor
         this.pdSelector = run.pdSelector;
         this.concurrency = concurrency;
         this.executor = Executors.newFixedThreadPool(concurrency);
+        this.condition = new AtomicBoolean();
+        this.maxPos = new AtomicLong(-1);
+        run.tracker.onLtsStarted((lts) -> {
+            maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
+            if (triggerAfter == 0 || (triggerAfter > 0 && lts % triggerAfter == 0))
+                condition.set(true);
+        });
     }
 
     protected CompletableFuture<Void> validateAllPartitions(ExecutorService executor, int parallelism)
     {
         final long maxPos = this.maxPos.get();
-        AtomicLong counter = new AtomicLong();
-        CompletableFuture[] futures = new CompletableFuture[parallelism];
+        AtomicLong cnt = new AtomicLong();
+        CompletableFuture<?>[] futures = new CompletableFuture[parallelism];
         AtomicBoolean isDone = new AtomicBoolean(false);
 
         for (int i = 0; i < parallelism; i++)
         {
             futures[i] = CompletableFuture.supplyAsync(() -> {
                 long pos;
-                while ((pos = counter.getAndIncrement()) < maxPos && !executor.isShutdown() && !Thread.interrupted() && !isDone.get())
+                while ((pos = cnt.getAndIncrement()) < maxPos && !executor.isShutdown() && !Thread.interrupted() && !isDone.get())
                 {
                     if (pos > 0 && pos % 100 == 0)
                         logger.info(String.format("Validated %d out of %d partitions", pos, maxPos));
@@ -109,27 +120,30 @@ public class AllPartitionsValidator implements Visitor
         return CompletableFuture.allOf(futures);
     }
 
-    private final AtomicLong maxPos = new AtomicLong(-1);
-
-    public void visit(long lts)
+    public void visit()
     {
-        maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
-
-        if (triggerAfter > 0 && lts % triggerAfter == 0)
+        // TODO: this is ok for now, but if/when we bring exhaustive checker back, we need to change this:
+        // we'll probably want a low-concurrency validator somewhere in background all the time.
+        if (condition.compareAndSet(true, false))
         {
-            logger.info("Starting validations of all {} partitions", maxPos.get());
+            long lts = clock.peek();
+            logger.info("Starting validation of all partitions as of lts {}...", lts);
+
             try
             {
                 validateAllPartitions(executor, concurrency).get();
             }
             catch (Throwable e)
             {
-                throw new RuntimeException(e);
+                // TODO: Make a utility out of this.
+                throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
             }
-            logger.info("Finished validations of all partitions");
+            logger.info("...finished validating all partitions as of lts {}.", lts);
         }
     }
 
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    @Override
     public void shutdown() throws InterruptedException
     {
         executor.shutdown();
diff --git a/harry-core/src/harry/visitors/CorruptingVisitor.java b/harry-core/src/harry/visitors/CorruptingVisitor.java
index 26febeb..4677ea2 100644
--- a/harry-core/src/harry/visitors/CorruptingVisitor.java
+++ b/harry-core/src/harry/visitors/CorruptingVisitor.java
@@ -64,8 +64,9 @@ public class CorruptingVisitor implements Visitor
 
     private final AtomicLong maxPos = new AtomicLong(-1);
 
-    public void visit(long lts)
+    public void visit()
     {
+        long lts = run.clock.peek();
         maxPos.updateAndGet(current -> Math.max(run.pdSelector.positionFor(lts), current));
 
         if (lts == 0 || lts % triggerAfter != 0)
@@ -87,8 +88,4 @@ public class CorruptingVisitor implements Visitor
             logger.error("Caught an exception while trying to corrupt a partition.", t);
         }
     }
-
-    public void shutdown() throws InterruptedException
-    {
-    }
 }
diff --git a/harry-core/src/harry/visitors/DelegatingVisitor.java b/harry-core/src/harry/visitors/DelegatingVisitor.java
deleted file mode 100644
index f4994b1..0000000
--- a/harry-core/src/harry/visitors/DelegatingVisitor.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package harry.visitors;
-
-import harry.model.OpSelectors;
-
-public abstract class DelegatingVisitor implements Visitor, VisitExecutor
-{
-    protected VisitExecutor delegate;
-
-    public DelegatingVisitor(VisitExecutor delegate)
-    {
-        this.delegate = delegate;
-    }
-
-    public void beforeLts(long lts, long pd)
-    {
-        delegate.beforeLts(lts, pd);
-    }
-
-    public void afterLts(long lts, long pd)
-    {
-        delegate.afterLts(lts, pd);
-    }
-
-    public void beforeBatch(long lts, long pd, long m)
-    {
-        delegate.beforeBatch(lts, pd, m);
-    }
-
-    public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
-    {
-        delegate.operation(lts, pd, cd, m, opId, opType);
-    }
-
-    public void afterBatch(long lts, long pd, long m)
-    {
-        delegate.afterBatch(lts, pd, m);
-    }
-
-    public void shutdown() throws InterruptedException
-    {
-        delegate.shutdown();
-    }
-}
diff --git a/harry-core/src/harry/visitors/GeneratingVisitor.java b/harry-core/src/harry/visitors/GeneratingVisitor.java
index 78e2b07..624330f 100644
--- a/harry-core/src/harry/visitors/GeneratingVisitor.java
+++ b/harry-core/src/harry/visitors/GeneratingVisitor.java
@@ -22,7 +22,7 @@ import harry.core.Run;
 import harry.ddl.SchemaSpec;
 import harry.model.OpSelectors;
 
-public class GeneratingVisitor extends DelegatingVisitor
+public class GeneratingVisitor extends LtsVisitor
 {
     private final OpSelectors.PdSelector pdSelector;
     private final OpSelectors.DescriptorSelector descriptorSelector;
@@ -31,7 +31,8 @@ public class GeneratingVisitor extends DelegatingVisitor
     public GeneratingVisitor(Run run,
                              VisitExecutor delegate)
     {
-        super(delegate);
+        super(delegate, run.clock::nextLts);
+
         this.pdSelector = run.pdSelector;
         this.descriptorSelector = run.descriptorSelector;
         this.schema = run.schemaSpec;
@@ -50,10 +51,10 @@ public class GeneratingVisitor extends DelegatingVisitor
         int modificationsCount = descriptorSelector.numberOfModifications(lts);
         int opsPerModification = descriptorSelector.opsPerModification(lts);
 
-        for (int m = 0; m < modificationsCount; m++)
+        for (long m = 0; m < modificationsCount; m++)
         {
             beforeBatch(lts, pd, m);
-            for (int i = 0; i < opsPerModification; i++)
+            for (long i = 0; i < opsPerModification; i++)
             {
                 long opId = m * opsPerModification + i;
                 long cd = descriptorSelector.cd(pd, lts, opId, schema);
diff --git a/harry-core/src/harry/visitors/LoggingVisitor.java b/harry-core/src/harry/visitors/LoggingVisitor.java
index 8deebde..35f9aa0 100644
--- a/harry-core/src/harry/visitors/LoggingVisitor.java
+++ b/harry-core/src/harry/visitors/LoggingVisitor.java
@@ -31,7 +31,6 @@ import harry.operations.CompiledStatement;
 
 public class LoggingVisitor extends GeneratingVisitor
 {
-
     public LoggingVisitor(Run run,
                           OperationExecutor.RowVisitorFactory rowVisitorFactory)
     {
diff --git a/harry-core/src/harry/visitors/LtsVisitor.java b/harry-core/src/harry/visitors/LtsVisitor.java
new file mode 100644
index 0000000..5b37db0
--- /dev/null
+++ b/harry-core/src/harry/visitors/LtsVisitor.java
@@ -0,0 +1,97 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.visitors;
+
+import java.util.function.LongSupplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.model.OpSelectors;
+
+/**
+ * Common class for all visitors that support visits at a specific logical timestamp.
+ *
+ * Classes inheriting from LtsVisitor have to visit every drawn LTS: each LTS that has been received
+ * from the LTS source _has to_ actually be visited. If this is not done, the drawn LTS will still be
+ * considered visited during model checking, which will lead to data inconsistencies.
+ *
+ * This class and its implementations, such as MutatingVisitor, are NOT thread safe. If you'd like
+ * to have several threads generating data, use multiple copies of the delegating visitor.
+ */
+public abstract class LtsVisitor extends VisitExecutor implements Visitor
+{
+    private static final Logger logger = LoggerFactory.getLogger(LtsVisitor.class);
+    protected final VisitExecutor delegate;
+    private final LongSupplier ltsSource;
+
+    public LtsVisitor(VisitExecutor delegate,
+                      LongSupplier ltsSource)
+    {
+        this.delegate = delegate;
+        this.ltsSource = ltsSource;
+    }
+
+    public final void visit()
+    {
+        long lts = ltsSource.getAsLong();
+        if (lts > 0 && lts % 10_000 == 0)
+            logger.info("Visiting lts {}...", lts);
+        visit(lts);
+    }
+
+    public abstract void visit(long lts);
+
+    @Override
+    protected void beforeLts(long lts, long pd)
+    {
+        delegate.beforeLts(lts, pd);
+    }
+
+    @Override
+    protected void afterLts(long lts, long pd)
+    {
+        delegate.afterLts(lts, pd);
+    }
+
+    @Override
+    protected void beforeBatch(long lts, long pd, long m)
+    {
+        delegate.beforeBatch(lts, pd, m);
+    }
+
+    @Override
+    protected void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
+    {
+        delegate.operation(lts, pd, cd, m, opId, opType);
+    }
+
+    @Override
+    protected void afterBatch(long lts, long pd, long m)
+    {
+        delegate.afterBatch(lts, pd, m);
+    }
+
+    @Override
+    public void shutdown() throws InterruptedException
+    {
+        delegate.shutdown();
+    }
+
+}
diff --git a/harry-core/src/harry/visitors/MutatingVisitor.java b/harry-core/src/harry/visitors/MutatingVisitor.java
index 26c7417..d6eb617 100644
--- a/harry-core/src/harry/visitors/MutatingVisitor.java
+++ b/harry-core/src/harry/visitors/MutatingVisitor.java
@@ -19,6 +19,7 @@
 package harry.visitors;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -41,10 +42,16 @@ public class MutatingVisitor extends GeneratingVisitor
     public MutatingVisitor(Run run,
                            OperationExecutor.RowVisitorFactory rowVisitorFactory)
     {
-        super(run, new MutatingVisitExecutor(run, rowVisitorFactory.make(run)));
+        this(run, new MutatingVisitExecutor(run, rowVisitorFactory.make(run)));
     }
 
-    public static class MutatingVisitExecutor implements VisitExecutor
+    public MutatingVisitor(Run run,
+                           VisitExecutor visitExecutor)
+    {
+        super(run, visitExecutor);
+    }
+
+    public static class MutatingVisitExecutor extends VisitExecutor
     {
         private final List<String> statements = new ArrayList<>();
         private final List<Object> bindings = new ArrayList<>();
@@ -104,8 +111,7 @@ public class MutatingVisitor extends GeneratingVisitor
             CompiledStatement statement = operationInternal(lts, pd, cd, m, opId, opType);
 
             statements.add(statement.cql());
-            for (Object binding : statement.bindings())
-                bindings.add(binding);
+            Collections.addAll(bindings, statement.bindings());
         }
 
         protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
@@ -140,10 +146,10 @@ public class MutatingVisitor extends GeneratingVisitor
 
         protected void executeAsyncWithRetries(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement)
         {
-            executeAsyncWithRetries(lts, pd, future, statement, 0);
+            executeAsyncWithRetries(future, statement, 0);
         }
 
-        private void executeAsyncWithRetries(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement, int retries)
+        private void executeAsyncWithRetries(CompletableFuture<Object[][]> future, CompiledStatement statement, int retries)
         {
             if (sut.isShutdown())
                 throw new IllegalStateException("System under test is shut down");
@@ -154,8 +160,10 @@ public class MutatingVisitor extends GeneratingVisitor
             sut.executeAsync(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings())
                .whenComplete((res, t) -> {
                    if (t != null)
-                       executor.schedule(() -> executeAsyncWithRetries(lts, pd, future, statement, retries + 1), 1, TimeUnit.SECONDS);
-                   else
+                   {
+                       logger.error("Caught exception while trying to execute " + statement, t);
+                       executor.schedule(() -> executeAsyncWithRetries(future, statement, retries + 1), 1, TimeUnit.SECONDS);
+                   } else
                        future.complete(res);
                });
         }
diff --git a/harry-core/src/harry/visitors/ParallelValidator.java b/harry-core/src/harry/visitors/ParallelValidator.java
index 4772b40..e96bd39 100644
--- a/harry-core/src/harry/visitors/ParallelValidator.java
+++ b/harry-core/src/harry/visitors/ParallelValidator.java
@@ -86,8 +86,9 @@ public abstract class ParallelValidator<T extends ParallelValidator.State> imple
         }
     }
 
-    public void visit(long lts)
+    public void visit()
     {
+        long lts = run.clock.peek();
         maxPos.updateAndGet(current -> Math.max(run.pdSelector.positionFor(lts), current));
 
         if (triggerAfter > 0 && lts % triggerAfter == 0)
@@ -104,6 +105,7 @@ public abstract class ParallelValidator<T extends ParallelValidator.State> imple
         }
     }
 
+    @Override
     public void shutdown() throws InterruptedException
     {
         executor.shutdown();
diff --git a/harry-core/src/harry/visitors/RecentValidator.java b/harry-core/src/harry/visitors/RecentValidator.java
index d0d09fa..a563a30 100644
--- a/harry-core/src/harry/visitors/RecentValidator.java
+++ b/harry-core/src/harry/visitors/RecentValidator.java
@@ -24,6 +24,9 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
@@ -46,8 +49,12 @@ public class RecentValidator implements Visitor
     private final OpSelectors.PdSelector pdSelector;
     private final QueryGenerator.TypedQueryGenerator querySelector;
     private final MetricReporter metricReporter;
+    private final OpSelectors.MonotonicClock clock;
+
+    private final AtomicBoolean condition;
+    private final AtomicLong maxPos;
+
     private final int partitionCount;
-    private final int triggerAfter;
     private final int queries;
 
     public RecentValidator(int partitionCount,
@@ -57,11 +64,19 @@ public class RecentValidator implements Visitor
                            Model.ModelFactory modelFactory)
     {
         this.partitionCount = partitionCount;
-        this.triggerAfter = triggerAfter;
         this.queries = queries;
         this.metricReporter = run.metricReporter;
         this.pdSelector = run.pdSelector;
+        this.clock = run.clock;
 
+        this.condition = new AtomicBoolean();
+        this.maxPos = new AtomicLong(-1);
+
+        run.tracker.onLtsStarted((lts) -> {
+            maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
+            if (triggerAfter == 0 || (triggerAfter > 0 && lts % triggerAfter == 0))
+                condition.set(true);
+        });
         this.querySelector = new QueryGenerator.TypedQueryGenerator(run.rng,
                                                                     // TODO: make query kind configurable
                                                                     Surjections.enumValues(Query.QueryKind.class),
@@ -78,10 +93,8 @@ public class RecentValidator implements Visitor
         }
     }
 
-    private final AtomicLong maxPos = new AtomicLong(-1);
-
     // TODO: expose metric, how many times validated recent partitions
-    private void validateRecentPartitions(long lts)
+    private int validateRecentPartitions()
     {
         long pos = maxPos.get();
 
@@ -94,35 +107,34 @@ public class RecentValidator implements Visitor
                 metricReporter.validateRandomQuery();
                 Query query = querySelector.inflate(visitLts, i);
                 // TODO: add pd skipping from shrinker here, too
-                log(lts, i, query);
+                log(i, query);
                 model.validate(query);
             }
 
             pos--;
             maxPartitions--;
         }
+        
+        return partitionCount - maxPartitions;
     }
 
-    public void visit(long lts)
+    @Override
+    public void visit()
     {
-        maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
-
-        if (triggerAfter > 0 && lts % triggerAfter == 0)
+        if (condition.compareAndSet(true, false))
         {
-            logger.info("Validating {} recent partitions", partitionCount);
-            validateRecentPartitions(lts);
+            long lts = clock.peek();
+            logger.info("Validating (up to) {} recent partitions as of lts {}...", partitionCount, lts);
+            int count = validateRecentPartitions();
+            logger.info("...finished validating {} recent partitions as of lts {}.", count, lts);
         }
     }
 
-    public void shutdown() throws InterruptedException
-    {
-    }
-
-    private void log(long lts, int modifier, Query query)
+    private void log(int modifier, Query query)
     {
         try
         {
-            validationLog.write("LTS: " + lts + ". Modifier: " + modifier + ". PD: " + query.pd);
+            validationLog.write(String.format("PD: %d. Modifier: %d.", query.pd, modifier));
             validationLog.write("\t");
             validationLog.write(query.toSelectStatement().toString());
             validationLog.write("\n");
diff --git a/harry-core/src/harry/visitors/ReplayingVisitor.java b/harry-core/src/harry/visitors/ReplayingVisitor.java
index 1979664..bd3e1c2 100644
--- a/harry-core/src/harry/visitors/ReplayingVisitor.java
+++ b/harry-core/src/harry/visitors/ReplayingVisitor.java
@@ -19,17 +19,19 @@
 package harry.visitors;
 
 import java.util.Arrays;
+import java.util.function.LongSupplier;
 
 import harry.core.Run;
 import harry.model.OpSelectors;
 
-public abstract class ReplayingVisitor extends DelegatingVisitor
+public abstract class ReplayingVisitor extends LtsVisitor
 {
-    public ReplayingVisitor(VisitExecutor delegate)
+    public ReplayingVisitor(VisitExecutor delegate, LongSupplier ltsSource)
     {
-        super(delegate);
+        super(delegate, ltsSource);
     }
 
+    @Override
     public void visit(long lts)
     {
         replay(getVisit(lts));
@@ -37,7 +39,8 @@ public abstract class ReplayingVisitor extends DelegatingVisitor
 
     public abstract Visit getVisit(long lts);
 
-    public abstract void replayAll(Run run);
+    public abstract void replayAll();
+
     private void replay(Visit visit)
     {
         beforeLts(visit.lts, visit.pd);
diff --git a/harry-core/src/harry/visitors/Sampler.java b/harry-core/src/harry/visitors/Sampler.java
index 20c9a8c..fe73089 100644
--- a/harry-core/src/harry/visitors/Sampler.java
+++ b/harry-core/src/harry/visitors/Sampler.java
@@ -43,6 +43,7 @@ public class Sampler implements Visitor
     // TODO: move maxPos to data tracker since we seem to use quite a lot?
     private final AtomicLong maxPos = new AtomicLong(-1);
     private final OpSelectors.PdSelector pdSelector;
+    private final OpSelectors.MonotonicClock clock;
     private final SchemaSpec schema;
     private final int triggerAfter;
     private final int samplePartitions;
@@ -52,16 +53,18 @@ public class Sampler implements Visitor
     {
         this.sut = run.sut;
         this.pdSelector = run.pdSelector;
+        this.clock = run.clock;
         this.schema = run.schemaSpec;
         this.triggerAfter = triggerAfter;
         this.samplePartitions = samplePartitions;
     }
 
-    public void visit(long lts)
+    public void visit()
     {
+        long lts = clock.peek();
         maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
 
-        if (triggerAfter > 0 && lts % triggerAfter == 0)
+        if (triggerAfter == 0 || (triggerAfter > 0 && lts % triggerAfter == 0))
         {
             long max = maxPos.get();
             DescriptiveStatistics ds = new DescriptiveStatistics();
@@ -84,10 +87,6 @@ public class Sampler implements Visitor
         }
     }
 
-    public void shutdown() throws InterruptedException
-    {
-    }
-
     @JsonTypeName("sampler")
     public static class SamplerConfiguration implements Configuration.VisitorConfiguration
     {
diff --git a/harry-core/src/harry/visitors/SingleValidator.java b/harry-core/src/harry/visitors/SingleValidator.java
index 0993461..85a378a 100644
--- a/harry-core/src/harry/visitors/SingleValidator.java
+++ b/harry-core/src/harry/visitors/SingleValidator.java
@@ -29,6 +29,7 @@ public class SingleValidator implements Visitor
     protected final Model model;
     protected final QueryGenerator queryGenerator;
     protected final Run run;
+
     public SingleValidator(int iterations,
                            Run run,
                            Model.ModelFactory modelFactory)
@@ -39,9 +40,10 @@ public class SingleValidator implements Visitor
         this.run = run;
     }
 
-    public void shutdown() throws InterruptedException
+    @Override
+    public void visit()
     {
-
+        visit(run.clock.peek());
     }
 
     public void visit(long lts)
diff --git a/harry-core/src/harry/visitors/VisitExecutor.java b/harry-core/src/harry/visitors/VisitExecutor.java
index 94aa1fc..63efba0 100644
--- a/harry-core/src/harry/visitors/VisitExecutor.java
+++ b/harry-core/src/harry/visitors/VisitExecutor.java
@@ -20,17 +20,17 @@ package harry.visitors;
 
 import harry.model.OpSelectors;
 
-public interface VisitExecutor
+public abstract class VisitExecutor
 {
-    public void beforeLts(long lts, long pd);
+    protected abstract void beforeLts(long lts, long pd);
 
-    public void afterLts(long lts, long pd);
+    protected abstract void afterLts(long lts, long pd);
 
-    public void beforeBatch(long lts, long pd, long m);
+    protected abstract void beforeBatch(long lts, long pd, long m);
 
-    public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind kind);
+    protected abstract void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind kind);
 
-    public void afterBatch(long lts, long pd, long m);
+    protected abstract void afterBatch(long lts, long pd, long m);
 
-    public default void shutdown() throws InterruptedException {}
+    public abstract void shutdown() throws InterruptedException;
 }
diff --git a/harry-core/src/harry/visitors/Visitor.java b/harry-core/src/harry/visitors/Visitor.java
index 7cb6111..b8baf10 100644
--- a/harry-core/src/harry/visitors/Visitor.java
+++ b/harry-core/src/harry/visitors/Visitor.java
@@ -22,12 +22,12 @@ import harry.core.Run;
 
 public interface Visitor
 {
-    void visit(long lts);
+    void visit();
 
-    public default void shutdown() throws InterruptedException {}
+    default void shutdown() throws InterruptedException {}
 
-    public interface VisitorFactory
+    interface VisitorFactory
     {
-        public Visitor make(Run run);
+        Visitor make(Run run);
     }
 }
\ No newline at end of file
diff --git a/harry-core/test/harry/model/OpSelectorsTest.java b/harry-core/test/harry/model/OpSelectorsTest.java
index 0f474f0..e2ebd82 100644
--- a/harry-core/test/harry/model/OpSelectorsTest.java
+++ b/harry-core/test/harry/model/OpSelectorsTest.java
@@ -44,8 +44,8 @@ import harry.model.clock.OffsetClock;
 import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
 import harry.runner.DataTracker;
+import harry.visitors.LtsVisitor;
 import harry.visitors.MutatingVisitor;
-import harry.visitors.Visitor;
 import harry.visitors.OperationExecutor;
 import harry.util.BitSet;
 
@@ -205,82 +205,80 @@ public class OpSelectorsTest
         };
 
         Run run = new Run(rng,
-                new OffsetClock(0),
-                pdSelector,
-                ckSelector,
-                schema,
-                DataTracker.NO_OP,
-                SystemUnderTest.NO_OP,
-                MetricReporter.NO_OP);
-
-        Visitor visitor = new MutatingVisitor(run,
-                                              (r) -> new OperationExecutor()
-                                                                         {
-                                                                             public CompiledStatement insert(long lts, long pd, long cd, long m)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement update(long lts, long pd, long cd, long opId)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement deleteColumn(long lts, long pd, long cd, long m)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement deleteColumnWithStatics(long lts, long pd, long cd, long opId)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement deleteRow(long lts, long pd, long cd, long m)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement deletePartition(long lts, long pd, long opId)
-                                                                             {
-                                                                                 // ignore
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement insertWithStatics(long lts, long pd, long cd, long opId)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement updateWithStatics(long lts, long pd, long cd, long opId)
-                                                                             {
-                                                                                 consumer.accept(pd, cd);
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement deleteRange(long lts, long pd, long opId)
-                                                                             {
-                                                                                 // ignore
-                                                                                 return compiledStatement;
-                                                                             }
-
-                                                                             public CompiledStatement deleteSlice(long lts, long pd, long opId)
-                                                                             {
-                                                                                 // ignore
-                                                                                 return compiledStatement;
-                                                                             }
-                                                                         });
+                          new OffsetClock(0),
+                          pdSelector,
+                          ckSelector,
+                          schema,
+                          DataTracker.NO_OP,
+                          SystemUnderTest.NO_OP,
+                          MetricReporter.NO_OP);
+
+        LtsVisitor visitor = new MutatingVisitor(run,
+                                                 (r) -> new OperationExecutor()
+                                                        {
+                                                            public CompiledStatement insert(long lts, long pd, long cd, long m)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement update(long lts, long pd, long cd, long opId)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement deleteColumn(long lts, long pd, long cd, long m)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement deleteColumnWithStatics(long lts, long pd, long cd, long opId)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement deleteRow(long lts, long pd, long cd, long m)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement deletePartition(long lts, long pd, long opId)
+                                                            {
+                                                                // ignore
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement insertWithStatics(long lts, long pd, long cd, long opId)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement updateWithStatics(long lts, long pd, long cd, long opId)
+                                                            {
+                                                                consumer.accept(pd, cd);
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement deleteRange(long lts, long pd, long opId)
+                                                            {
+                                                                // ignore
+                                                                return compiledStatement;
+                                                            }
+
+                                                            public CompiledStatement deleteSlice(long lts, long pd, long opId)
+                                                            {
+                                                                // ignore
+                                                                return compiledStatement;
+                                                            }
+                                                        });
 
         for (int lts = 0; lts < 1000; lts++)
-        {
-            visitor.visit(lts);
-        }
+            visitor.visit();
 
         for (Collection<Long> value : partitionMap.values())
             Assert.assertEquals(10, value.size());
diff --git a/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java b/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
index 702163e..621d579 100644
--- a/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
+++ b/harry-integration-external/src/harry/runner/external/HarryRunnerExternal.java
@@ -38,7 +38,7 @@ public class HarryRunnerExternal extends HarryRunner {
     }
 
     @Override
-    public void beforeRun(Runner runner) {
+    public void beforeRun(Runner.TimedRunner runner) {
 
     }
 }
diff --git a/harry-integration/src/harry/model/sut/ByteUtils.java b/harry-integration/src/harry/model/sut/ByteUtils.java
new file mode 100644
index 0000000..ee7721c
--- /dev/null
+++ b/harry-integration/src/harry/model/sut/ByteUtils.java
@@ -0,0 +1,115 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.model.sut;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.AsciiType;
+import org.apache.cassandra.db.marshal.BooleanType;
+import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.db.marshal.DoubleType;
+import org.apache.cassandra.db.marshal.FloatType;
+import org.apache.cassandra.db.marshal.InetAddressType;
+import org.apache.cassandra.db.marshal.IntegerType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.ShortType;
+import org.apache.cassandra.db.marshal.UUIDType;
+
+public class ByteUtils extends harry.util.ByteUtils
+{
+    public static ByteBuffer objectToBytes(Object obj)
+    {
+        if (obj instanceof Integer)
+            return bytes((int) obj);
+        else if (obj instanceof Byte)
+            return bytes((byte) obj);
+        else if (obj instanceof Boolean)
+            return bytes((byte) (((boolean) obj) ? 1 : 0));
+        else if (obj instanceof Short)
+            return bytes((short) obj);
+        else if (obj instanceof Long)
+            return bytes((long) obj);
+        else if (obj instanceof Float)
+            return bytes((float) obj);
+        else if (obj instanceof Double)
+            return bytes((double) obj);
+        else if (obj instanceof UUID)
+            return bytes((UUID) obj);
+        else if (obj instanceof InetAddress)
+            return bytes((InetAddress) obj);
+        else if (obj instanceof String)
+            return bytes((String) obj);
+        else if (obj instanceof List)
+        {
+            List l = (List) obj;
+            if (l.isEmpty())
+                return pack(Collections.emptyList(), 0);
+            return ListType.getInstance(getType(l.get(0)), true).decompose(l);
+        }
+        else if (obj instanceof Set)
+        {
+            Set l = (Set) obj;
+            if (l.isEmpty())
+                return pack(Collections.emptyList(), 0);
+            return SetType.getInstance(getType(l.iterator().next()), true).decompose(l);
+        }
+        else if (obj instanceof ByteBuffer)
+            return (ByteBuffer) obj;
+
+        else
+            throw new IllegalArgumentException(String.format("Cannot convert value %s of type %s",
+                                                             obj,
+                                                             obj.getClass()));
+    }
+
+    public static AbstractType<?> getType(Object obj)
+    {
+        if (obj instanceof Integer)
+            return IntegerType.instance; //TODO
+        else if (obj instanceof Byte)
+            return ByteType.instance;
+        else if (obj instanceof Boolean)
+            return BooleanType.instance;
+        else if (obj instanceof Short)
+            return ShortType.instance;
+        else if (obj instanceof Long)
+            return IntegerType.instance;
+        else if (obj instanceof Float)
+            return FloatType.instance;
+        else if (obj instanceof Double)
+            return DoubleType.instance;
+        else if (obj instanceof UUID)
+            return UUIDType.instance;
+        else if (obj instanceof InetAddress)
+            return InetAddressType.instance;
+        else if (obj instanceof String)
+            return AsciiType.instance;
+        else
+            throw new IllegalArgumentException(String.format("Cannot convert value %s of type %s",
+                                                             obj,
+                                                             obj.getClass()));
+    }
+}
diff --git a/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java b/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java
index ff4ad86..6bc0705 100644
--- a/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java
+++ b/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import harry.core.Configuration;
 import harry.core.Run;
 import harry.ddl.SchemaSpec;
 import harry.operations.CompiledStatement;
@@ -38,7 +37,7 @@ public class InJVMTokenAwareVisitExecutor extends LoggingVisitor.LoggingVisitorE
 {
     public static void init()
     {
-        Configuration.registerSubtypes(Configuation.class);
+        harry.core.Configuration.registerSubtypes(Configuration.class);
     }
 
     private final InJvmSut sut;
@@ -92,14 +91,14 @@ public class InJVMTokenAwareVisitExecutor extends LoggingVisitor.LoggingVisitorE
     }
 
     @JsonTypeName("in_jvm_token_aware")
-    public static class Configuation implements Configuration.VisitorConfiguration
+    public static class Configuration implements harry.core.Configuration.VisitorConfiguration
     {
-        public final Configuration.RowVisitorConfiguration row_visitor;
+        public final harry.core.Configuration.RowVisitorConfiguration row_visitor;
         public final SystemUnderTest.ConsistencyLevel consistency_level;
 
         @JsonCreator
-        public Configuation(@JsonProperty("row_visitor") Configuration.RowVisitorConfiguration rowVisitor,
-                            @JsonProperty("consistency_level") SystemUnderTest.ConsistencyLevel consistencyLevel)
+        public Configuration(@JsonProperty("row_visitor") harry.core.Configuration.RowVisitorConfiguration rowVisitor,
+                             @JsonProperty("consistency_level") SystemUnderTest.ConsistencyLevel consistencyLevel)
         {
             this.row_visitor = rowVisitor;
             this.consistency_level = consistencyLevel;
diff --git a/harry-integration/src/harry/model/sut/InJvmSut.java b/harry-integration/src/harry/model/sut/InJvmSut.java
index f758ca5..687a5b3 100644
--- a/harry-integration/src/harry/model/sut/InJvmSut.java
+++ b/harry-integration/src/harry/model/sut/InJvmSut.java
@@ -20,27 +20,19 @@ package harry.model.sut;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import harry.core.Configuration;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.marshal.ByteBufferAccessor;
-import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
 {
@@ -49,8 +41,6 @@ public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
         Configuration.registerSubtypes(InJvmSutConfiguration.class);
     }
 
-    private static final Logger logger = LoggerFactory.getLogger(InJvmSut.class);
-
     public InJvmSut(Cluster cluster)
     {
         super(cluster, 10);
@@ -97,7 +87,7 @@ public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
     {
         return cluster.get(1).appliesOnInstance((Object[] pk, String ks) ->
                                                 {
-                                                    String pkString = Arrays.asList(pk).stream().map(Object::toString).collect(Collectors.joining(":"));
+                                                    String pkString = Arrays.stream(pk).map(Object::toString).collect(Collectors.joining(":"));
                                                     EndpointsForToken endpoints = StorageService.instance.getNaturalReplicasForToken(ks, table, pkString);
                                                     int[] nodes = new int[endpoints.size()];
                                                     for (int i = 0; i < endpoints.size(); i++)
@@ -105,4 +95,4 @@ public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
                                                     return nodes;
                                                 }).apply(partitionKey, keyspace);
     }
-}
\ No newline at end of file
+}
diff --git a/harry-integration/src/harry/runner/HarryRunnerJvm.java b/harry-integration/src/harry/runner/HarryRunnerJvm.java
index fbb30b9..ed4ea46 100644
--- a/harry-integration/src/harry/runner/HarryRunnerJvm.java
+++ b/harry-integration/src/harry/runner/HarryRunnerJvm.java
@@ -37,7 +37,7 @@ public class HarryRunnerJvm extends HarryRunner {
 
 
     @Override
-    public void beforeRun(Runner runner) {
+    public void beforeRun(Runner.TimedRunner runner) {
 
     }
 }
diff --git a/harry-integration/src/harry/runner/RepairingLocalStateValidator.java b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
index fe811e5..71c1535 100644
--- a/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
+++ b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
@@ -19,7 +19,6 @@
 package harry.runner;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -29,6 +28,7 @@ import harry.core.Configuration;
 import harry.core.Run;
 import harry.data.ResultSetRow;
 import harry.model.Model;
+import harry.model.OpSelectors;
 import harry.model.QuiescentChecker;
 import harry.model.sut.InJvmSut;
 import harry.model.sut.SystemUnderTest;
@@ -47,27 +47,26 @@ public class RepairingLocalStateValidator extends AllPartitionsValidator
                                        QuiescentCheckerConfig.class);
     }
 
-    public final InJvmSut inJvmSut;
-
+    private final InJvmSut inJvmSut;
+    private final OpSelectors.MonotonicClock clock;
     public RepairingLocalStateValidator(int concurrency, int triggerAfter, Run run, Model.ModelFactory modelFactory)
     {
         super(concurrency, triggerAfter, run, modelFactory);
-
         this.inJvmSut = (InJvmSut) run.sut;
+        this.clock = run.clock;
     }
 
-    @Override
-    public void visit(long lts)
+    public void visit()
     {
+        long lts = clock.peek();
         if (lts > 0 && lts % triggerAfter == 0)
         {
             System.out.println("Starting repair...");
 
-            inJvmSut.cluster().stream().forEach((instance) -> {
-                instance.nodetool("repair", "--full");
-            });
+            inJvmSut.cluster().stream().forEach((instance) -> instance.nodetool("repair", "--full"));
+
             System.out.println("Validating partitions...");
-            super.visit(lts);
+            super.visit();
         }
     }
 
@@ -110,7 +109,7 @@ public class RepairingLocalStateValidator extends AllPartitionsValidator
         public void validate(Query query)
         {
             CompiledStatement compiled = query.toSelectStatement();
-            int[] replicas = inJvmSut.getReplicasFor(schemaSpec.inflatePartitionKey(query.pd), schemaSpec.keyspace, schemaSpec.table);
+            int[] replicas = inJvmSut.getReplicasFor(schema.inflatePartitionKey(query.pd), schema.keyspace, schema.table);
             for (int node : replicas)
             {
                 validate(() -> {
diff --git a/harry-integration/src/harry/runner/TrivialShrinker.java b/harry-integration/src/harry/runner/TrivialShrinker.java
index 2879e0e..b598b85 100644
--- a/harry-integration/src/harry/runner/TrivialShrinker.java
+++ b/harry-integration/src/harry/runner/TrivialShrinker.java
@@ -24,13 +24,14 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
 
 import harry.core.Configuration;
 import harry.core.Run;
-import harry.visitors.DelegatingVisitor;
-import harry.visitors.Visitor;
+import harry.visitors.LtsVisitor;
 import harry.visitors.SkippingVisitor;
+import harry.visitors.Visitor;
 
 /**
  * A most trivial imaginable shrinker: attempts to skip partitions and/or logical timestamps to see if the
@@ -63,15 +64,17 @@ public class TrivialShrinker
             Run run = configuration.createRun();
             Configuration.SequentialRunnerConfig config = (Configuration.SequentialRunnerConfig) configuration.runner;
             List<Visitor> visitors = new ArrayList<>();
-            for (Configuration.VisitorConfiguration factory : config.visitor_factories)
+            for (Configuration.VisitorConfiguration factory : config.visitorFactories)
             {
                 Visitor visitor = factory.make(run);
-                if (visitor instanceof DelegatingVisitor)
+                if (visitor instanceof LtsVisitor)
                 {
-                    visitors.add(new SkippingVisitor(visitor,
+                    AtomicLong counter = new AtomicLong();
+                    visitors.add(new SkippingVisitor((LtsVisitor) visitor,
+                                                     counter::getAndIncrement,
                                                      (lts) -> run.pdSelector.pd(lts, run.schemaSpec),
                                                      ltsToSkip,
-                                                     pdsToSkip));
+                                                     pdsToSkip));
                 }
                 else
                 {
@@ -93,7 +96,7 @@ public class TrivialShrinker
 
                 try
                 {
-                    runOnce(run, visitors, maxLts);
+                    runOnce(visitors, maxLts);
                     System.out.println("Can not skip " + pdToCheck + "\nCan only skip these: " + toString(pdsToSkip));
                     pdsToSkip.remove(pdToCheck);
                 }
@@ -126,7 +129,7 @@ public class TrivialShrinker
 
                 try
                 {
-                    runOnce(run, visitors, maxLts);
+                    runOnce(visitors, maxLts);
                     System.out.println("Can not skip " + ltsToCheck + "\nCan only skip these: " + toString(ltsToSkip));
                     ltsToSkip.remove(ltsToCheck);
                 }
@@ -160,13 +163,13 @@ public class TrivialShrinker
         }
     }
 
-    public static void runOnce(Run run, List<Visitor> visitors, long maxLts)
+    public static void runOnce(List<Visitor> visitors, long maxLts)
     {
         for (long lts = 0; lts <= maxLts; lts++)
         {
             for (Visitor visitor : visitors)
             {
-                visitor.visit(lts);
+                visitor.visit();
             }
         }
     }
@@ -183,4 +186,4 @@ public class TrivialShrinker
         }
         return s.substring(0, s.length() - 1);
     }
-}
\ No newline at end of file
+}
diff --git a/harry-integration/src/harry/visitors/SkippingVisitor.java b/harry-integration/src/harry/visitors/SkippingVisitor.java
index 8da0a89..67c6488 100644
--- a/harry-integration/src/harry/visitors/SkippingVisitor.java
+++ b/harry-integration/src/harry/visitors/SkippingVisitor.java
@@ -19,20 +19,23 @@
 package harry.visitors;
 
 import java.util.Set;
+import java.util.function.LongSupplier;
 
-public class SkippingVisitor implements Visitor
+public class SkippingVisitor extends LtsVisitor
 {
     private final Set<Long> ltsToSkip;
     private final Set<Long> pdsToSkip;
     private final LtsToPd ltsToPd;
-    private final Visitor delegate;
+    // Use DelegatingVisitor class instead of VisitExecutor available via protected field
+    private LtsVisitor delegateShadow;
 
-    public SkippingVisitor(Visitor delegate,
+    public SkippingVisitor(LtsVisitor delegate,
+                           LongSupplier ltsSupplier,
                            LtsToPd ltsToPd,
                            Set<Long> ltsToSkip,
                            Set<Long> pdsToSkip)
     {
-        this.delegate = delegate;
+        super(delegate, ltsSupplier);
         this.ltsToSkip = ltsToSkip;
         this.pdsToSkip = pdsToSkip;
         this.ltsToPd = ltsToPd;
@@ -43,7 +46,7 @@ public class SkippingVisitor implements Visitor
         if (ltsToSkip.contains(lts) || pdsToSkip.contains(ltsToPd.convert(lts)))
             return;
 
-        delegate.visit(lts);
+        delegateShadow.visit(lts);
     }
 
     public static interface LtsToPd
diff --git a/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java b/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
index 33463b8..0da7743 100644
--- a/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
+++ b/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
@@ -16,9 +16,9 @@ import harry.model.OpSelectors;
 import harry.model.sut.SystemUnderTest;
 import harry.visitors.MutatingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Visitor;
 import harry.visitors.SingleValidator;
 import harry.util.TestRunner;
+import harry.visitors.Visitor;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.distributed.impl.RowUtil;
@@ -94,12 +94,12 @@ public class DataGeneratorsIntegrationTest extends CQLTester
 
                                 Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
                                 for (int lts = 0; lts < 100; lts++)
-                                    visitor.visit(lts);
+                                    visitor.visit();
                             }
 
                             Run run = builder.build()
                                              .createRun();
-                            Visitor visitor = new SingleValidator(100, run, NoOpChecker::new);
+                            SingleValidator visitor = new SingleValidator(100, run, NoOpChecker::new);
                             for (int lts = 0; lts < 100; lts++)
                                 visitor.visit(lts);
 
diff --git a/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java b/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
index 6c0ee24..31cd9f1 100644
--- a/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
+++ b/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
@@ -23,6 +23,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.function.Supplier;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import harry.core.Configuration;
@@ -65,6 +66,9 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase
             HistoryBuilder history = new HistoryBuilder(run);
 
             Set<Long> pds = new HashSet<>();
+            run.tracker.onLtsStarted((long lts) -> {
+                pds.add(run.pdSelector.pd(lts, run.schemaSpec));
+            });
 
             for (int j = 0; j < 5; j++)
             {
@@ -97,28 +101,19 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase
                            .partitionDelete()
                            .finish();
 
-                ReplayingVisitor visitor = history.visitor(new MutatingVisitor.MutatingVisitExecutor(run,
-                                                                                                     new MutatingRowVisitor(run)
-                                                                                                     {
-                                                                                                         public CompiledStatement perform(OpSelectors.OperationKind op, long lts, long pd, long cd, long opId)
-                                                                                                         {
-                                                                                                             pds.add(pd);
-                                                                                                             return super.perform(op, lts, pd, cd, opId);
-                                                                                                         }
-                                                                                                     }));
+                ReplayingVisitor visitor = history.visitor(run);
 
-                visitor.replayAll(run);
+                visitor.replayAll();
 
                 Model model = new QuiescentChecker(run, new Reconciler(run,
                                                                        history::visitor));
                 QueryGenerator.TypedQueryGenerator queryGenerator = new QueryGenerator.TypedQueryGenerator(run);
+                Assert.assertFalse(pds.isEmpty());
                 for (Long pd : pds)
                 {
-                    model.validate(Query.selectPartition(run.schemaSpec,
-                                                         pd,
-                                                         false));
+                    model.validate(Query.selectPartition(run.schemaSpec, pd, false));
 
-                    int lts = new Random().nextInt((int) run.clock.maxLts());
+                    int lts = new Random().nextInt((int) run.clock.peek());
                     for (int k = 0; k < 3; k++)
                         queryGenerator.inflate(lts, k);
                 }
@@ -169,7 +164,7 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase
                                                                                                                     }
                                                                                                                 }));
 
-                                visitor.replayAll(run);
+                                visitor.replayAll();
 
                                 Model model = new QuiescentChecker(run, new Reconciler(run,
                                                                                        sut::visitor));
@@ -182,7 +177,7 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase
                                     model.validate(Query.selectPartition(run.schemaSpec,
                                                                          pd,
                                                                          true));
-                                    int lts = new Random().nextInt((int) run.clock.maxLts());
+                                    int lts = new Random().nextInt((int) run.clock.peek());
                                     for (int k = 0; k < 3; k++)
                                         queryGenerator.inflate(lts, k);
                                 }
diff --git a/harry-integration/test/harry/model/HistoryBuilderTest.java b/harry-integration/test/harry/model/HistoryBuilderTest.java
index 5ccca60..e7f3413 100644
--- a/harry-integration/test/harry/model/HistoryBuilderTest.java
+++ b/harry-integration/test/harry/model/HistoryBuilderTest.java
@@ -129,8 +129,9 @@ public class HistoryBuilderTest
                                 public void afterLts(long lts, long pd){}
                                 public void beforeBatch(long lts, long pd, long m){}
                                 public void afterBatch(long lts, long pd, long m){}
+                                public void shutdown() {}
                             });
-                            visitor.replayAll(run);
+                            visitor.replayAll();
                             for (Counts counts : model)
                                 Assert.assertTrue(counts.toString(), counts.allDone());
                         });
diff --git a/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java b/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java
index 0f28005..c84c789 100644
--- a/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java
+++ b/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java
@@ -32,7 +32,7 @@ import harry.model.sut.InJVMTokenAwareVisitExecutor;
 import harry.model.sut.InJvmSut;
 import harry.model.sut.SystemUnderTest;
 import harry.runner.RepairingLocalStateValidator;
-import harry.visitors.GeneratingVisitor;
+import harry.visitors.MutatingVisitor;
 import harry.visitors.Visitor;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.Feature;
@@ -70,21 +70,20 @@ public class InJVMTokenAwareExecutorTest extends IntegrationTestBase
             Run run = configuration.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
 
-            Visitor visitor = new GeneratingVisitor(run, new InJVMTokenAwareVisitExecutor(run,
-                                                                                          new Configuration.MutatingRowVisitorConfiguration(),
-                                                                                          SystemUnderTest.ConsistencyLevel.NODE_LOCAL));
+            Visitor visitor = new MutatingVisitor(run, new InJVMTokenAwareVisitExecutor(run,
+                                                                                        new Configuration.MutatingRowVisitorConfiguration(),
+                                                                                        SystemUnderTest.ConsistencyLevel.NODE_LOCAL));
 
             OpSelectors.MonotonicClock clock = run.clock;
             long maxPd = 0;
             for (int i = 0; i < 10000; i++)
             {
-                long lts = clock.nextLts();
-                visitor.visit(lts);
-                maxPd = Math.max(maxPd, run.pdSelector.positionFor(lts));
+                visitor.visit();
+                maxPd = Math.max(maxPd, run.pdSelector.positionFor(clock.peek()));
             }
 
             RepairingLocalStateValidator validator = new RepairingLocalStateValidator(5, 1, run, new Configuration.QuiescentCheckerConfig());
-            validator.visit(clock.maxLts());
+            validator.visit();
         }
 
     }
diff --git a/harry-integration/test/harry/model/IntegrationTestBase.java b/harry-integration/test/harry/model/IntegrationTestBase.java
index a6a9697..6d8b4ef 100644
--- a/harry-integration/test/harry/model/IntegrationTestBase.java
+++ b/harry-integration/test/harry/model/IntegrationTestBase.java
@@ -95,9 +95,9 @@ public class IntegrationTestBase extends TestBaseImpl
                                                        .setCreateSchema(true)
                                                        .setTruncateTable(false)
                                                        .setDropSchema(true)
-                                                       .setSchemaProvider(seed1 -> schema)
+                                                       .setSchemaProvider((seed1, sut) -> schema)
                                                        .setClusteringDescriptorSelector(sharedCDSelectorConfiguration().build())
                                                        .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(1, 200))
                                                        .setSUT(() -> sut);
     }
-}
+}
\ No newline at end of file
diff --git a/harry-integration/test/harry/model/ModelTestBase.java b/harry-integration/test/harry/model/ModelTestBase.java
index 576a291..3381137 100644
--- a/harry-integration/test/harry/model/ModelTestBase.java
+++ b/harry-integration/test/harry/model/ModelTestBase.java
@@ -18,7 +18,6 @@
 
 package harry.model;
 
-import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
@@ -30,9 +29,9 @@ import harry.ddl.SchemaGenerators;
 import harry.ddl.SchemaSpec;
 import harry.visitors.LoggingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Visitor;
 import harry.runner.Runner;
 import harry.visitors.SingleValidator;
+import harry.visitors.Visitor;
 
 public abstract class ModelTestBase extends IntegrationTestBase
 {
@@ -46,33 +45,30 @@ public abstract class ModelTestBase extends IntegrationTestBase
         }
     }
 
-    void negativeIntegrationTest(Model.ModelFactory factory) throws Throwable
+    void negativeIntegrationTest(Configuration.RunnerConfiguration runnerConfig) throws Throwable
     {
         Supplier<SchemaSpec> supplier = SchemaGenerators.progression(1);
         for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
         {
             SchemaSpec schema = supplier.get();
             Configuration.ConfigurationBuilder builder = configuration(i, schema);
+
             builder.setClock(new Configuration.ApproximateMonotonicClockConfiguration((int) TimeUnit.MINUTES.toMillis(10),
                                                                                       1, TimeUnit.SECONDS))
-                   .setRunTime(1, TimeUnit.MINUTES)
                    .setCreateSchema(false)
                    .setDropSchema(false)
-                   .setRunner(new Configuration.SequentialRunnerConfig(Arrays.asList(new Configuration.LoggingVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration()),
-                                                                                     new Configuration.RecentPartitionsValidatorConfiguration(10, 10, 1, factory::make),
-                                                                                     new Configuration.AllPartitionsValidatorConfiguration(10, 10, factory::make))));
-            Runner runner = builder.build().createRunner();
+                   .setRunner(runnerConfig);
+
+            Configuration config = builder.build();
+            Runner runner = config.createRunner();
+            
             try
             {
                 Run run = runner.getRun();
                 beforeEach();
                 run.sut.schemaChange(run.schemaSpec.compile().cql());
 
-                runner.initAndStartAll().get(2, TimeUnit.MINUTES);
-            }
-            catch (Throwable t)
-            {
-                throw t;
+                runner.initAndStartAll().get(4, TimeUnit.MINUTES);
             }
             finally
             {
@@ -83,7 +79,7 @@ public abstract class ModelTestBase extends IntegrationTestBase
 
     abstract Configuration.ModelConfiguration modelConfiguration();
 
-    protected Visitor validator(Run run)
+    protected SingleValidator validator(Run run)
     {
         return new SingleValidator(100, run , modelConfiguration());
     }
@@ -101,17 +97,13 @@ public abstract class ModelTestBase extends IntegrationTestBase
         beforeEach();
         run.sut.schemaChange(run.schemaSpec.compile().cql());
         System.out.println(run.schemaSpec.compile().cql());
-        OpSelectors.MonotonicClock clock = run.clock;
 
-        Visitor validator = validator(run);
         Visitor visitor = new LoggingVisitor(run, MutatingRowVisitor::new);
 
         for (int i = 0; i < 20000; i++)
-        {
-            long lts = clock.nextLts();
-            visitor.visit(lts);
-        }
+            visitor.visit();
 
+        SingleValidator validator = validator(run);
         validator.visit(0);
 
         if (!corrupt.apply(run))
diff --git a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
index 92498dd..13ae548 100644
--- a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
@@ -41,9 +41,9 @@ import harry.corruptor.ShowValueCorruptor;
 import harry.ddl.SchemaGenerators;
 import harry.visitors.MutatingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Visitor;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
+import harry.visitors.Visitor;
 
 import static harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
 
@@ -117,10 +117,7 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
             QueryResponseCorruptor corruptor = this.corruptorFactory.create(run);
 
             for (int i = 0; i < CYCLES; i++)
-            {
-                long lts = clock.nextLts();
-                visitor.visit(lts);
-            }
+                visitor.visit();
 
             while (true)
             {
diff --git a/harry-integration/test/harry/model/QuerySelectorTest.java b/harry-integration/test/harry/model/QuerySelectorTest.java
index 6d8dc5c..099adf1 100644
--- a/harry-integration/test/harry/model/QuerySelectorTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorTest.java
@@ -33,9 +33,9 @@ import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
 import harry.visitors.MutatingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Visitor;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
+import harry.visitors.Visitor;
 
 import static harry.generators.DataGenerators.NIL_DESCR;
 
@@ -70,15 +70,11 @@ public class QuerySelectorTest extends IntegrationTestBase
 
             Run run = config.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
-            OpSelectors.MonotonicClock clock = run.clock;
 
             Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
 
             for (int i = 0; i < CYCLES; i++)
-            {
-                long lts = clock.nextLts();
-                visitor.visit(lts);
-            }
+                visitor.visit();
 
             QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
 
@@ -145,14 +141,10 @@ public class QuerySelectorTest extends IntegrationTestBase
                                    .build();
             Run run = config.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
-            OpSelectors.MonotonicClock clock = run.clock;
             Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
 
             for (int i = 0; i < CYCLES; i++)
-            {
-                long lts = clock.nextLts();
-                visitor.visit(lts);
-            }
+                visitor.visit();
 
             QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
             Model model = new QuiescentChecker(run);
diff --git a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
index 945dce2..97a554a 100644
--- a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
+++ b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
@@ -18,9 +18,19 @@
 
 package harry.model;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
 import org.junit.Test;
 
 import harry.core.Configuration;
+import harry.core.Configuration.AllPartitionsValidatorConfiguration;
+import harry.core.Configuration.ConcurrentRunnerConfig;
+import harry.core.Configuration.LoggingVisitorConfiguration;
+import harry.core.Configuration.RecentPartitionsValidatorConfiguration;
+import harry.core.Configuration.SequentialRunnerConfig;
+import harry.core.Configuration.SingleVisitRunnerConfig;
 import harry.core.Run;
 import harry.corruptor.AddExtraRowCorruptor;
 import harry.corruptor.ChangeValueCorruptor;
@@ -29,14 +39,14 @@ import harry.corruptor.HideValueCorruptor;
 import harry.corruptor.QueryResponseCorruptor;
 import harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
 import harry.ddl.SchemaSpec;
-import harry.visitors.Visitor;
 import harry.operations.Query;
+import harry.runner.StagedRunner.StagedRunnerConfig;
 import harry.visitors.SingleValidator;
 
 public class QuiescentCheckerIntegrationTest extends ModelTestBase
 {
     @Override
-    protected Visitor validator(Run run)
+    protected SingleValidator validator(Run run)
     {
         return new SingleValidator(100, run, modelConfiguration());
     }
@@ -56,7 +66,30 @@ public class QuiescentCheckerIntegrationTest extends ModelTestBase
     @Test
     public void normalConditionIntegrationTest() throws Throwable
     {
-        negativeIntegrationTest(modelConfiguration()::make);
+        Model.ModelFactory factory = modelConfiguration();
+                
+        SequentialRunnerConfig sequential = 
+                new SequentialRunnerConfig(Arrays.asList(new LoggingVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration()),
+                                                         new RecentPartitionsValidatorConfiguration(10, 10, 1, factory::make),
+                                                         new AllPartitionsValidatorConfiguration(10, 10, factory::make)),
+                                           1, TimeUnit.MINUTES);
+        negativeIntegrationTest(sequential);
+    }
+
+    @Test
+    public void normalConditionStagedIntegrationTest() throws Throwable
+    {
+        Model.ModelFactory factory = modelConfiguration();
+
+        ConcurrentRunnerConfig concurrent = 
+                new ConcurrentRunnerConfig(4, Collections.singletonList(new LoggingVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration())),
+                                           30, TimeUnit.SECONDS);
+        SingleVisitRunnerConfig sequential = 
+                new SingleVisitRunnerConfig(Collections.singletonList(new RecentPartitionsValidatorConfiguration(1024, 0, 1, factory::make)));
+        
+        StagedRunnerConfig staged = new StagedRunnerConfig(Arrays.asList(concurrent, sequential), 2, TimeUnit.MINUTES);
+
+        negativeIntegrationTest(staged);
     }
 
     @Test
diff --git a/harry-integration/test/resources/single_partition_test.yml b/harry-integration/test/resources/single_partition_test.yml
index 8ec4c4e..3f2017f 100644
--- a/harry-integration/test/resources/single_partition_test.yml
+++ b/harry-integration/test/resources/single_partition_test.yml
@@ -12,9 +12,6 @@ clock:
   offset:
     offset: 1000
 
-run_time: 10
-run_time_unit: "MINUTES"
-
 system_under_test:
   println: {}
 
@@ -49,6 +46,8 @@ data_tracker:
 
 runner:
   sequential:
+    run_time: 10
+    run_time_unit: "MINUTES"
     visitors: []
 
 metric_reporter:
