You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by if...@apache.org on 2021/10/01 14:21:36 UTC

[cassandra-harry] 05/05: Add history builder and an ability to write unit-tests with Harry

This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git

commit f6b4df664b5ec79cf555aa0fb34e26f40fd9e9cd
Author: Alex Petrov <ol...@gmail.com>
AuthorDate: Mon Sep 27 12:56:33 2021 +0200

    Add history builder and an ability to write unit-tests with Harry
    
    Implement a full repair test
---
 README.md                                          | 218 ++++++-
 conf/default.yaml                                  |   2 +-
 conf/external.yaml                                 |   2 +-
 harry-core/src/harry/core/Configuration.java       |  78 +--
 .../src/harry/corruptor/AddExtraRowCorruptor.java  |   2 +-
 harry-core/src/harry/dsl/HistoryBuilder.java       | 625 +++++++++++++++++++++
 harry-core/src/harry/model/OpSelectors.java        |  25 +-
 harry-core/src/harry/model/QuiescentChecker.java   |  14 +-
 .../model/clock/ApproximateMonotonicClock.java     |   5 +
 harry-core/src/harry/model/clock/OffsetClock.java  |   5 +
 harry-core/src/harry/operations/WriteHelper.java   |   3 +-
 harry-core/src/harry/reconciler/Reconciler.java    |  86 +--
 harry-core/src/harry/runner/DataTracker.java       |   2 -
 .../src/harry/runner/DefaultDataTracker.java       |   5 +-
 harry-core/src/harry/runner/Runner.java            |  46 +-
 harry-core/src/harry/util/TestRunner.java          |  39 ++
 .../src/harry/visitors/AllPartitionsValidator.java |   4 +-
 ...artitionVisitor.java => CorruptingVisitor.java} |   8 +-
 .../src/harry/visitors/DelegatingVisitor.java      |  61 ++
 ...artitionVisitor.java => GeneratingVisitor.java} |  63 +--
 .../harry/visitors/LoggingPartitionVisitor.java    |  79 ---
 harry-core/src/harry/visitors/LoggingVisitor.java  |  90 +++
 .../harry/visitors/MutatingPartitionVisitor.java   | 145 -----
 .../src/harry/visitors/MutatingRowVisitor.java     |  18 +-
 harry-core/src/harry/visitors/MutatingVisitor.java | 169 ++++++
 .../{Operation.java => OperationExecutor.java}     |   4 +-
 ...Validator.java => ParallelRecentValidator.java} |  26 +-
 .../src/harry/visitors/ParallelValidator.java      |   4 +-
 ...artitionValidator.java => RecentValidator.java} |  16 +-
 .../src/harry/visitors/ReplayingVisitor.java       | 121 ++++
 harry-core/src/harry/visitors/Sampler.java         |   8 +-
 ...artitionValidator.java => SingleValidator.java} |  11 +-
 .../{PartitionVisitor.java => VisitExecutor.java}  |  23 +-
 .../{PartitionVisitor.java => Visitor.java}        |  12 +-
 harry-core/test/harry/model/OpSelectorsTest.java   |  12 +-
 harry-core/test/harry/operations/RelationTest.java |   2 +-
 .../test/resources/single_partition_test.yml       |  55 --
 .../model/sut/InJVMTokenAwareVisitExecutor.java    | 114 ++++
 .../src/harry/model/sut/InJvmSut.java              |  22 +
 .../src/harry/runner/FaultInjectingVisitor.java    |  22 +-
 .../harry/runner/RepairingLocalStateValidator.java |  20 +-
 .../src/harry/runner/TrivialShrinker.java          |  29 +-
 .../harry/visitors/SkippingPartitionVisitor.java   |  53 --
 .../src/harry/visitors/SkippingVisitor.java        |  36 +-
 .../generators/DataGeneratorsIntegrationTest.java  |  32 +-
 .../harry/model/HistoryBuilderIntegrationTest.java | 197 +++++++
 .../test/harry/model/HistoryBuilderTest.java       | 186 ++++++
 .../harry/model/InJVMTokenAwareExecutorTest.java   |  91 +++
 .../test/harry/model/ModelTestBase.java            |  22 +-
 .../harry/model/QuerySelectorNegativeTest.java     |  10 +-
 .../test/harry/model/QuerySelectorTest.java        |  12 +-
 .../model/QuiescentCheckerIntegrationTest.java     |   8 +-
 .../test/harry/model/TestEveryClustering.java      |  83 ---
 .../test/resources/single_partition_test.yml       |   2 +-
 54 files changed, 2284 insertions(+), 743 deletions(-)

diff --git a/README.md b/README.md
index 841e4ea..5c45c09 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,171 @@
 # Harry, a fuzz testing tool for Apache Cassandra
 
-Project aims to generate _reproducible_ workloads that are as close to real-life
+The project aims to generate _reproducible_ workloads that are as close to real-life
 as possible, while being able to _efficiently_ verify the cluster state against
 the model without pausing the workload itself.
 
+# Introduction
+
+Harry has two primary modes of functionality:
+
+  * Unit test mode: in which you define specific sequences of
+    operations and let Harry test these operations using different
+    schemas and conditions.
+  * Exploratory/fuzz mode: in which you define distributions of events
+    rather than sequences themselves, and let Harry try out
+    different things.
+
+Usually, in unit-test mode, we apply several write operations to
+the cluster state, then run different read queries and validate
+their results. To learn more about writing unit tests, refer to the "Writing
+Unit Tests" section.
+
+In exploratory mode, we continuously apply write operations to the
+cluster and validate their state, allowing data size to grow and simulating
+real-life behaviour. To learn more about implementing test cases using
+fuzz mode, refer to the "Implementing Tests" section of this guide, but it's likely
+you'll have to read the rest of this document to implement more
+complex scenarios.
+
+# Writing Unit Tests
+
+To write unit tests with Harry, there's no special knowledge required.
+Usually, unit tests are written by simply hardcoding the schema and then writing
+several modification statements one after the other, and then manually validating results
+of a `SELECT` query. This might work for simple scenarios, but there’s still a chance
+that for some other schema or some combination of values the tested feature may not work.
+
+To improve the situation, we can express the test in more abstract
+terms and, instead of writing it using specific statements, we can
+describe which statement _types_ are to be used:
+
+```
+test(new SchemaGenerators.Builder("harry")
+                         .partitionKeySpec(1, 5)
+                         .clusteringKeySpec(1, 5)
+                         .regularColumnSpec(1, 10)
+                         .generator(),
+     historyBuilder -> {
+         historyBuilder.nextPartition()
+                       .simultaneously()
+                       .randomOrder()
+                         .partitionDeletion()
+                         .rangeDeletion()
+                       .finish();
+     });
+```
+
+This spec can be used to generate clusters of different sizes,
+configured with different schemas, executing the given sequence of
+actions both in isolation, and combined with other randomly generated
+ones, with failure-injection.
+
+Best of all, this test will _not only_ ensure that such a
+sequence of actions does not produce an exception, but will also
+ensure that the cluster responds with correct results to _any_ allowed
+read query.
+
+`HistoryBuilder` is using the configuration provided by Harry `Run`, which
+can be written either using `Configuration#ConfigurationBuilder` like
+we did
+[here](https://github.com/apache/cassandra-harry/blob/ddd643ecc904258abe5e2f73d9b612793b0ac0e6/harry-integration/test/harry/model/IntegrationTestBase.java#L91),
+or provided in a [yaml file](https://github.com/apache/cassandra-harry/blob/ddd643ecc904258abe5e2f73d9b612793b0ac0e6/conf/example.yaml).
+
+To begin specifying operations for a new partition,
+`HistoryBuilder#nextPartition` has to be called, which returns a
+`PartitionBuilder`. For the commands within this partition, you have
+a choice between:
+  * `PartitionBuilder#simultaneously`, which will execute listed
+  operations with the same timestamp
+  * `PartitionBuilder#sequentially`, which will execute listed operations
+  with monotonically increasing timestamps, giving each operation its
+  own timestamp
+
+Similarly, you can choose between:
+  * `PartitionBuilder#randomOrder`, which will execute listed operations
+  in random order
+  * `PartitionBuilder#strictOrder`, which will execute listed operations
+  in the order specified by the user
+
+The rest of operations are self-explanatory: `#insert`, `#update`,
+`#delete`, `#columnDelete`, `#rangeDelete`, `#sliceDelete`,
+`#partitionDelete`, and their plural counterparts.
+
+After history generated by `HistoryBuilder` is replayed using
+`ReplayingVisitor`, you can use any model (`QuiescentChecker` by
+default) to validate queries.  Queries can be provided manually or
+generated using `QueryGenerator` or `TypedQueryGenerator`.
+
+# Basic Terminology
+
+  * Inflate / inflatable: a process of producing a value (for example, string, or a blob)
+  from a `long` descriptor that uniquely identifies the value.
+  See [data generation](https://github.com/apache/cassandra-harry#data-generation) section
+  of this guide for more details.
+  * Deflate / deflatable: a process of producing the descriptor the value was inflated
+  from during verification. See [model](https://github.com/apache/cassandra-harry#model)
+  section of this guide for more details.
+
+For definitions of logical timestamp, descriptor, and other entities used during
+inflation and deflation, refer to [formal relationships](https://github.com/apache/cassandra-harry#formal-relations-between-entities)
+section.
+
+# Features
+
+Currently, Harry can exercise the following Cassandra functionality:
+
+  * Supported data types: `int8`, `int16`, `int32`, `int64`, `boolean`, `float`,
+  `double`, `ascii`, `uuid`, `timestamp`. Collections are only _inflatable_.
+  * Random schema generation, with an arbitrary number of partition and clustering
+  keys.
+  * Schemas with arbitrary `CLUSTERING ORDER BY`
+  * Randomly generated `INSERT` and `UPDATE` queries with all columns or arbitrary
+  column subset
+  * Randomly generated `DELETE` queries: for a single column, single row, or
+  a range of rows
+  * Inflating and validating entire partitions (with allowed in-flight queries)
+  * Inflating and validating random `SELECT` queries: single row, slices (with single
+  open end), and ranges (with both ends of clusterings specified)
+
+Inflating partitions is done using [Reconciler](https://github.com/apache/cassandra-harry/blob/master/harry-core/src/harry/reconciler/Reconciler.java).
+Validating partitions and random queries can be done using [Quiescent Checker](https://github.com/apache/cassandra-harry/blob/master/harry-core/src/harry/model/QuiescentChecker.java)
+and [Exhaustive Checker](https://github.com/apache/cassandra-harry/blob/master/harry-core/src/harry/model/ExhaustiveChecker.java).
+
+## What's missing
+
+Harry is by no means feature-complete. Main things that are missing are:
+
+  * Some types (such as collections) are not deflatable
+  * Some types are implemented but are not hooked up (`blob` and `text`) to DSL/generator
+  * Partition deletions are not implemented
+  * 2i queries are not implemented
+  * Compact storage is not implemented
+  * Static columns are not implemented
+  * Fault injection is not implemented
+  * Runner and scheduler are rather rudimentary and require significant rework and proper scheduling
+  * TTL is not supported
+  * Some SELECT queries are not supported: `LIMIT`, `IN`, `GROUP BY`, token range queries
+  * Partition deletions are not implemented
+  * Pagination is not implemented
+
+Some things, even though they are implemented, can be improved or optimized:
+
+  * RNG should be able to yield less than 64 bits of entropy per step
+  * State tracking should be done in a compact off-heap data structure
+  * Inflated partition state and per-row operation log should be done in a compact
+  off-heap data structure
+  * Exhaustive checker can be significantly optimized
+  * Harry shouldn't rely on java-driver for query generation
+  * Exhaustive checker should use more precise information from data tracker, not
+  just watermarks
+  * Decision-making about _when_ we visit partitions and/or rows should be improved
+
+This list of improvements is incomplete, and should only give the reader a rough
+idea about the state of the project. The main goal for the initial release was to make it
+useful; now we can make it fast and feature-complete!
+
+# Goals
+
 _Reproducibility_ is achieved by using the PCG family of random number
 generators and generating schema, configuration, and every step of the workload
 from the repeatable sequence of random numbers. Schema and configuration are
@@ -58,6 +220,60 @@ visited for a logical timestamp, how many operations there will be in batch,
 what kind of operations there will and how often each kind of operation is going
 to occur.
 
+# Implementing Tests
+
+All Harry components are pluggable and can be redefined. However, many
+of the default implementations will cover most of the use-cases, so in
+this guide we’ll focus on ones that are most often used to implement
+different use cases:
+
+  * System Under Test: defines how Harry can communicate with
+    Cassandra instances and issue common queries. Examples of a system
+    under test can be a CCM cluster, a “real” Cassandra cluster, or an
+    in-JVM dtest cluster.
+  * Visitor: defines behaviour that gets triggered at a specific
+    logical timestamp. One of the default implementations is
+    MutatingVisitor, which executes write workload against
+    SystemUnderTest. Examples of a visitor, besides a mutating visitor,
+    could be a validator that uses the model to validate results of
+    different queries, a repair runner, or a fault injector.
+  * Model: validates results of read queries by comparing its own
+    internal representation against the results returned by system
+    under test. You can find three simplified implementations of
+    model in this document: Visible Rows Checker, Quiescent Checker,
+    and an Exhaustive Checker.
+  * Runner: defines how operations defined by visitors are
+    executed. Harry includes two default implementations: a sequential
+    and a concurrent runner. Sequential runner allows no overlap
+    between different visitors or logical timestamps. Concurrent
+    runner allows visitors for different timestamps to overlap.
+
+System under test is the simplest one to implement: you only need a
+way to execute Cassandra queries. At the moment of writing, all custom
+things, such as nodetool commands, failure injection, etc, are
+implemented using a SUT / visitor combo: visitor knows about internals
+of the cluster it is dealing with.
+
+Generally, a visitor has to follow the rules specified by
+PdSelector (it can only issue mutations
+against the partition that PdSelector has picked for this LTS), and
+DescriptorSelector (it can visit exactly
+DescriptorSelector#numberOfModifications rows within this partition,
+operations have to have a type specified by #operationKind, clustering
+and value descriptors have to be in accordance with
+DescriptorSelector#cd and DescriptorSelector#vds). The reason for
+these limitations is that the model has to be able to reproduce the
+exact sequence of events that was applied to the system under test.
+
+Default implementations of partition and clustering descriptors, used
+in fuzz mode, allow verification to be optimised. For example, it is
+possible to find logical timestamps that are visiting the same
+partition as any given logical timestamp. When running Harry in
+unit-test mode, we use a special generating visitor that keeps an
+entire given sequence of events in memory rather than producing it on
+the fly. For reasons of efficiency, we do not use generating visitors
+for generating and verifying large datasets.
+
 # Formal Relations Between Entities
 
 To be able to implement efficient models, we had to reduce the amount of state
diff --git a/conf/default.yaml b/conf/default.yaml
index cdbc34c..c8d00b3 100644
--- a/conf/default.yaml
+++ b/conf/default.yaml
@@ -85,7 +85,7 @@ clustering_descriptor_selector:
 # and model state.
 runner:
   sequential:
-    partition_visitors:
+    visitors:
       - logging:
           row_visitor:
             mutating: {}
diff --git a/conf/external.yaml b/conf/external.yaml
index 52943b8..a1432be 100644
--- a/conf/external.yaml
+++ b/conf/external.yaml
@@ -92,7 +92,7 @@ clustering_descriptor_selector:
 # and model state.
 runner:
   sequential:
-    partition_visitors:
+    visitors:
       - logging:
           row_visitor:
             mutating: {}
diff --git a/harry-core/src/harry/core/Configuration.java b/harry-core/src/harry/core/Configuration.java
index 0e065d5..c6fecc7 100644
--- a/harry-core/src/harry/core/Configuration.java
+++ b/harry-core/src/harry/core/Configuration.java
@@ -47,16 +47,16 @@ import harry.model.clock.OffsetClock;
 import harry.model.sut.PrintlnSut;
 import harry.model.sut.SystemUnderTest;
 import harry.visitors.AllPartitionsValidator;
-import harry.visitors.CorruptingPartitionVisitor;
+import harry.visitors.CorruptingVisitor;
 import harry.runner.DataTracker;
 import harry.runner.DefaultDataTracker;
-import harry.visitors.LoggingPartitionVisitor;
-import harry.visitors.MutatingPartitionVisitor;
+import harry.visitors.LoggingVisitor;
+import harry.visitors.MutatingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.Operation;
-import harry.visitors.ParallelRecentPartitionValidator;
-import harry.visitors.PartitionVisitor;
-import harry.visitors.RecentPartitionValidator;
+import harry.visitors.OperationExecutor;
+import harry.visitors.ParallelRecentValidator;
+import harry.visitors.Visitor;
+import harry.visitors.RecentValidator;
 import harry.runner.Runner;
 import harry.visitors.Sampler;
 import harry.util.BitSet;
@@ -87,12 +87,12 @@ public class Configuration
         mapper.registerSubtypes(DefaultSchemaProviderConfiguration.class);
         mapper.registerSubtypes(MutatingRowVisitorConfiguration.class);
 
-        mapper.registerSubtypes(MutatingPartitionVisitorConfiguation.class);
-        mapper.registerSubtypes(LoggingPartitionVisitorConfiguration.class);
+        mapper.registerSubtypes(MutatingVisitorConfiguation.class);
+        mapper.registerSubtypes(LoggingVisitorConfiguration.class);
         mapper.registerSubtypes(AllPartitionsValidatorConfiguration.class);
-        mapper.registerSubtypes(ParallelRecentPartitionValidator.ParallelRecentPartitionValidatorConfig.class);
+        mapper.registerSubtypes(ParallelRecentValidator.ParallelRecentValidatorConfig.class);
         mapper.registerSubtypes(Sampler.SamplerConfiguration.class);
-        mapper.registerSubtypes(CorruptingPartitionVisitorConfiguration.class);
+        mapper.registerSubtypes(CorruptingVisitorConfiguration.class);
         mapper.registerSubtypes(RecentPartitionsValidatorConfiguration.class);
         mapper.registerSubtypes(FixedSchemaProviderConfiguration.class);
         mapper.registerSubtypes(AlwaysSamePartitionSelector.AlwaysSamePartitionSelectorConfiguration.class);
@@ -562,38 +562,38 @@ public class Configuration
     public static class ConcurrentRunnerConfig implements RunnerConfiguration
     {
         public final int concurrency;
-        public final List<PartitionVisitorConfiguration> partition_visitor_factories;
+        public final List<VisitorConfiguration> visitor_factories;
 
         @JsonCreator
         public ConcurrentRunnerConfig(@JsonProperty(value = "concurrency", defaultValue = "2") int concurrency,
-                                      @JsonProperty(value = "partition_visitors") List<PartitionVisitorConfiguration> partitionVisitors)
+                                      @JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
         {
             this.concurrency = concurrency;
-            this.partition_visitor_factories = partitionVisitors;
+            this.visitor_factories = visitors;
         }
 
         @Override
         public Runner make(Run run, Configuration config)
         {
-            return new Runner.ConcurrentRunner(run, config, concurrency, partition_visitor_factories);
+            return new Runner.ConcurrentRunner(run, config, concurrency, visitor_factories);
         }
     }
 
     @JsonTypeName("sequential")
     public static class SequentialRunnerConfig implements RunnerConfiguration
     {
-        public final List<PartitionVisitorConfiguration> partition_visitor_factories;
+        public final List<VisitorConfiguration> visitor_factories;
 
         @JsonCreator
-        public SequentialRunnerConfig(@JsonProperty(value = "partition_visitors") List<PartitionVisitorConfiguration> partitionVisitors)
+        public SequentialRunnerConfig(@JsonProperty(value = "visitors") List<VisitorConfiguration> visitors)
         {
-            this.partition_visitor_factories = partitionVisitors;
+            this.visitor_factories = visitors;
         }
 
         @Override
         public Runner make(Run run, Configuration config)
         {
-            return new Runner.SequentialRunner(run, config, partition_visitor_factories);
+            return new Runner.SequentialRunner(run, config, visitor_factories);
         }
     }
 
@@ -923,49 +923,49 @@ public class Configuration
 
 
     @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
-    public interface PartitionVisitorConfiguration extends PartitionVisitor.PartitionVisitorFactory
+    public interface VisitorConfiguration extends Visitor.VisitorFactory
     {
     }
 
 
     @JsonTypeName("mutating")
-    public static class MutatingPartitionVisitorConfiguation implements PartitionVisitorConfiguration
+    public static class MutatingVisitorConfiguation implements VisitorConfiguration
     {
         public final RowVisitorConfiguration row_visitor;
 
         @JsonCreator
-        public MutatingPartitionVisitorConfiguation(@JsonProperty("row_visitor") RowVisitorConfiguration row_visitor)
+        public MutatingVisitorConfiguation(@JsonProperty("row_visitor") RowVisitorConfiguration row_visitor)
         {
             this.row_visitor = row_visitor;
         }
 
         @Override
-        public PartitionVisitor make(Run run)
+        public Visitor make(Run run)
         {
-            return new MutatingPartitionVisitor(run, row_visitor);
+            return new MutatingVisitor(run, row_visitor);
         }
     }
 
     @JsonTypeName("logging")
-    public static class LoggingPartitionVisitorConfiguration implements PartitionVisitorConfiguration
+    public static class LoggingVisitorConfiguration implements VisitorConfiguration
     {
         protected final RowVisitorConfiguration row_visitor;
 
         @JsonCreator
-        public LoggingPartitionVisitorConfiguration(@JsonProperty("row_visitor") RowVisitorConfiguration row_visitor)
+        public LoggingVisitorConfiguration(@JsonProperty("row_visitor") RowVisitorConfiguration row_visitor)
         {
             this.row_visitor = row_visitor;
         }
 
         @Override
-        public PartitionVisitor make(Run run)
+        public Visitor make(Run run)
         {
-            return new LoggingPartitionVisitor(run, row_visitor);
+            return new LoggingVisitor(run, row_visitor);
         }
     }
 
     @JsonTypeName("validate_all_partitions")
-    public static class AllPartitionsValidatorConfiguration implements Configuration.PartitionVisitorConfiguration
+    public static class AllPartitionsValidatorConfiguration implements VisitorConfiguration
     {
         public final int concurrency;
         public final int trigger_after;
@@ -981,31 +981,31 @@ public class Configuration
             this.modelConfiguration = model;
         }
 
-        public PartitionVisitor make(Run run)
+        public Visitor make(Run run)
         {
             return new AllPartitionsValidator(concurrency, trigger_after, run, modelConfiguration);
         }
     }
 
     @JsonTypeName("corrupt")
-    public static class CorruptingPartitionVisitorConfiguration implements Configuration.PartitionVisitorConfiguration
+    public static class CorruptingVisitorConfiguration implements VisitorConfiguration
     {
         public final int trigger_after;
 
         @JsonCreator
-        public CorruptingPartitionVisitorConfiguration(@JsonProperty("trigger_after") int trigger_after)
+        public CorruptingVisitorConfiguration(@JsonProperty("trigger_after") int trigger_after)
         {
             this.trigger_after = trigger_after;
         }
 
-        public PartitionVisitor make(Run run)
+        public Visitor make(Run run)
         {
-            return new CorruptingPartitionVisitor(trigger_after, run);
+            return new CorruptingVisitor(trigger_after, run);
         }
     }
 
     @JsonTypeName("validate_recent_partitions")
-    public static class RecentPartitionsValidatorConfiguration implements Configuration.PartitionVisitorConfiguration
+    public static class RecentPartitionsValidatorConfiguration implements VisitorConfiguration
     {
         public final int partition_count;
         public final int trigger_after;
@@ -1026,14 +1026,14 @@ public class Configuration
         }
 
         @Override
-        public PartitionVisitor make(Run run)
+        public Visitor make(Run run)
         {
-            return new RecentPartitionValidator(partition_count, queries, trigger_after, run, modelConfiguration);
+            return new RecentValidator(partition_count, queries, trigger_after, run, modelConfiguration);
         }
     }
 
     @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
-    public interface RowVisitorConfiguration extends Operation.RowVisitorFactory
+    public interface RowVisitorConfiguration extends OperationExecutor.RowVisitorFactory
     {
     }
 
@@ -1041,7 +1041,7 @@ public class Configuration
     public static class MutatingRowVisitorConfiguration implements RowVisitorConfiguration
     {
         @Override
-        public Operation make(Run run)
+        public OperationExecutor make(Run run)
         {
             return new MutatingRowVisitor(run);
         }
diff --git a/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java b/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java
index e05dffd..ace55b4 100644
--- a/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java
+++ b/harry-core/src/harry/corruptor/AddExtraRowCorruptor.java
@@ -77,7 +77,7 @@ public class AddExtraRowCorruptor implements QueryResponseCorruptor
                 return false;
         }
 
-        long[] vds = descriptorSelector.vds(query.pd, cd, maxLts, 0, schema);
+        long[] vds = descriptorSelector.vds(query.pd, cd, maxLts, 0, OpSelectors.OperationKind.INSERT, schema);
 
         // We do not know if the row was deleted. We could try inferring it, but that
         // still won't help since we can't use it anyways, since collisions between a
diff --git a/harry-core/src/harry/dsl/HistoryBuilder.java b/harry-core/src/harry/dsl/HistoryBuilder.java
new file mode 100644
index 0000000..5e1ff6f
--- /dev/null
+++ b/harry-core/src/harry/dsl/HistoryBuilder.java
@@ -0,0 +1,625 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.dsl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+import java.util.function.LongSupplier;
+
+import harry.core.Run;
+import harry.model.OpSelectors;
+import harry.visitors.ReplayingVisitor;
+import harry.visitors.VisitExecutor;
+
+import static harry.model.OpSelectors.DefaultPdSelector.PARTITION_DESCRIPTOR_STREAM_ID;
+
+// TODO: we could use some sort of compact data structure or file format for navigable operation history
+public class HistoryBuilder implements Iterable<ReplayingVisitor.Visit>
+{
+    private final Run run;
+    private final List<ReplayingVisitor.Visit> log;
+
+    private long lts;
+    private final Set<Long> pds = new HashSet<>();
+
+    public Map<Long, NavigableSet<Long>> pdToLtsMap = new HashMap<>();
+
+    private int partitions;
+
+    public HistoryBuilder(Run run)
+    {
+        // The pd selector exists before the builder does, so the back-reference is wired up here
+        assert run.pdSelector instanceof PdSelector;
+        this.run = run;
+        this.log = new ArrayList<>();
+        this.partitions = 0;
+        this.lts = 0;
+        ((PdSelector) run.pdSelector).historyBuilder = this;
+    }
+
+    public Iterator<ReplayingVisitor.Visit> iterator()
+    {
+        return log.iterator(); // iterates the recorded visits in the order in which they were appended to the log
+    }
+
+    public static class PdSelector extends OpSelectors.PdSelector
+    {
+        // Assigned by the HistoryBuilder constructor, because the builder is created after the pd selector
+        private HistoryBuilder historyBuilder;
+
+        protected long pd(long lts)
+        {
+            return historyBuilder.log.get((int) lts).pd;
+        }
+
+        // Closest later lts recorded for the same partition, or -1 when there is none
+        public long nextLts(long lts)
+        {
+            Long following = historyBuilder.pdToLtsMap.get(pd(lts)).higher(lts);
+            return following == null ? -1L : following;
+        }
+
+        // Closest earlier lts recorded for the same partition, or -1 when there is none
+        public long prevLts(long lts)
+        {
+            Long preceding = historyBuilder.pdToLtsMap.get(pd(lts)).lower(lts);
+            return preceding == null ? -1L : preceding;
+        }
+
+        public long maxLtsFor(long pd)
+        {
+            return historyBuilder.pdToLtsMap.get(pd).last();
+        }
+
+        // Resolves the position to its pd through the builder (which also remembers the pd) first
+        public long minLtsAt(long position)
+        {
+            return historyBuilder.pdToLtsMap.get(historyBuilder.pd(position)).first();
+        }
+
+        public long minLtsFor(long pd)
+        {
+            return historyBuilder.pdToLtsMap.get(pd).first();
+        }
+
+        // Looks up the pd recorded at the given lts and maps it back to a position via the builder
+        public long positionFor(long lts)
+        {
+            return historyBuilder.position(pd(lts));
+        }
+    }
+
+    private static abstract class Step // a queued builder step that can be converted into a ReplayingVisitor.Batch
+    {
+        public abstract ReplayingVisitor.Batch toBatch(long pd, long lts, long m, LongSupplier opIdSupplier);
+    }
+
+    // A group of operation steps that is converted into one ReplayingVisitor.Batch.
+    private class BatchStep extends Step
+    {
+        private final List<OperationStep> steps;
+
+        protected BatchStep(List<OperationStep> steps)
+        {
+            this.steps = steps;
+        }
+
+        // Every operation draws its own id from the supplier; its cd is derived from pd, lts and that id.
+        public ReplayingVisitor.Batch toBatch(long pd, long lts, long m, LongSupplier opIdSupplier)
+        {
+            ReplayingVisitor.Operation[] batched = new ReplayingVisitor.Operation[steps.size()];
+            int idx = 0;
+            for (OperationStep member : steps)
+            {
+                long id = opIdSupplier.getAsLong();
+                batched[idx++] = op(HistoryBuilder.this.cd(pd, lts, id), id, member.opType);
+            }
+            return HistoryBuilder.batch(m, batched);
+        }
+    }
+
+    // A single operation of the given kind; on its own it is converted into a batch holding just that operation.
+    private class OperationStep extends Step
+    {
+        private final OpSelectors.OperationKind opType;
+
+        protected OperationStep(OpSelectors.OperationKind opType)
+        {
+            this.opType = opType;
+        }
+
+        public ReplayingVisitor.Batch toBatch(long pd, long lts, long m, LongSupplier opIdSupplier)
+        {
+            // Draw the id exactly once: the cd must be derived from the very opId that is recorded in the operation.
+            long opId = opIdSupplier.getAsLong();
+            return HistoryBuilder.batch(m, HistoryBuilder.op(HistoryBuilder.this.cd(pd, lts, opId), opId, opType));
+        }
+    }
+
+    // Opens a builder for the partition at the next position; every call advances the position by one.
+    public PartitionBuilder nextPartition()
+    {
+        return new PartitionBuilder(pd(partitions++));
+    }
+
+    // Ideally, we'd like to make these more generic
+    private long pd(long position)
+    {
+        long descriptor = run.schemaSpec.adjustPdEntropy(run.rng.prev(position, PARTITION_DESCRIPTOR_STREAM_ID));
+        pds.add(descriptor); // side effect: every descriptor handed out is also remembered in pds
+        return descriptor;
+    }
+
+    private long position(long pd)
+    {
+        return run.rng.next(pd, PARTITION_DESCRIPTOR_STREAM_ID); // same stream pd(position) reads with rng.prev; presumably its inverse -- confirm
+    }
+
+    protected long cd(long pd, long lts, long opId)
+    {
+        return run.descriptorSelector.cd(pd, lts, opId, run.schemaSpec); // delegates to the run's descriptor selector and schema
+    }
+
+    public class PartitionBuilder implements OperationBuilder<PartitionBuilder>
+    {
+        // Collects the steps to run against one partition (pd); finish() records them in the history log
+        final List<Step> steps = new ArrayList<>();
+        final long pd;
+
+        boolean strictOrder = true; // when false, finish() shuffles the queued steps
+        boolean sequentially = true; // when false, all queued steps are recorded under a single lts
+
+        boolean finished = false; // flipped by finish(), which asserts that it runs only once
+
+        public PartitionBuilder(long pd)
+        {
+            this.pd = pd;
+        }
+
+        public BatchBuilder<PartitionBuilder> batch()
+        {
+            return new BatchBuilder<>(this, steps::add); // the finished batch is queued as one step of this builder
+        }
+
+        /**
+         * Execute operations listed by users of this PartitionBuilder with the same logical timestamp.
+         */
+        public PartitionBuilder simultaneously()
+        {
+            this.sequentially = false;
+            return this;
+        }
+
+        /**
+         * Execute operations listed by users of this PartitionBuilder with monotonically increasing timestamps,
+         * giving each operation its own timestamp. Timestamp order can be determined by `#randomOrder` / `#strictOrder`.
+         */
+        public PartitionBuilder sequentially()
+        {
+            this.sequentially = true;
+            return this;
+        }
+
+        /**
+         * Execute operations listed by users of this PartitionBuilder in random order
+         */
+        public PartitionBuilder randomOrder()
+        {
+            strictOrder = false;
+            return this;
+        }
+
+        /**
+         * Execute operations listed by users of this PartitionBuilder in the order given by the user
+         */
+        public PartitionBuilder strictOrder()
+        {
+            strictOrder = true;
+            return this;
+        }
+
+        public PartitionBuilder partitionDelete()
+        {
+            return step(OpSelectors.OperationKind.DELETE_PARTITION);
+        }
+
+        public PartitionBuilder partitionDeletions(int n)
+        {
+            for (int i = 0; i < n; i++)
+                partitionDelete();
+            return this;
+        }
+
+        public PartitionBuilder update()
+        {
+            return step(OpSelectors.OperationKind.UPDATE);
+        }
+
+        public PartitionBuilder updates(int n)
+        {
+            for (int i = 0; i < n; i++)
+                update();
+            return this;
+        }
+
+        public PartitionBuilder insert()
+        {
+            return step(OpSelectors.OperationKind.INSERT);
+        }
+
+        public PartitionBuilder inserts(int n)
+        {
+            for (int i = 0; i < n; i++)
+                insert();
+            return this;
+        }
+
+        public PartitionBuilder delete()
+        {
+            return step(OpSelectors.OperationKind.DELETE_ROW);
+        }
+
+        public PartitionBuilder deletes(int n)
+        {
+            for (int i = 0; i < n; i++)
+                delete();
+            return this;
+        }
+
+        public PartitionBuilder columnDelete()
+        {
+            return step(OpSelectors.OperationKind.DELETE_COLUMN_WITH_STATICS);
+        }
+
+        public PartitionBuilder columnDeletes(int n)
+        {
+            for (int i = 0; i < n; i++)
+                columnDelete();
+
+            return this;
+        }
+
+        public PartitionBuilder rangeDelete()
+        {
+            return step(OpSelectors.OperationKind.DELETE_RANGE);
+        }
+
+        public PartitionBuilder rangeDeletes(int n)
+        {
+            for (int i = 0; i < n; i++)
+                rangeDelete();
+
+            return this;
+        }
+
+        public PartitionBuilder sliceDelete()
+        {
+            return step(OpSelectors.OperationKind.DELETE_SLICE);
+        }
+
+        public PartitionBuilder sliceDeletes(int n)
+        {
+            for (int i = 0; i < n; i++)
+                sliceDelete();
+
+            return this;
+        }
+
+        public PartitionBuilder partitionBuilder()
+        {
+            return this;
+        }
+
+        public HistoryBuilder finish()
+        {
+            assert !finished;
+            finished = true;
+
+            if (!strictOrder)
+                // TODO: In the future/for large sets we could avoid generating the values and just generate them on the fly:
+                // https://lemire.me/blog/2017/09/18/visiting-all-values-in-an-array-exactly-once-in-random-order/
+                // we could just save the rules for generation, for example
+                Collections.shuffle(steps); // NOTE(review): uses the JDK default source of randomness, not run.rng
+
+            addSteps(steps);
+            steps.clear();
+            return HistoryBuilder.this;
+        }
+
+        void addSteps(List<Step> steps)
+        {
+            List<ReplayingVisitor.Batch> batches = new ArrayList<>();
+            Counter m = new Counter(); // value passed to toBatch as m; grows only while steps share an lts
+            Counter opId = new Counter(); // source of operation ids; restarted after every step
+            for (Step step : steps)
+            {
+                batches.add(step.toBatch(pd, lts, m.get(), opId::getAndIncrement));
+
+                if (sequentially)
+                {
+                    assert lts == log.size();
+                    addToLog(pd, batches); // records the visit, advances lts and empties batches
+                    m.reset();
+                }
+                else
+                {
+                    m.increment();
+                }
+
+                opId.reset();
+            }
+
+            // If we were generating steps for the partition with same LTS, add remaining steps
+            if (!batches.isEmpty())
+            {
+                assert !sequentially;
+                addToLog(pd, batches);
+            }
+        }
+
+        PartitionBuilder step(OpSelectors.OperationKind opType)
+        {
+            steps.add(new OperationStep(opType));
+            return this;
+        }
+    }
+
+    // Records a visit of pd at the current lts:
+    //  * indexes the lts under its partition, creating the per-partition set on first use
+    //  * appends the visit to the log and moves on to the next lts
+    //  * empties the caller's batch list
+    private void addToLog(long pd, List<ReplayingVisitor.Batch> batches)
+    {
+        pdToLtsMap.computeIfAbsent(pd, ignore -> new TreeSet<>()).add(lts);
+
+        ReplayingVisitor.Batch[] recorded = batches.toArray(new ReplayingVisitor.Batch[0]);
+        log.add(visit(lts++, pd, recorded));
+        batches.clear();
+    }
+
+    private static class Counter
+    {
+        long i = 0; // current value
+        void reset()
+        {
+            i = 0;
+        }
+
+        long increment()
+        {
+            return getAndIncrement(); // both methods return the value held before the bump
+        }
+
+        long getAndIncrement()
+        {
+            return i++;
+        }
+
+        long get()
+        {
+            return i;
+        }
+    }
+
+    public class BatchBuilder<T extends OperationBuilder<?>> implements OperationBuilder<BatchBuilder<T>>
+    { // Collects operation steps that finish() hands to the addStep consumer as a single BatchStep
+        final T operationBuilder;
+        final List<OperationStep> steps = new ArrayList<>();
+        final Consumer<Step> addStep;
+        boolean strictOrder; // NOTE(review): defaults to false (shuffle), while PartitionBuilder defaults to true -- confirm intended
+
+        boolean finished = false; // flipped by finish(), which asserts that it runs only once
+        public BatchBuilder(T operationBuilder,
+                            Consumer<Step> addStep)
+        {
+            this.operationBuilder = operationBuilder;
+            this.addStep = addStep;
+        }
+
+        public BatchBuilder<T> randomOrder()
+        {
+            this.strictOrder = false;
+            return this;
+        }
+
+        public BatchBuilder<T> strictOrder()
+        {
+            this.strictOrder = true;
+            return this;
+        }
+
+        public T finish()
+        {
+            assert !finished;
+            finished = true;
+            if (!strictOrder)
+                // TODO
+                Collections.shuffle(steps); // NOTE(review): uses the JDK default source of randomness, not run.rng
+
+            addStep.accept(new BatchStep(steps));
+            return operationBuilder; // hands control back to the builder this batch was opened from
+        }
+
+        public BatchBuilder<T> partitionDelete()
+        {
+            return step(OpSelectors.OperationKind.DELETE_PARTITION);
+        }
+
+        public BatchBuilder<T> partitionDeletions(int n)
+        {
+            for (int i = 0; i < n; i++)
+                partitionDelete();
+            return this;
+        }
+
+        public BatchBuilder<T> update()
+        {
+            return step(OpSelectors.OperationKind.UPDATE_WITH_STATICS); // PartitionBuilder#update queues plain UPDATE instead
+        }
+
+        public BatchBuilder<T> updates(int n)
+        {
+            for (int i = 0; i < n; i++)
+                update();
+            return this;
+        }
+
+        public BatchBuilder<T> insert()
+        {
+            return step(OpSelectors.OperationKind.INSERT_WITH_STATICS); // PartitionBuilder#insert queues plain INSERT instead
+        }
+
+        BatchBuilder<T> step(OpSelectors.OperationKind opType)
+        {
+            steps.add(new OperationStep(opType));
+            return this;
+        }
+
+        public BatchBuilder<T> inserts(int n)
+        {
+            for (int i = 0; i < n; i++)
+                insert();
+            return this;
+        }
+
+        public BatchBuilder<T> delete()
+        {
+            return step(OpSelectors.OperationKind.DELETE_ROW);
+        }
+
+        public BatchBuilder<T> deletes(int n)
+        {
+            for (int i = 0; i < n; i++)
+                delete();
+            return this;
+        }
+
+        public BatchBuilder<T> columnDelete()
+        {
+            return step(OpSelectors.OperationKind.DELETE_COLUMN_WITH_STATICS);
+        }
+
+        public BatchBuilder<T> columnDeletes(int n)
+        {
+            for (int i = 0; i < n; i++)
+                columnDelete();
+
+            return this;
+        }
+
+        public BatchBuilder<T> rangeDelete()
+        {
+            return step(OpSelectors.OperationKind.DELETE_RANGE);
+        }
+
+        public BatchBuilder<T> rangeDeletes(int n)
+        {
+            for (int i = 0; i < n; i++)
+                rangeDelete();
+            return this;
+        }
+
+        public BatchBuilder<T> sliceDelete()
+        {
+            return step(OpSelectors.OperationKind.DELETE_SLICE);
+        }
+
+        public BatchBuilder<T> sliceDeletes(int n)
+        {
+            for (int i = 0; i < n; i++)
+                sliceDelete();
+            return this;
+        }
+
+        public PartitionBuilder partitionBuilder()
+        {
+            return operationBuilder.partitionBuilder();
+        }
+    }
+
+    public interface OperationBuilder<T extends OperationBuilder<?>>
+    {
+        T randomOrder(); // queued operations are shuffled when the builder is finished
+        T strictOrder(); // queued operations keep the order in which they were listed
+        T partitionDelete();
+        T partitionDeletions(int n); // like every method taking n below: queues n operations of that kind
+        T update();
+        T updates(int n);
+        T insert();
+        T inserts(int n);
+        T delete();
+        T deletes(int n);
+        T columnDelete();
+        T columnDeletes(int n);
+        T rangeDelete();
+        T rangeDeletes(int n);
+        T sliceDelete();
+        T sliceDeletes(int n);
+        PartitionBuilder partitionBuilder(); // the PartitionBuilder itself, or the one a nested builder belongs to
+    }
+
+    public static ReplayingVisitor.Visit visit(long lts, long pd, ReplayingVisitor.Batch... ops)
+    {
+        return new ReplayingVisitor.Visit(lts, pd, ops); // static shorthand for the Visit constructor
+    }
+
+    public static ReplayingVisitor.Batch batch(long m, ReplayingVisitor.Operation... ops)
+    {
+        return new ReplayingVisitor.Batch(m, ops); // static shorthand for the Batch constructor
+    }
+
+    public static ReplayingVisitor.Operation op(long cd, long opId, OpSelectors.OperationKind opType)
+    {
+        return new ReplayingVisitor.Operation(cd, opId, opType); // static shorthand for the Operation constructor
+    }
+
+    // Creates a ReplayingVisitor over the given executor whose visits are served from this builder's log.
+    public ReplayingVisitor visitor(VisitExecutor executor)
+    {
+        return new ReplayingVisitor(executor)
+        {
+            public Visit getVisit(long lts)
+            {
+                assert log.size() > lts : String.format("Log: %s, lts: %d", log, lts);
+                return log.get((int) lts);
+            }
+
+            // Visits every lts from the clock's current one up to (excluding) the builder's next unused lts,
+            // advancing the clock after each visit.
+            public void replayAll(Run run)
+            {
+                final long end = HistoryBuilder.this.lts;
+                for (long now = run.clock.currentLts(); now < end; now = run.clock.currentLts())
+                {
+                    visit(now);
+                    run.clock.nextLts();
+                }
+            }
+        };
+    }
+}
diff --git a/harry-core/src/harry/model/OpSelectors.java b/harry-core/src/harry/model/OpSelectors.java
index 3dd4a25..7e5fc17 100644
--- a/harry-core/src/harry/model/OpSelectors.java
+++ b/harry-core/src/harry/model/OpSelectors.java
@@ -82,6 +82,8 @@ public interface OpSelectors
 
         long lts(long rts);
 
+        long currentLts();
+
         long nextLts();
 
         long maxLts();
@@ -122,6 +124,7 @@ public interface OpSelectors
 
         public abstract long minLtsFor(long pd);
 
+        // TODO: right now, we can only calculate a position for 64-bit (in other words, full entropy) pds
         public abstract long positionFor(long lts);
     }
 
@@ -177,21 +180,22 @@ public interface OpSelectors
         @VisibleForTesting
         protected abstract long vd(long pd, long cd, long lts, long opId, int col);
 
-        public long[] vds(long pd, long cd, long lts, long opId, SchemaSpec schema)
+        public long[] vds(long pd, long cd, long lts, long opId, OperationKind opType, SchemaSpec schema)
         {
-            BitSet setColumns = columnMask(pd, lts, opId);
+            BitSet setColumns = columnMask(pd, lts, opId, opType);
             return descriptors(pd, cd, lts, opId, schema.regularColumns, schema.regularColumnsMask(), setColumns, schema.regularColumnsOffset);
         }
 
-        public long[] sds(long pd, long cd, long lts, long opId, SchemaSpec schema)
+        public long[] sds(long pd, long cd, long lts, long opId, OperationKind opType, SchemaSpec schema)
         {
-            BitSet setColumns = columnMask(pd, lts, opId);
+            BitSet setColumns = columnMask(pd, lts, opId, opType);
             return descriptors(pd, cd, lts, opId, schema.staticColumns, schema.staticColumnsMask(), setColumns, schema.staticColumnsOffset);
         }
 
         private long[] descriptors(long pd, long cd, long lts, long opId, List<ColumnSpec<?>> columns, BitSet mask, BitSet setColumns, int offset)
         {
-            assert opId < opsPerModification(lts) * numberOfModifications(lts) : String.format("Operation id %d exceeds the maximum expected number of operations %d", opId, opsPerModification(lts) * numberOfModifications(lts));
+            assert opId < opsPerModification(lts) * numberOfModifications(lts) : String.format("Operation id %d exceeds the maximum expected number of operations %d (%d * %d)",
+                                                                                               opId, opsPerModification(lts) * numberOfModifications(lts), opsPerModification(lts), numberOfModifications(lts));
             long[] descriptors = new long[columns.size()];
 
             for (int i = 0; i < descriptors.length; i++)
@@ -216,7 +220,7 @@ public interface OpSelectors
 
         public abstract OperationKind operationType(long pd, long lts, long opId);
 
-        public abstract BitSet columnMask(long pd, long lts, long opId);
+        public abstract BitSet columnMask(long pd, long lts, long opId, OperationKind opType);
 
         // TODO: why is this one unused?
         public abstract long rowId(long pd, long lts, long cd);
@@ -671,8 +675,7 @@ public interface OpSelectors
 
         public OperationKind operationType(long pd, long lts, long opId)
         {
-            OperationKind kind = operationType(pd, lts, opId, partitionLevelOperationsMask(pd, lts));
-            return kind;
+            return operationType(pd, lts, opId, partitionLevelOperationsMask(pd, lts));
         }
 
         // TODO: create this bitset once per lts
@@ -687,16 +690,16 @@ public interface OpSelectors
             return BitSet.create(partitionLevelOpsMask, totalOps);
         }
 
-        public OperationKind operationType(long pd, long lts, long opId, BitSet partitionLevelOperationsMask)
+        private OperationKind operationType(long pd, long lts, long opId, BitSet partitionLevelOperationsMask)
         {
             long descriptor = rng.randomNumber(pd ^ lts ^ opId, BITSET_IDX_STREAM);
             return operationSelector.inflate(descriptor, partitionLevelOperationsMask.isSet((int) opId));
         }
 
-        public BitSet columnMask(long pd, long lts, long opId)
+        public BitSet columnMask(long pd, long lts, long opId, OperationKind opType)
         {
             long descriptor = rng.randomNumber(pd ^ lts ^ opId, BITSET_IDX_STREAM);
-            return columnSelector.columnMask(operationType(pd, lts, opId), descriptor);
+            return columnSelector.columnMask(opType, descriptor);
         }
 
         public long vd(long pd, long cd, long lts, long opId, int col)
diff --git a/harry-core/src/harry/model/QuiescentChecker.java b/harry-core/src/harry/model/QuiescentChecker.java
index d17d6f4..a630df2 100644
--- a/harry-core/src/harry/model/QuiescentChecker.java
+++ b/harry-core/src/harry/model/QuiescentChecker.java
@@ -45,10 +45,14 @@ public class QuiescentChecker implements Model
 
     public QuiescentChecker(Run run)
     {
+        this(run, new Reconciler(run));
+    }
+
+    public QuiescentChecker(Run run, Reconciler reconciler)
+    {
         this.clock = run.clock;
         this.sut = run.sut;
-
-        this.reconciler = new Reconciler(run);
+        this.reconciler = reconciler;
         this.tracker = run.tracker;
         this.schemaSpec = run.schemaSpec;
     }
@@ -63,8 +67,10 @@ public class QuiescentChecker implements Model
         long maxCompeteLts = tracker.maxConsecutiveFinished();
         long maxSeenLts = tracker.maxStarted();
 
-        assert maxCompeteLts == maxSeenLts : "Runner hasn't settled down yet. " +
-                                             "Quiescent model can't be reliably used in such cases.";
+        assert maxCompeteLts == maxSeenLts : String.format("Runner hasn't settled down yet. " +
+                                                           "Quiescent model can't be reliably used in such cases. " +
+                                                           "Max complete: %d. Max seen: %d",
+                                                           maxCompeteLts, maxSeenLts);
 
         List<ResultSetRow> actualRows = rowsSupplier.get();
         Iterator<ResultSetRow> actual = actualRows.iterator();
diff --git a/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java b/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java
index 1a64500..186a618 100644
--- a/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java
+++ b/harry-core/src/harry/model/clock/ApproximateMonotonicClock.java
@@ -151,6 +151,11 @@ public class ApproximateMonotonicClock implements OpSelectors.MonotonicClock
             throw new IllegalStateException("No thread should have changed LTS during rebase. " + lts.get());
     }
 
+    public long currentLts()
+    {
+        return lts.get();
+    }
+
     public long nextLts()
     {
         long current = lts.get();
diff --git a/harry-core/src/harry/model/clock/OffsetClock.java b/harry-core/src/harry/model/clock/OffsetClock.java
index 8c25394..6040125 100644
--- a/harry-core/src/harry/model/clock/OffsetClock.java
+++ b/harry-core/src/harry/model/clock/OffsetClock.java
@@ -47,6 +47,11 @@ public class OffsetClock implements OpSelectors.MonotonicClock
         return rts - base;
     }
 
+    public long currentLts()
+    {
+        return lts.get();
+    }
+
     public long nextLts()
     {
         return lts.getAndIncrement();
diff --git a/harry-core/src/harry/operations/WriteHelper.java b/harry-core/src/harry/operations/WriteHelper.java
index 084f931..1fdf591 100644
--- a/harry-core/src/harry/operations/WriteHelper.java
+++ b/harry-core/src/harry/operations/WriteHelper.java
@@ -65,7 +65,8 @@ public class WriteHelper
         }
 
         b.append(") USING TIMESTAMP ")
-         .append(timestamp);
+         .append(timestamp)
+         .append(";");
 
         return new CompiledStatement(b.toString(), adjustArraySize(bindings, bindingsCount));
     }
diff --git a/harry-core/src/harry/reconciler/Reconciler.java b/harry-core/src/harry/reconciler/Reconciler.java
index da2daa7..275b7c7 100644
--- a/harry-core/src/harry/reconciler/Reconciler.java
+++ b/harry-core/src/harry/reconciler/Reconciler.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
+import java.util.function.Function;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,12 +35,14 @@ import harry.core.Run;
 import harry.ddl.ColumnSpec;
 import harry.ddl.SchemaSpec;
 import harry.model.OpSelectors;
-import harry.visitors.AbstractPartitionVisitor;
-import harry.visitors.PartitionVisitor;
+import harry.visitors.GeneratingVisitor;
+import harry.visitors.Visitor;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
 import harry.util.BitSet;
 import harry.util.Ranges;
+import harry.visitors.ReplayingVisitor;
+import harry.visitors.VisitExecutor;
 
 import static harry.generators.DataGenerators.NIL_DESCR;
 import static harry.generators.DataGenerators.UNSET_DESCR;
@@ -64,12 +67,22 @@ public class Reconciler
     private final QueryGenerator rangeSelector;
     private final SchemaSpec schema;
 
+    private final Function<VisitExecutor, Visitor> visitorFactory;
+
     public Reconciler(Run run)
     {
+        this(run,
+             (processor) -> new GeneratingVisitor(run, processor));
+    }
+
+    public Reconciler(Run run,
+                      Function<VisitExecutor, Visitor> visitorFactory)
+    {
         this.descriptorSelector = run.descriptorSelector;
         this.pdSelector = run.pdSelector;
         this.schema = run.schemaSpec;
         this.rangeSelector = run.rangeSelector;
+        this.visitorFactory = visitorFactory;
     }
 
     private final long debugCd = Long.getLong("harry.reconciler.debug_cd", -1L);
@@ -78,26 +91,20 @@ public class Reconciler
     {
         PartitionState partitionState = new PartitionState();
 
-        class Processor extends AbstractPartitionVisitor
+        class Processor implements VisitExecutor
         {
-            public Processor(OpSelectors.PdSelector pdSelector, OpSelectors.DescriptorSelector descriptorSelector, SchemaSpec schema)
-            {
-                super(pdSelector, descriptorSelector, schema);
-            }
-
             // Whether or not a partition deletion was encountered on this LTS.
             private boolean hadPartitionDeletion = false;
             private final List<Ranges.Range> rangeDeletes = new ArrayList<>();
-            private final List<Long> writes = new ArrayList<>();
-            private final List<Long> columnDeletes = new ArrayList<>();
+            private final List<ReplayingVisitor.Operation> writes = new ArrayList<>();
+            private final List<ReplayingVisitor.Operation> columnDeletes = new ArrayList<>();
 
             @Override
-            public void operation(long lts, long pd, long cd, long m, long opId)
+            public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
             {
                 if (hadPartitionDeletion)
                     return;
 
-                OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
                 switch (opType)
                 {
                     case DELETE_RANGE:
@@ -131,11 +138,12 @@ public class Reconciler
                     case UPDATE_WITH_STATICS:
                         if (debugCd != -1 && cd == debugCd)
                             logger.info("Writing {} ({}) at {}/{}", cd, opType, lts, opId);
-                        writes.add(opId);
+                        // TODO: switch to Operation as an entity that can just be passed here
+                        writes.add(new ReplayingVisitor.Operation(cd, opId, opType));
                         break;
                     case DELETE_COLUMN_WITH_STATICS:
                     case DELETE_COLUMN:
-                        columnDeletes.add(opId);
+                        columnDeletes.add(new ReplayingVisitor.Operation(cd, opId, opType));
                         break;
                     default:
                         throw new IllegalStateException();
@@ -143,7 +151,7 @@ public class Reconciler
             }
 
             @Override
-            protected void beforeLts(long lts, long pd)
+            public void beforeLts(long lts, long pd)
             {
                 rangeDeletes.clear();
                 writes.clear();
@@ -152,25 +160,23 @@ public class Reconciler
             }
 
             @Override
-            protected void afterLts(long lts, long pd)
+            public void afterLts(long lts, long pd)
             {
                 if (hadPartitionDeletion)
                     return;
 
-                outer: for (Long opIdBoxed : writes)
+                outer: for (ReplayingVisitor.Operation op : writes)
                 {
-                    long opId = opIdBoxed;
-                    long cd = descriptorSelector.cd(pd, lts, opId, schema);
+                    long opId = op.opId;
+                    long cd = op.cd;
 
-                    OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
-
-                    switch (opType)
+                    switch (op.opType)
                     {
                         case INSERT_WITH_STATICS:
                         case UPDATE_WITH_STATICS:
                             // We could apply static columns during the first iteration, but it's more convenient
                             // to reconcile static-level deletions.
-                            partitionState.writeStaticRow(descriptorSelector.sds(pd, cd, lts, opId, schema),
+                            partitionState.writeStaticRow(descriptorSelector.sds(pd, cd, lts, opId, op.opType, schema),
                                                           lts);
                         case INSERT:
                         case UPDATE:
@@ -192,28 +198,26 @@ public class Reconciler
                             }
 
                             partitionState.write(cd,
-                                                 descriptorSelector.vds(pd, cd, lts, opId, schema),
+                                                 descriptorSelector.vds(pd, cd, lts, opId, op.opType, schema),
                                                  lts,
-                                                 opType == OpSelectors.OperationKind.INSERT || opType == OpSelectors.OperationKind.INSERT_WITH_STATICS);
+                                                 op.opType == OpSelectors.OperationKind.INSERT || op.opType == OpSelectors.OperationKind.INSERT_WITH_STATICS);
                             break;
                         default:
-                            throw new IllegalStateException();
+                            throw new IllegalStateException(op.opType.toString());
                     }
                 }
 
-                outer: for (Long opIdBoxed : columnDeletes)
+                outer: for (ReplayingVisitor.Operation op : columnDeletes)
                 {
-                    long opId = opIdBoxed;
-                    long cd = descriptorSelector.cd(pd, lts, opId, schema);
-
-                    OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
+                    long opId = op.opId;
+                    long cd = op.cd;
 
-                    switch (opType)
+                    switch (op.opType)
                     {
                         case DELETE_COLUMN_WITH_STATICS:
                             partitionState.deleteStaticColumns(lts,
                                                                schema.staticColumnsOffset,
-                                                               descriptorSelector.columnMask(pd, lts, opId),
+                                                               descriptorSelector.columnMask(pd, lts, opId, op.opType),
                                                                schema.staticColumnsMask());
                         case DELETE_COLUMN:
                             if (!query.match(cd))
@@ -236,21 +240,27 @@ public class Reconciler
                             partitionState.deleteRegularColumns(lts,
                                                                 cd,
                                                                 schema.regularColumnsOffset,
-                                                                descriptorSelector.columnMask(pd, lts, opId),
+                                                                descriptorSelector.columnMask(pd, lts, opId, op.opType),
                                                                 schema.regularColumnsMask());
                             break;
                     }
                 }
             }
+
+            @Override
+            public void afterBatch(long lts, long pd, long m) {}
+
+            @Override
+            public void beforeBatch(long lts, long pd, long m) {}
         }
 
-        PartitionVisitor partitionVisitor = new Processor(pdSelector, descriptorSelector, schema);
+        Visitor visitor = visitorFactory.apply(new Processor());
 
         long currentLts = pdSelector.minLtsFor(pd);
 
         while (currentLts <= maxLts && currentLts >= 0)
         {
-            partitionVisitor.visitPartition(currentLts);
+            visitor.visit(currentLts);
             currentLts = pdSelector.nextLts(currentLts);
         }
 
@@ -283,9 +293,9 @@ public class Reconciler
         private void write(long cd,
                            long[] vds,
                            long lts,
-                           boolean writeParimaryKeyLiveness)
+                           boolean writePrimaryKeyLiveness)
         {
-            rows.compute(cd, (cd_, current) -> updateRowState(current, schema.regularColumns, cd, vds, lts, writeParimaryKeyLiveness));
+            rows.compute(cd, (cd_, current) -> updateRowState(current, schema.regularColumns, cd, vds, lts, writePrimaryKeyLiveness));
         }
 
         private void delete(Ranges.Range range,
diff --git a/harry-core/src/harry/runner/DataTracker.java b/harry-core/src/harry/runner/DataTracker.java
index dab6fc9..9f518d1 100644
--- a/harry-core/src/harry/runner/DataTracker.java
+++ b/harry-core/src/harry/runner/DataTracker.java
@@ -18,8 +18,6 @@
 
 package harry.runner;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonTypeName;
 import harry.core.Configuration;
 
 public interface DataTracker
diff --git a/harry-core/src/harry/runner/DefaultDataTracker.java b/harry-core/src/harry/runner/DefaultDataTracker.java
index 7b1412a..d5cdb4b 100644
--- a/harry-core/src/harry/runner/DefaultDataTracker.java
+++ b/harry-core/src/harry/runner/DefaultDataTracker.java
@@ -63,7 +63,10 @@ public class DefaultDataTracker implements DataTracker
     private void recordEvent(long lts, boolean finished)
     {
         // all seen LTS are allowed to be "in-flight"
-        maxSeenLts.getAndUpdate((old) -> Math.max(lts, old));
+        maxSeenLts.getAndUpdate((old) -> {
+            assert finished || lts > old : String.format("Attempting to reuse lts: %d. Max seen: %d", lts, old);
+            return Math.max(lts, old);
+        });
 
         if (!finished)
             return;
diff --git a/harry-core/src/harry/runner/Runner.java b/harry-core/src/harry/runner/Runner.java
index b172723..3a84d3e 100644
--- a/harry-core/src/harry/runner/Runner.java
+++ b/harry-core/src/harry/runner/Runner.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 import harry.core.Configuration;
 import harry.core.Run;
 import harry.model.OpSelectors;
-import harry.visitors.PartitionVisitor;
+import harry.visitors.Visitor;
 
 
 public abstract class Runner
@@ -140,21 +140,21 @@ public abstract class Runner
     {
         private final ScheduledExecutorService executor;
         private final ScheduledExecutorService shutdownExceutor;
-        private final List<PartitionVisitor> partitionVisitors;
+        private final List<Visitor> visitors;
         private final Configuration config;
 
         public SequentialRunner(Run run,
                                 Configuration config,
-                                List<? extends PartitionVisitor.PartitionVisitorFactory> partitionVisitorFactories)
+                                List<? extends Visitor.VisitorFactory> visitorFactories)
         {
             super(run, config);
 
             this.executor = Executors.newSingleThreadScheduledExecutor();
             this.shutdownExceutor = Executors.newSingleThreadScheduledExecutor();
             this.config = config;
-            this.partitionVisitors = new ArrayList<>();
-            for (PartitionVisitor.PartitionVisitorFactory factory : partitionVisitorFactories)
-                partitionVisitors.add(factory.make(run));
+            this.visitors = new ArrayList<>();
+            for (Visitor.VisitorFactory factory : visitorFactories)
+                visitors.add(factory.make(run));
         }
 
         public CompletableFuture<?> initAndStartAll()
@@ -173,7 +173,7 @@ public abstract class Runner
             executor.submit(reportThrowable(() -> {
                                                 try
                                                 {
-                                                    SequentialRunner.run(partitionVisitors, run.clock, future,
+                                                    SequentialRunner.run(visitors, run.clock, future,
                                                                          () -> Thread.currentThread().isInterrupted() || future.isDone() || completed.get());
                                                 }
                                                 catch (Throwable t)
@@ -186,7 +186,7 @@ public abstract class Runner
             return future;
         }
 
-        static void run(List<PartitionVisitor> visitors,
+        static void run(List<Visitor> visitors,
                         OpSelectors.MonotonicClock clock,
                         CompletableFuture<?> future,
                         BooleanSupplier exitCondition)
@@ -202,8 +202,8 @@ public abstract class Runner
                 {
                     try
                     {
-                        PartitionVisitor partitionVisitor = visitors.get(i);
-                        partitionVisitor.visitPartition(lts);
+                        Visitor visitor = visitors.get(i);
+                        visitor.visit(lts);
                     }
                     catch (Throwable t)
                     {
@@ -238,8 +238,8 @@ public abstract class Runner
     {
         private final ScheduledExecutorService executor;
         private final ScheduledExecutorService shutdownExecutor;
-        private final List<? extends PartitionVisitor.PartitionVisitorFactory> partitionVisitorFactories;
-        private final List<PartitionVisitor> allVisitors;
+        private final List<? extends Visitor.VisitorFactory> visitorFactories;
+        private final List<Visitor> allVisitors;
 
         private final int concurrency;
         private final long runTime;
@@ -248,7 +248,7 @@ public abstract class Runner
         public ConcurrentRunner(Run run,
                                 Configuration config,
                                 int concurrency,
-                                List<? extends PartitionVisitor.PartitionVisitorFactory> partitionVisitorFactories)
+                                List<? extends Visitor.VisitorFactory> visitorFactories)
         {
             super(run, config);
             this.concurrency = concurrency;
@@ -257,7 +257,7 @@ public abstract class Runner
             // TODO: configure concurrency
             this.executor = Executors.newScheduledThreadPool(concurrency);
             this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor();
-            this.partitionVisitorFactories = partitionVisitorFactories;
+            this.visitorFactories = visitorFactories;
             this.allVisitors = new CopyOnWriteArrayList<>();
         }
 
@@ -276,13 +276,13 @@ public abstract class Runner
             BooleanSupplier exitCondition = () -> Thread.currentThread().isInterrupted() || future.isDone();
             for (int i = 0; i < concurrency; i++)
             {
-                List<PartitionVisitor> partitionVisitors = new ArrayList<>();
+                List<Visitor> visitors = new ArrayList<>();
                 executor.submit(reportThrowable(() -> {
-                                                    for (PartitionVisitor.PartitionVisitorFactory factory : partitionVisitorFactories)
-                                                        partitionVisitors.add(factory.make(run));
+                                                    for (Visitor.VisitorFactory factory : visitorFactories)
+                                                        visitors.add(factory.make(run));
 
-                                                    allVisitors.addAll(partitionVisitors);
-                                                    run(partitionVisitors, run.clock, exitCondition);
+                                                    allVisitors.addAll(visitors);
+                                                    run(visitors, run.clock, exitCondition);
                                                 },
                                                 future));
 
@@ -291,22 +291,22 @@ public abstract class Runner
             return future;
         }
 
-        void run(List<PartitionVisitor> visitors,
+        void run(List<Visitor> visitors,
                  OpSelectors.MonotonicClock clock,
                  BooleanSupplier exitCondition)
         {
             while (!exitCondition.getAsBoolean())
             {
                 long lts = clock.nextLts();
-                for (PartitionVisitor visitor : visitors)
-                    visitor.visitPartition(lts);
+                for (Visitor visitor : visitors)
+                    visitor.visit(lts);
             }
         }
 
         public void shutdown() throws InterruptedException
         {
             logger.info("Shutting down...");
-            for (PartitionVisitor visitor : allVisitors)
+            for (Visitor visitor : allVisitors)
                 visitor.shutdown();
 
             shutdownExecutor.shutdownNow();
diff --git a/harry-core/src/harry/util/TestRunner.java b/harry-core/src/harry/util/TestRunner.java
index 4349fd2..cd0fd0e 100644
--- a/harry-core/src/harry/util/TestRunner.java
+++ b/harry-core/src/harry/util/TestRunner.java
@@ -19,8 +19,10 @@
 package harry.util;
 
 import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import harry.generators.Generator;
 import harry.generators.RandomGenerator;
@@ -59,9 +61,46 @@ public class TestRunner
         }
     }
 
+    public static <VISIT, MODEL, SUT> void test(Generator<VISIT> visitGenerator,
+                                                Supplier<MODEL> initializeModel,
+                                                Supplier<SUT> initializeSUT,
+                                                BiFunction<MODEL, VISIT, MODEL> applyToModel,
+                                                BiFunction<SUT, VISIT, SUT> applyToSut,
+                                                ThrowingBiConsumer<MODEL, SUT> afterAll) throws Throwable
+    {
+        MODEL model = initializeModel.get();
+        SUT sut = initializeSUT.get();
+        for (int i = 0; i < CYCLES; i++)
+        {
+            VISIT v = visitGenerator.generate(rand);
+            model = applyToModel.apply(model, v);
+            sut = applyToSut.apply(sut, v);
+        }
+        afterAll.accept(model, sut);
+    }
+
+    public static <VISIT, SUT> void test(Generator<VISIT> visitGenerator,
+                                         Supplier<SUT> initializeSUT,
+                                         BiFunction<SUT, VISIT, SUT> applyToSut,
+                                         Consumer<SUT> afterAll) throws Throwable
+    {
+        SUT sut = initializeSUT.get();
+        for (int i = 0; i < CYCLES; i++)
+        {
+            VISIT v = visitGenerator.generate(rand);
+            sut = applyToSut.apply(sut, v);
+        }
+        afterAll.accept(sut);
+    }
+
     public static interface ThrowingConsumer<T>
     {
         void accept(T t) throws Throwable;
     }
+
+    public static interface ThrowingBiConsumer<T1, T2>
+    {
+        void accept(T1 t1, T2 t2) throws Throwable;
+    }
 }
 
diff --git a/harry-core/src/harry/visitors/AllPartitionsValidator.java b/harry-core/src/harry/visitors/AllPartitionsValidator.java
index 5a5a1c7..1b67a8e 100644
--- a/harry-core/src/harry/visitors/AllPartitionsValidator.java
+++ b/harry-core/src/harry/visitors/AllPartitionsValidator.java
@@ -38,7 +38,7 @@ import harry.operations.Query;
 
 // This might be something that potentially grows into the validator described in the design doc;
 // right now it's just a helper/container class
-public class AllPartitionsValidator implements PartitionVisitor
+public class AllPartitionsValidator implements Visitor
 {
     private static final Logger logger = LoggerFactory.getLogger(AllPartitionsValidator.class);
 
@@ -111,7 +111,7 @@ public class AllPartitionsValidator implements PartitionVisitor
 
     private final AtomicLong maxPos = new AtomicLong(-1);
 
-    public void visitPartition(long lts)
+    public void visit(long lts)
     {
         maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
 
diff --git a/harry-core/src/harry/visitors/CorruptingPartitionVisitor.java b/harry-core/src/harry/visitors/CorruptingVisitor.java
similarity index 94%
rename from harry-core/src/harry/visitors/CorruptingPartitionVisitor.java
rename to harry-core/src/harry/visitors/CorruptingVisitor.java
index 71c425b..26febeb 100644
--- a/harry-core/src/harry/visitors/CorruptingPartitionVisitor.java
+++ b/harry-core/src/harry/visitors/CorruptingVisitor.java
@@ -32,7 +32,7 @@ import harry.corruptor.QueryResponseCorruptor;
 import harry.runner.HarryRunner;
 import harry.operations.Query;
 
-public class CorruptingPartitionVisitor implements PartitionVisitor
+public class CorruptingVisitor implements Visitor
 {
     public static final Logger logger = LoggerFactory.getLogger(HarryRunner.class);
 
@@ -40,8 +40,8 @@ public class CorruptingPartitionVisitor implements PartitionVisitor
     private final QueryResponseCorruptor[] corruptors;
     private final int triggerAfter;
 
-    public CorruptingPartitionVisitor(int triggerAfter,
-                                      Run run)
+    public CorruptingVisitor(int triggerAfter,
+                             Run run)
     {
         this.run = run;
         this.triggerAfter = triggerAfter;
@@ -64,7 +64,7 @@ public class CorruptingPartitionVisitor implements PartitionVisitor
 
     private final AtomicLong maxPos = new AtomicLong(-1);
 
-    public void visitPartition(long lts)
+    public void visit(long lts)
     {
         maxPos.updateAndGet(current -> Math.max(run.pdSelector.positionFor(lts), current));
 
diff --git a/harry-core/src/harry/visitors/DelegatingVisitor.java b/harry-core/src/harry/visitors/DelegatingVisitor.java
new file mode 100644
index 0000000..f4994b1
--- /dev/null
+++ b/harry-core/src/harry/visitors/DelegatingVisitor.java
@@ -0,0 +1,61 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.visitors;
+
+import harry.model.OpSelectors;
+
+public abstract class DelegatingVisitor implements Visitor, VisitExecutor
+{
+    protected VisitExecutor delegate;
+
+    public DelegatingVisitor(VisitExecutor delegate)
+    {
+        this.delegate = delegate;
+    }
+
+    public void beforeLts(long lts, long pd)
+    {
+        delegate.beforeLts(lts, pd);
+    }
+
+    public void afterLts(long lts, long pd)
+    {
+        delegate.afterLts(lts, pd);
+    }
+
+    public void beforeBatch(long lts, long pd, long m)
+    {
+        delegate.beforeBatch(lts, pd, m);
+    }
+
+    public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
+    {
+        delegate.operation(lts, pd, cd, m, opId, opType);
+    }
+
+    public void afterBatch(long lts, long pd, long m)
+    {
+        delegate.afterBatch(lts, pd, m);
+    }
+
+    public void shutdown() throws InterruptedException
+    {
+        delegate.shutdown();
+    }
+}
diff --git a/harry-core/src/harry/visitors/AbstractPartitionVisitor.java b/harry-core/src/harry/visitors/GeneratingVisitor.java
similarity index 52%
rename from harry-core/src/harry/visitors/AbstractPartitionVisitor.java
rename to harry-core/src/harry/visitors/GeneratingVisitor.java
index 3f58e3a..78e2b07 100644
--- a/harry-core/src/harry/visitors/AbstractPartitionVisitor.java
+++ b/harry-core/src/harry/visitors/GeneratingVisitor.java
@@ -18,35 +18,32 @@
 
 package harry.visitors;
 
+import harry.core.Run;
 import harry.ddl.SchemaSpec;
 import harry.model.OpSelectors;
 
-public abstract class AbstractPartitionVisitor implements PartitionVisitor
+public class GeneratingVisitor extends DelegatingVisitor
 {
-    protected final OpSelectors.PdSelector pdSelector;
-    protected final OpSelectors.DescriptorSelector descriptorSelector;
-    protected final SchemaSpec schema;
+    private final OpSelectors.PdSelector pdSelector;
+    private final OpSelectors.DescriptorSelector descriptorSelector;
+    private final SchemaSpec schema;
 
-    public AbstractPartitionVisitor(AbstractPartitionVisitor visitor)
+    public GeneratingVisitor(Run run,
+                             VisitExecutor delegate)
     {
-        this(visitor.pdSelector, visitor.descriptorSelector, visitor.schema);
+        super(delegate);
+        this.pdSelector = run.pdSelector;
+        this.descriptorSelector = run.descriptorSelector;
+        this.schema = run.schemaSpec;
     }
 
-    public AbstractPartitionVisitor(OpSelectors.PdSelector pdSelector,
-                                    OpSelectors.DescriptorSelector descriptorSelector,
-                                    SchemaSpec schema)
+    @Override
+    public void visit(long lts)
     {
-        this.pdSelector = pdSelector;
-        this.descriptorSelector = descriptorSelector;
-        this.schema = schema;
+        generate(lts, pdSelector.pd(lts, schema));
     }
 
-    public void visitPartition(long lts)
-    {
-        visitPartition(lts, pdSelector.pd(lts, schema));
-    }
-
-    private void visitPartition(long lts, long pd)
+    private void generate(long lts, long pd)
     {
         beforeLts(lts, pd);
 
@@ -60,36 +57,12 @@ public abstract class AbstractPartitionVisitor implements PartitionVisitor
             {
                 long opId = m * opsPerModification + i;
                 long cd = descriptorSelector.cd(pd, lts, opId, schema);
-                operation(lts, pd, cd, m, opId);
+                OpSelectors.OperationKind opType = descriptorSelector.operationType(pd, lts, opId);
+                operation(lts, pd, cd, m, opId, opType);
             }
             afterBatch(lts, pd, m);
         }
 
         afterLts(lts, pd);
     }
-
-    protected void beforeLts(long lts, long pd)
-    {
-    }
-
-    protected void afterLts(long lts, long pd)
-    {
-    }
-
-    protected void beforeBatch(long lts, long pd, long m)
-    {
-    }
-
-    protected void operation(long lts, long pd, long cd, long m, long opId)
-    {
-
-    }
-
-    protected void afterBatch(long lts, long pd, long m)
-    {
-    }
-
-    public void shutdown() throws InterruptedException
-    {
-    }
-}
\ No newline at end of file
+}
diff --git a/harry-core/src/harry/visitors/LoggingPartitionVisitor.java b/harry-core/src/harry/visitors/LoggingPartitionVisitor.java
deleted file mode 100644
index 3e97ba3..0000000
--- a/harry-core/src/harry/visitors/LoggingPartitionVisitor.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package harry.visitors;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
-import harry.core.Run;
-import harry.operations.CompiledStatement;
-
-public class LoggingPartitionVisitor extends MutatingPartitionVisitor
-{
-    private final BufferedWriter operationLog;
-
-    public LoggingPartitionVisitor(Run run, Operation.RowVisitorFactory rowVisitorFactory)
-    {
-        super(run, rowVisitorFactory);
-
-        File f = new File("operation.log");
-        try
-        {
-            operationLog = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
-        }
-        catch (FileNotFoundException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void afterLts(long lts, long pd)
-    {
-        super.afterLts(lts, pd);
-        log("LTS: %d. Pd %d. Finished\n", lts, pd);
-    }
-
-    @Override
-    protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId)
-    {
-        CompiledStatement statement = super.operationInternal(lts, pd, cd, m, opId);
-
-        log(String.format("LTS: %d. Pd %d. Cd %d. M %d. OpId: %d Statement %s\n",
-                          lts, pd, cd, m, opId, statement));
-
-        return statement;
-    }
-
-    private void log(String format, Object... objects)
-    {
-        try
-        {
-            operationLog.write(String.format(format, objects));
-            operationLog.flush();
-        }
-        catch (IOException e)
-        {
-            // ignore
-        }
-    }
-}
diff --git a/harry-core/src/harry/visitors/LoggingVisitor.java b/harry-core/src/harry/visitors/LoggingVisitor.java
new file mode 100644
index 0000000..8deebde
--- /dev/null
+++ b/harry-core/src/harry/visitors/LoggingVisitor.java
@@ -0,0 +1,90 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.visitors;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+
+import harry.core.Run;
+import harry.model.OpSelectors;
+import harry.operations.CompiledStatement;
+
+public class LoggingVisitor extends GeneratingVisitor
+{
+
+    public LoggingVisitor(Run run,
+                          OperationExecutor.RowVisitorFactory rowVisitorFactory)
+    {
+        super(run, new LoggingVisitorExecutor(run, rowVisitorFactory.make(run)));
+    }
+
+    public static class LoggingVisitorExecutor extends MutatingVisitor.MutatingVisitExecutor
+    {
+        private final BufferedWriter operationLog;
+
+        public LoggingVisitorExecutor(Run run, OperationExecutor rowVisitor)
+        {
+            super(run, rowVisitor);
+
+            File f = new File("operation.log");
+            try
+            {
+                operationLog = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
+            }
+            catch (FileNotFoundException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void afterLts(long lts, long pd)
+        {
+            super.afterLts(lts, pd);
+            log("LTS: %d. Pd %d. Finished\n", lts, pd);
+        }
+
+        // Runs the operation through the parent executor and records the resulting statement in the operation log.
+        @Override
+        protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
+        {
+            CompiledStatement statement = super.operationInternal(lts, pd, cd, m, opId, opType);
+            // Hand the pattern and its arguments to log() unformatted: log() formats exactly once, so a '%'
+            // inside the rendered statement can no longer be misread as a format specifier.
+            log("LTS: %d. Pd %d. Cd %d. M %d. OpId: %d Statement %s\n", lts, pd, cd, m, opId, statement);
+            return statement;
+        }
+
+        private void log(String format, Object... objects)
+        {
+            try
+            {
+                operationLog.write(String.format(format, objects));
+                operationLog.flush();
+            }
+            catch (IOException e)
+            {
+                // Best-effort logging: a failed write or flush of the operation log is deliberately dropped.
+            }
+        }
+    }
+}
diff --git a/harry-core/src/harry/visitors/MutatingPartitionVisitor.java b/harry-core/src/harry/visitors/MutatingPartitionVisitor.java
deleted file mode 100644
index 88456fe..0000000
--- a/harry-core/src/harry/visitors/MutatingPartitionVisitor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package harry.visitors;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import harry.core.Run;
-import harry.model.OpSelectors;
-import harry.model.sut.SystemUnderTest;
-import harry.operations.CompiledStatement;
-import harry.runner.DataTracker;
-
-public class MutatingPartitionVisitor extends AbstractPartitionVisitor
-{
-    private static final Logger logger = LoggerFactory.getLogger(MutatingPartitionVisitor.class);
-
-    private final List<String> statements = new ArrayList<>();
-    private final List<Object> bindings = new ArrayList<>();
-
-    private final List<CompletableFuture<?>> futures = new ArrayList<>();
-
-    protected final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
-    protected final DataTracker tracker;
-    protected final SystemUnderTest sut;
-    protected final Operation rowVisitor;
-
-    public MutatingPartitionVisitor(Run run, Operation.RowVisitorFactory rowVisitorFactory)
-    {
-        super(run.pdSelector, run.descriptorSelector, run.schemaSpec);
-        this.tracker = run.tracker;
-        this.sut = run.sut;
-        this.rowVisitor = rowVisitorFactory.make(run);
-    }
-
-    public void beforeLts(long lts, long pd)
-    {
-        tracker.started(lts);
-    }
-
-    public void afterLts(long lts, long pd)
-    {
-        for (CompletableFuture<?> future : futures)
-        {
-            try
-            {
-                future.get();
-            }
-            catch (Throwable t)
-            {
-                throw new IllegalStateException("Couldn't repeat operations within timeout bounds.", t);
-            }
-        }
-        futures.clear();
-        tracker.finished(lts);
-    }
-
-    public void beforeBatch(long lts, long pd, long m)
-    {
-        statements.clear();
-        bindings.clear();
-    }
-
-    protected void operation(long lts, long pd, long cd, long m, long opId)
-    {
-        CompiledStatement statement = operationInternal(lts, pd, cd, m, opId);
-        statements.add(statement.cql());
-        for (Object binding : statement.bindings())
-            bindings.add(binding);
-    }
-
-    protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId)
-    {
-        OpSelectors.OperationKind op = descriptorSelector.operationType(pd, lts, opId);
-        return rowVisitor.perform(op, lts, pd, cd, opId);
-    }
-
-    public void afterBatch(long lts, long pd, long m)
-    {
-        if (statements.isEmpty())
-        {
-            logger.warn("Encountered an empty batch on {}", lts);
-            return;
-        }
-
-        String query = String.join(" ", statements);
-
-        if (statements.size() > 1)
-            query = String.format("BEGIN UNLOGGED BATCH\n%s\nAPPLY BATCH;", query);
-
-        Object[] bindingsArray = new Object[bindings.size()];
-        bindings.toArray(bindingsArray);
-
-        CompletableFuture<Object[][]> future = new CompletableFuture<>();
-        executeAsyncWithRetries(future, new CompiledStatement(query, bindingsArray));
-        futures.add(future);
-
-        statements.clear();
-        bindings.clear();
-    }
-
-    void executeAsyncWithRetries(CompletableFuture<Object[][]> future, CompiledStatement statement)
-    {
-        if (sut.isShutdown())
-            throw new IllegalStateException("System under test is shut down");
-
-        // TODO: limit a number of retries
-        sut.executeAsync(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings())
-           .whenComplete((res, t) -> {
-               if (t != null)
-                   executor.schedule(() -> executeAsyncWithRetries(future, statement), 1, TimeUnit.SECONDS);
-               else
-                   future.complete(res);
-           });
-    }
-
-    public void shutdown() throws InterruptedException
-    {
-        executor.shutdown();
-        executor.awaitTermination(30, TimeUnit.SECONDS);
-    }
-}
diff --git a/harry-core/src/harry/visitors/MutatingRowVisitor.java b/harry-core/src/harry/visitors/MutatingRowVisitor.java
index 0c5db9a..5a1dfbd 100644
--- a/harry-core/src/harry/visitors/MutatingRowVisitor.java
+++ b/harry-core/src/harry/visitors/MutatingRowVisitor.java
@@ -30,7 +30,7 @@ import harry.operations.Query;
 import harry.operations.QueryGenerator;
 import harry.util.BitSet;
 
-public class MutatingRowVisitor implements Operation
+public class MutatingRowVisitor implements OperationExecutor
 {
     protected final SchemaSpec schema;
     protected final OpSelectors.MonotonicClock clock;
@@ -64,37 +64,37 @@ public class MutatingRowVisitor implements Operation
     public CompiledStatement insert(long lts, long pd, long cd, long opId)
     {
         metricReporter.insert();
-        long[] vds = descriptorSelector.vds(pd, cd, lts, opId, schema);
+        long[] vds = descriptorSelector.vds(pd, cd, lts, opId, OpSelectors.OperationKind.INSERT, schema);
         return WriteHelper.inflateInsert(schema, pd, cd, vds, null, clock.rts(lts));
     }
 
     public CompiledStatement insertWithStatics(long lts, long pd, long cd, long opId)
     {
         metricReporter.insert();
-        long[] vds = descriptorSelector.vds(pd, cd, lts, opId, schema);
-        long[] sds = descriptorSelector.sds(pd, cd, lts, opId, schema);
+        long[] vds = descriptorSelector.vds(pd, cd, lts, opId, OpSelectors.OperationKind.INSERT_WITH_STATICS, schema);
+        long[] sds = descriptorSelector.sds(pd, cd, lts, opId, OpSelectors.OperationKind.INSERT_WITH_STATICS, schema);
         return WriteHelper.inflateInsert(schema, pd, cd, vds, sds, clock.rts(lts));
     }
 
     public CompiledStatement update(long lts, long pd, long cd, long opId)
     {
         metricReporter.insert();
-        long[] vds = descriptorSelector.vds(pd, cd, lts, opId, schema);
+        long[] vds = descriptorSelector.vds(pd, cd, lts, opId, OpSelectors.OperationKind.UPDATE, schema);
         return WriteHelper.inflateUpdate(schema, pd, cd, vds, null, clock.rts(lts));
     }
 
     public CompiledStatement updateWithStatics(long lts, long pd, long cd, long opId)
     {
         metricReporter.insert();
-        long[] vds = descriptorSelector.vds(pd, cd, lts, opId, schema);
-        long[] sds = descriptorSelector.sds(pd, cd, lts, opId, schema);
+        long[] vds = descriptorSelector.vds(pd, cd, lts, opId, OpSelectors.OperationKind.UPDATE_WITH_STATICS, schema);
+        long[] sds = descriptorSelector.sds(pd, cd, lts, opId, OpSelectors.OperationKind.UPDATE_WITH_STATICS, schema);
         return WriteHelper.inflateUpdate(schema, pd, cd, vds, sds, clock.rts(lts));
     }
 
     public CompiledStatement deleteColumn(long lts, long pd, long cd, long opId)
     {
         metricReporter.columnDelete();
-        BitSet columns = descriptorSelector.columnMask(pd, lts, opId);
+        BitSet columns = descriptorSelector.columnMask(pd, lts, opId, OpSelectors.OperationKind.DELETE_COLUMN);
         BitSet mask = schema.regularColumnsMask();
         return DeleteHelper.deleteColumn(schema, pd, cd, columns, mask, clock.rts(lts));
     }
@@ -102,7 +102,7 @@ public class MutatingRowVisitor implements Operation
     public CompiledStatement deleteColumnWithStatics(long lts, long pd, long cd, long opId)
     {
         metricReporter.columnDelete();
-        BitSet columns = descriptorSelector.columnMask(pd, lts, opId);
+        BitSet columns = descriptorSelector.columnMask(pd, lts, opId, OpSelectors.OperationKind.DELETE_COLUMN_WITH_STATICS);
         BitSet mask = schema.regularAndStaticColumnsMask();
         return DeleteHelper.deleteColumn(schema, pd, cd, columns, mask, clock.rts(lts));
     }
diff --git a/harry-core/src/harry/visitors/MutatingVisitor.java b/harry-core/src/harry/visitors/MutatingVisitor.java
new file mode 100644
index 0000000..26c7417
--- /dev/null
+++ b/harry-core/src/harry/visitors/MutatingVisitor.java
@@ -0,0 +1,239 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.visitors;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import harry.core.Run;
+import harry.model.OpSelectors;
+import harry.model.sut.SystemUnderTest;
+import harry.operations.CompiledStatement;
+import harry.runner.DataTracker;
+
+/**
+ * A {@link GeneratingVisitor} whose executor is a {@link MutatingVisitExecutor}: every operation
+ * handed to that executor is compiled into CQL and written to the system under test.
+ */
+public class MutatingVisitor extends GeneratingVisitor
+{
+    private static final Logger logger = LoggerFactory.getLogger(MutatingVisitor.class);
+
+    /**
+     * @param run               run supplying the descriptor selector, data tracker and system under test
+     * @param rowVisitorFactory factory of the {@link OperationExecutor} that compiles single operations
+     */
+    public MutatingVisitor(Run run,
+                           OperationExecutor.RowVisitorFactory rowVisitorFactory)
+    {
+        super(run, new MutatingVisitExecutor(run, rowVisitorFactory.make(run)));
+    }
+
+    /**
+     * Accumulates the statements of a batch, submits each batch as a single QUORUM write
+     * (wrapped into an unlogged batch when it holds more than one statement) and, at the end
+     * of the LTS, waits for all submitted writes to complete.
+     *
+     * Failed writes are re-submitted after a one second delay, at most {@code maxRetries} times.
+     */
+    public static class MutatingVisitExecutor implements VisitExecutor
+    {
+        private final List<String> statements = new ArrayList<>();
+        private final List<Object> bindings = new ArrayList<>();
+
+        private final List<CompletableFuture<?>> futures = new ArrayList<>();
+
+        protected final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
+
+        protected final OpSelectors.DescriptorSelector descriptorSelector;
+        protected final DataTracker tracker;
+        protected final SystemUnderTest sut;
+        protected final OperationExecutor rowVisitor;
+        private final int maxRetries = 10;
+
+        public MutatingVisitExecutor(Run run, OperationExecutor rowVisitor)
+        {
+            this.descriptorSelector = run.descriptorSelector;
+            this.tracker = run.tracker;
+            this.sut = run.sut;
+            this.rowVisitor = rowVisitor;
+        }
+
+        /** Notifies the data tracker that {@code lts} has started. */
+        @Override
+        public void beforeLts(long lts, long pd)
+        {
+            tracker.started(lts);
+        }
+
+        /**
+         * Blocks until every write submitted during this LTS has completed, then notifies the
+         * data tracker that {@code lts} has finished.
+         *
+         * @throws IllegalStateException if any of the writes failed or the wait was interrupted
+         */
+        @Override
+        public void afterLts(long lts, long pd)
+        {
+            try
+            {
+                for (CompletableFuture<?> future : futures)
+                    future.get();
+            }
+            catch (InterruptedException e)
+            {
+                // Keep the interrupt visible to the caller instead of silently consuming it.
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("Couldn't repeat operations within timeout bounds.", e);
+            }
+            catch (Throwable t)
+            {
+                throw new IllegalStateException("Couldn't repeat operations within timeout bounds.", t);
+            }
+            finally
+            {
+                // Never carry futures of this LTS over to the next one, even on failure.
+                futures.clear();
+            }
+            tracker.finished(lts);
+        }
+
+        /** Drops whatever statements and bindings are left over from a previous batch. */
+        @Override
+        public void beforeBatch(long lts, long pd, long m)
+        {
+            statements.clear();
+            bindings.clear();
+        }
+
+        /** Compiles a single operation and appends its CQL and bindings to the current batch. */
+        @Override
+        public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
+        {
+            CompiledStatement statement = operationInternal(lts, pd, cd, m, opId, opType);
+
+            statements.add(statement.cql());
+            for (Object binding : statement.bindings())
+                bindings.add(binding);
+        }
+
+        /** Extension point: produces the statement for one operation by delegating to the row visitor. */
+        protected CompiledStatement operationInternal(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind opType)
+        {
+            return rowVisitor.perform(opType, lts, pd, cd, opId);
+        }
+
+        /**
+         * Submits the accumulated statements as one write and resets the batch buffers.
+         * An empty batch is logged and skipped.
+         */
+        @Override
+        public void afterBatch(long lts, long pd, long m)
+        {
+            if (statements.isEmpty())
+            {
+                logger.warn("Encountered an empty batch on {}", lts);
+                return;
+            }
+
+            String query = String.join(" ", statements);
+
+            if (statements.size() > 1)
+                query = String.format("BEGIN UNLOGGED BATCH\n%s\nAPPLY BATCH;", query);
+
+            Object[] bindingsArray = bindings.toArray();
+
+            CompletableFuture<Object[][]> future = new CompletableFuture<>();
+            executeAsyncWithRetries(lts, pd, future, new CompiledStatement(query, bindingsArray));
+            futures.add(future);
+
+            statements.clear();
+            bindings.clear();
+        }
+
+        /**
+         * Executes {@code statement} asynchronously and completes {@code future} with its result,
+         * re-submitting the statement on failure.
+         */
+        protected void executeAsyncWithRetries(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement)
+        {
+            executeAsyncWithRetries(lts, pd, future, statement, 0);
+        }
+
+        /** One attempt; {@code retries} is 0 for the initial submission and grows by one per re-submission. */
+        private void executeAsyncWithRetries(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement, int retries)
+        {
+            if (sut.isShutdown())
+                throw new IllegalStateException("System under test is shut down");
+
+            if (retries > this.maxRetries)
+                throw new IllegalStateException(String.format("Can not execute statement %s after %d retries", statement, retries));
+
+            sut.executeAsync(statement.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, statement.bindings())
+               .whenComplete((res, t) -> {
+                   if (t != null)
+                       scheduleRetry(lts, pd, future, statement, retries + 1);
+                   else
+                       future.complete(res);
+               });
+        }
+
+        /**
+         * Re-submits a failed statement after one second. Anything thrown while scheduling or
+         * re-submitting is routed into {@code future}: exceptions raised on the scheduler or inside
+         * the completion callback would otherwise be lost, leaving {@code afterLts} blocked forever.
+         */
+        private void scheduleRetry(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement, int retries)
+        {
+            try
+            {
+                executor.schedule(() -> {
+                    try
+                    {
+                        executeAsyncWithRetries(lts, pd, future, statement, retries);
+                    }
+                    catch (Throwable retryFailure)
+                    {
+                        future.completeExceptionally(retryFailure);
+                    }
+                }, 1, TimeUnit.SECONDS);
+            }
+            catch (Throwable schedulingFailure)
+            {
+                // E.g. the scheduler rejecting the task because it has already been shut down.
+                future.completeExceptionally(schedulingFailure);
+            }
+        }
+
+        /** Shuts the retry scheduler down and waits up to 30 seconds for it to terminate. */
+        @Override
+        public void shutdown() throws InterruptedException
+        {
+            executor.shutdown();
+            executor.awaitTermination(30, TimeUnit.SECONDS);
+        }
+    }
+}
diff --git a/harry-core/src/harry/visitors/Operation.java b/harry-core/src/harry/visitors/OperationExecutor.java
similarity index 97%
rename from harry-core/src/harry/visitors/Operation.java
rename to harry-core/src/harry/visitors/OperationExecutor.java
index 1af21fc..920b0c5 100644
--- a/harry-core/src/harry/visitors/Operation.java
+++ b/harry-core/src/harry/visitors/OperationExecutor.java
@@ -22,11 +22,11 @@ import harry.core.Run;
 import harry.model.OpSelectors;
 import harry.operations.CompiledStatement;
 
-public interface Operation
+public interface OperationExecutor
 {
     interface RowVisitorFactory
     {
-        Operation make(Run run);
+        OperationExecutor make(Run run);
     }
 
     default CompiledStatement perform(OpSelectors.OperationKind op, long lts, long pd, long cd, long opId)
diff --git a/harry-core/src/harry/visitors/ParallelRecentPartitionValidator.java b/harry-core/src/harry/visitors/ParallelRecentValidator.java
similarity index 80%
rename from harry-core/src/harry/visitors/ParallelRecentPartitionValidator.java
rename to harry-core/src/harry/visitors/ParallelRecentValidator.java
index a1688cd..29503d1 100644
--- a/harry-core/src/harry/visitors/ParallelRecentPartitionValidator.java
+++ b/harry-core/src/harry/visitors/ParallelRecentValidator.java
@@ -41,9 +41,9 @@ import harry.model.Model;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
 
-public class ParallelRecentPartitionValidator extends ParallelValidator<ParallelRecentPartitionValidator.State>
+public class ParallelRecentValidator extends ParallelValidator<ParallelRecentValidator.State>
 {
-    private static final Logger logger = LoggerFactory.getLogger(ParallelRecentPartitionValidator.class);
+    private static final Logger logger = LoggerFactory.getLogger(ParallelRecentValidator.class);
 
     private final int partitionCount;
     private final int queries;
@@ -51,9 +51,9 @@ public class ParallelRecentPartitionValidator extends ParallelValidator<Parallel
     private final Model model;
     private final BufferedWriter validationLog;
 
-    public ParallelRecentPartitionValidator(int partitionCount, int concurrency, int triggerAfter,  int queries,
-                                            Run run,
-                                            Model.ModelFactory modelFactory)
+    public ParallelRecentValidator(int partitionCount, int concurrency, int triggerAfter, int queries,
+                                   Run run,
+                                   Model.ModelFactory modelFactory)
     {
         super(concurrency, triggerAfter, run);
         this.partitionCount = partitionCount;
@@ -139,7 +139,7 @@ public class ParallelRecentPartitionValidator extends ParallelValidator<Parallel
     }
 
     @JsonTypeName("parallel_validate_recent_partitions")
-    public static class ParallelRecentPartitionValidatorConfig implements Configuration.PartitionVisitorConfiguration
+    public static class ParallelRecentValidatorConfig implements Configuration.VisitorConfiguration
     {
         public final int partition_count;
         public final int trigger_after;
@@ -149,11 +149,11 @@ public class ParallelRecentPartitionValidator extends ParallelValidator<Parallel
 
         // TODO: make query selector configurable
         @JsonCreator
-        public ParallelRecentPartitionValidatorConfig(@JsonProperty("partition_count") int partition_count,
-                                                      @JsonProperty("concurrency") int concurrency,
-                                                      @JsonProperty("trigger_after") int trigger_after,
-                                                      @JsonProperty("queries_per_partition") int queries,
-                                                      @JsonProperty("model") Configuration.ModelConfiguration model)
+        public ParallelRecentValidatorConfig(@JsonProperty("partition_count") int partition_count,
+                                             @JsonProperty("concurrency") int concurrency,
+                                             @JsonProperty("trigger_after") int trigger_after,
+                                             @JsonProperty("queries_per_partition") int queries,
+                                             @JsonProperty("model") Configuration.ModelConfiguration model)
         {
             this.partition_count = partition_count;
             this.concurrency = concurrency;
@@ -163,9 +163,9 @@ public class ParallelRecentPartitionValidator extends ParallelValidator<Parallel
         }
 
         @Override
-        public PartitionVisitor make(Run run)
+        public Visitor make(Run run)
         {
-            return new ParallelRecentPartitionValidator(partition_count, concurrency, trigger_after, queries, run, modelConfiguration);
+            return new ParallelRecentValidator(partition_count, concurrency, trigger_after, queries, run, modelConfiguration);
         }
     }
 
diff --git a/harry-core/src/harry/visitors/ParallelValidator.java b/harry-core/src/harry/visitors/ParallelValidator.java
index 2964eb7..4772b40 100644
--- a/harry-core/src/harry/visitors/ParallelValidator.java
+++ b/harry-core/src/harry/visitors/ParallelValidator.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import harry.core.Run;
 
-public abstract class ParallelValidator<T extends ParallelValidator.State> implements PartitionVisitor
+public abstract class ParallelValidator<T extends ParallelValidator.State> implements Visitor
 {
     private static final Logger logger = LoggerFactory.getLogger(AllPartitionsValidator.class);
 
@@ -86,7 +86,7 @@ public abstract class ParallelValidator<T extends ParallelValidator.State> imple
         }
     }
 
-    public void visitPartition(long lts)
+    public void visit(long lts)
     {
         maxPos.updateAndGet(current -> Math.max(run.pdSelector.positionFor(lts), current));
 
diff --git a/harry-core/src/harry/visitors/RecentPartitionValidator.java b/harry-core/src/harry/visitors/RecentValidator.java
similarity index 91%
rename from harry-core/src/harry/visitors/RecentPartitionValidator.java
rename to harry-core/src/harry/visitors/RecentValidator.java
index 4bb1e61..d0d09fa 100644
--- a/harry-core/src/harry/visitors/RecentPartitionValidator.java
+++ b/harry-core/src/harry/visitors/RecentValidator.java
@@ -37,10 +37,10 @@ import harry.model.OpSelectors;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
 
-public class RecentPartitionValidator implements PartitionVisitor
+public class RecentValidator implements Visitor
 {
     private final BufferedWriter validationLog;
-    private static final Logger logger = LoggerFactory.getLogger(RecentPartitionValidator.class);
+    private static final Logger logger = LoggerFactory.getLogger(RecentValidator.class);
     private final Model model;
 
     private final OpSelectors.PdSelector pdSelector;
@@ -50,11 +50,11 @@ public class RecentPartitionValidator implements PartitionVisitor
     private final int triggerAfter;
     private final int queries;
 
-    public RecentPartitionValidator(int partitionCount,
-                                    int queries,
-                                    int triggerAfter,
-                                    Run run,
-                                    Model.ModelFactory modelFactory)
+    public RecentValidator(int partitionCount,
+                           int queries,
+                           int triggerAfter,
+                           Run run,
+                           Model.ModelFactory modelFactory)
     {
         this.partitionCount = partitionCount;
         this.triggerAfter = triggerAfter;
@@ -103,7 +103,7 @@ public class RecentPartitionValidator implements PartitionVisitor
         }
     }
 
-    public void visitPartition(long lts)
+    public void visit(long lts)
     {
         maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
 
diff --git a/harry-core/src/harry/visitors/ReplayingVisitor.java b/harry-core/src/harry/visitors/ReplayingVisitor.java
new file mode 100644
index 0000000..1979664
--- /dev/null
+++ b/harry-core/src/harry/visitors/ReplayingVisitor.java
@@ -0,0 +1,141 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.visitors;
+
+import java.util.Arrays;
+
+import harry.core.Run;
+import harry.model.OpSelectors;
+
+/**
+ * A {@link DelegatingVisitor} that replays pre-built {@link Visit}s instead of generating
+ * operations: subclasses decide which visit belongs to which LTS.
+ */
+public abstract class ReplayingVisitor extends DelegatingVisitor
+{
+    public ReplayingVisitor(VisitExecutor delegate)
+    {
+        super(delegate);
+    }
+
+    /** Replays the visit returned by {@link #getVisit(long)} for {@code lts}. */
+    public void visit(long lts)
+    {
+        replay(getVisit(lts));
+    }
+
+    /** @return the batches and operations to be replayed at {@code lts} */
+    public abstract Visit getVisit(long lts);
+
+    public abstract void replayAll(Run run);
+
+    /**
+     * Invokes the inherited callbacks in order: {@code beforeLts}, then for each batch
+     * {@code beforeBatch}, one {@code operation} call per entry and {@code afterBatch}, and
+     * finally {@code afterLts}. (Presumably DelegatingVisitor forwards these to the delegate.)
+     */
+    private void replay(Visit visit)
+    {
+        beforeLts(visit.lts, visit.pd);
+
+        for (Batch batch : visit.operations)
+        {
+            beforeBatch(visit.lts, visit.pd, batch.m);
+            for (Operation operation : batch.operations)
+                operation(visit.lts, visit.pd, operation.cd, batch.m, operation.opId, operation.opType);
+            afterBatch(visit.lts, visit.pd, batch.m);
+        }
+
+        afterLts(visit.lts, visit.pd);
+    }
+
+    /** One LTS worth of work: the LTS, the {@code pd} it applies to and its batches. */
+    public static class Visit
+    {
+        public final long lts;
+        public final long pd;
+        public final Batch[] operations;
+
+        public Visit(long lts, long pd, Batch[] operations)
+        {
+            this.lts = lts;
+            this.pd = pd;
+            this.operations = operations;
+        }
+
+        @Override
+        public String toString()
+        {
+            // Arrays.toString already wraps the elements in square brackets.
+            return "Visit{" +
+                   "lts=" + lts +
+                   ", pd=" + pd +
+                   ", operations=" + Arrays.toString(operations) +
+                   '}';
+        }
+    }
+
+    /** A batch within a visit: its {@code m} value and the operations it contains. */
+    public static class Batch
+    {
+        public final long m;
+        public final Operation[] operations;
+
+        public Batch(long m, Operation[] operations)
+        {
+            this.m = m;
+            this.operations = operations;
+        }
+
+        @Override
+        public String toString()
+        {
+            // Arrays.toString already wraps the elements in square brackets.
+            return "Batch{" +
+                   "m=" + m +
+                   ", operations=" + Arrays.toString(operations) +
+                   '}';
+        }
+    }
+
+    /** A single operation of a batch: its {@code cd}, operation id and kind. */
+    public static class Operation
+    {
+        public final long cd;
+        public final long opId;
+        public final OpSelectors.OperationKind opType;
+
+        public Operation(long cd, long opId, OpSelectors.OperationKind opType)
+        {
+            this.cd = cd;
+            this.opId = opId;
+            this.opType = opType;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Operation{" +
+                   "cd=" + cd +
+                   ", opId=" + opId +
+                   ", opType=" + opType +
+                   '}';
+        }
+    }
+}
\ No newline at end of file
diff --git a/harry-core/src/harry/visitors/Sampler.java b/harry-core/src/harry/visitors/Sampler.java
index e4088d9..20c9a8c 100644
--- a/harry-core/src/harry/visitors/Sampler.java
+++ b/harry-core/src/harry/visitors/Sampler.java
@@ -35,7 +35,7 @@ import harry.model.OpSelectors;
 import harry.model.SelectHelper;
 import harry.model.sut.SystemUnderTest;
 
-public class Sampler implements PartitionVisitor
+public class Sampler implements Visitor
 {
     private static final Logger logger = LoggerFactory.getLogger(AllPartitionsValidator.class);
 
@@ -57,7 +57,7 @@ public class Sampler implements PartitionVisitor
         this.samplePartitions = samplePartitions;
     }
 
-    public void visitPartition(long lts)
+    public void visit(long lts)
     {
         maxPos.updateAndGet(current -> Math.max(pdSelector.positionFor(lts), current));
 
@@ -89,7 +89,7 @@ public class Sampler implements PartitionVisitor
     }
 
     @JsonTypeName("sampler")
-    public static class SamplerConfiguration implements Configuration.PartitionVisitorConfiguration
+    public static class SamplerConfiguration implements Configuration.VisitorConfiguration
     {
         public final int trigger_after;
         public final int sample_partitions;
@@ -102,7 +102,7 @@ public class Sampler implements PartitionVisitor
             this.sample_partitions = sample_partitions;
         }
 
-        public PartitionVisitor make(Run run)
+        public Visitor make(Run run)
         {
             return new Sampler(run, trigger_after, sample_partitions);
         }
diff --git a/harry-core/src/harry/visitors/SinglePartitionValidator.java b/harry-core/src/harry/visitors/SingleValidator.java
similarity index 86%
rename from harry-core/src/harry/visitors/SinglePartitionValidator.java
rename to harry-core/src/harry/visitors/SingleValidator.java
index 313cff9..0993461 100644
--- a/harry-core/src/harry/visitors/SinglePartitionValidator.java
+++ b/harry-core/src/harry/visitors/SingleValidator.java
@@ -22,17 +22,16 @@ import harry.core.Run;
 import harry.model.Model;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
-import harry.visitors.PartitionVisitor;
 
-public class SinglePartitionValidator implements PartitionVisitor
+public class SingleValidator implements Visitor
 {
     protected final int iterations;
     protected final Model model;
     protected final QueryGenerator queryGenerator;
     protected final Run run;
-    public SinglePartitionValidator(int iterations,
-                                    Run run,
-                                    Model.ModelFactory modelFactory)
+    public SingleValidator(int iterations,
+                           Run run,
+                           Model.ModelFactory modelFactory)
     {
         this.iterations = iterations;
         this.model = modelFactory.make(run);
@@ -45,7 +44,7 @@ public class SinglePartitionValidator implements PartitionVisitor
 
     }
 
-    public void visitPartition(long lts)
+    public void visit(long lts)
     {
         model.validate(queryGenerator.inflate(lts, 0, Query.QueryKind.SINGLE_PARTITION));
 
diff --git a/harry-core/src/harry/visitors/PartitionVisitor.java b/harry-core/src/harry/visitors/VisitExecutor.java
similarity index 65%
copy from harry-core/src/harry/visitors/PartitionVisitor.java
copy to harry-core/src/harry/visitors/VisitExecutor.java
index 77de711..94aa1fc 100644
--- a/harry-core/src/harry/visitors/PartitionVisitor.java
+++ b/harry-core/src/harry/visitors/VisitExecutor.java
@@ -18,14 +18,19 @@
 
 package harry.visitors;
 
-import harry.core.Run;
+import harry.model.OpSelectors;
 
-public interface PartitionVisitor
+public interface VisitExecutor
 {
-    void visitPartition(long lts);
-    public void shutdown() throws InterruptedException;
-    public interface PartitionVisitorFactory
-    {
-        public PartitionVisitor make(Run run);
-    }
-}
\ No newline at end of file
+    public void beforeLts(long lts, long pd);
+
+    public void afterLts(long lts, long pd);
+
+    public void beforeBatch(long lts, long pd, long m);
+
+    public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind kind);
+
+    public void afterBatch(long lts, long pd, long m);
+
+    public default void shutdown() throws InterruptedException {}
+}
diff --git a/harry-core/src/harry/visitors/PartitionVisitor.java b/harry-core/src/harry/visitors/Visitor.java
similarity index 80%
copy from harry-core/src/harry/visitors/PartitionVisitor.java
copy to harry-core/src/harry/visitors/Visitor.java
index 77de711..7cb6111 100644
--- a/harry-core/src/harry/visitors/PartitionVisitor.java
+++ b/harry-core/src/harry/visitors/Visitor.java
@@ -20,12 +20,14 @@ package harry.visitors;
 
 import harry.core.Run;
 
-public interface PartitionVisitor
+public interface Visitor
 {
-    void visitPartition(long lts);
-    public void shutdown() throws InterruptedException;
-    public interface PartitionVisitorFactory
+    void visit(long lts);
+
+    public default void shutdown() throws InterruptedException {}
+
+    public interface VisitorFactory
     {
-        public PartitionVisitor make(Run run);
+        public Visitor make(Run run);
     }
 }
\ No newline at end of file
diff --git a/harry-core/test/harry/model/OpSelectorsTest.java b/harry-core/test/harry/model/OpSelectorsTest.java
index 709ba81..0f474f0 100644
--- a/harry-core/test/harry/model/OpSelectorsTest.java
+++ b/harry-core/test/harry/model/OpSelectorsTest.java
@@ -44,9 +44,9 @@ import harry.model.clock.OffsetClock;
 import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
 import harry.runner.DataTracker;
-import harry.visitors.MutatingPartitionVisitor;
-import harry.visitors.PartitionVisitor;
-import harry.visitors.Operation;
+import harry.visitors.MutatingVisitor;
+import harry.visitors.Visitor;
+import harry.visitors.OperationExecutor;
 import harry.util.BitSet;
 
 public class OpSelectorsTest
@@ -213,8 +213,8 @@ public class OpSelectorsTest
                 SystemUnderTest.NO_OP,
                 MetricReporter.NO_OP);
 
-        PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run,
-                                                                         (r) -> new Operation()
+        Visitor visitor = new MutatingVisitor(run,
+                                              (r) -> new OperationExecutor()
                                                                          {
                                                                              public CompiledStatement insert(long lts, long pd, long cd, long m)
                                                                              {
@@ -279,7 +279,7 @@ public class OpSelectorsTest
 
         for (int lts = 0; lts < 1000; lts++)
         {
-            partitionVisitor.visitPartition(lts);
+            visitor.visit(lts);
         }
 
         for (Collection<Long> value : partitionMap.values())
diff --git a/harry-core/test/harry/operations/RelationTest.java b/harry-core/test/harry/operations/RelationTest.java
index e0c6e73..eaa7ddb 100644
--- a/harry-core/test/harry/operations/RelationTest.java
+++ b/harry-core/test/harry/operations/RelationTest.java
@@ -173,7 +173,7 @@ public class RelationTest
                                                                           throw new RuntimeException("not implemented");
                                                                       }
 
-                                                                      public BitSet columnMask(long pd, long lts, long opId)
+                                                                      public BitSet columnMask(long pd, long lts, long opId, OpSelectors.OperationKind opType)
                                                                       {
                                                                           throw new RuntimeException("not implemented");
                                                                       }
diff --git a/harry-integration-backup/test/resources/single_partition_test.yml b/harry-integration-backup/test/resources/single_partition_test.yml
deleted file mode 100644
index 0ebe2aa..0000000
--- a/harry-integration-backup/test/resources/single_partition_test.yml
+++ /dev/null
@@ -1,55 +0,0 @@
-seed: 1
-
-# Default schema provider generates random schema
-schema_provider:
-  default: {}
-
-drop_schema: false
-create_schema: true
-truncate_table: false
-
-clock:
-  offset:
-    offset: 1000
-
-run_time: 10
-run_time_unit: "MINUTES"
-
-system_under_test:
-  println: {}
-
-partition_descriptor_selector:
-  always_same:
-    pd: 12345
-
-clustering_descriptor_selector:
-  default:
-    modifications_per_lts:
-      type: "constant"
-      constant: 2
-    rows_per_modification:
-      type: "constant"
-      constant: 2
-    operation_kind_weights:
-      DELETE_RANGE: 1
-      DELETE_SLICE: 1
-      DELETE_ROW: 1
-      DELETE_COLUMN: 1
-      DELETE_PARTITION: 1
-      DELETE_COLUMN_WITH_STATICS: 1
-      INSERT_WITH_STATICS: 24
-      INSERT: 24
-      UPDATE_WITH_STATICS: 23
-      UPDATE: 23
-    column_mask_bitsets: null
-    max_partition_size: 100
-
-data_tracker:
-  no_op: {}
-
-runner:
-  sequential:
-    partition_visitors: []
-
-metric_reporter:
-  no_op: {}
\ No newline at end of file
diff --git a/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java b/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java
new file mode 100644
index 0000000..ff4ad86
--- /dev/null
+++ b/harry-integration/src/harry/model/sut/InJVMTokenAwareVisitExecutor.java
@@ -0,0 +1,166 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.model.sut;
+
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.ddl.SchemaSpec;
+import harry.operations.CompiledStatement;
+import harry.visitors.GeneratingVisitor;
+import harry.visitors.LoggingVisitor;
+import harry.visitors.OperationExecutor;
+import harry.visitors.Visitor;
+
+/**
+ * Visit executor for in-JVM clusters that sends each statement to a replica of the partition it targets.
+ *
+ * The replica is picked from {@link InJvmSut#getReplicasFor} with a {@link Random} seeded by the logical
+ * timestamp. With {@code NODE_LOCAL} the statement is executed internally on that node; with any other
+ * consistency level the node coordinates the statement, and a failed execution is retried after one second,
+ * up to {@code maxRetries} times.
+ */
+public class InJVMTokenAwareVisitExecutor extends LoggingVisitor.LoggingVisitorExecutor
+{
+    public static void init()
+    {
+        Configuration.registerSubtypes(Configuation.class);
+    }
+
+    private final InJvmSut sut;
+    private final SystemUnderTest.ConsistencyLevel cl;
+    private final SchemaSpec schema;
+    // Number of retries allowed after the initial attempt before the statement is given up on.
+    private final int maxRetries = 10;
+
+    /**
+     * @param run               run whose SUT (cast to {@link InJvmSut}) and schema are used
+     * @param rowVisitorFactory factory of the row visitor handed to the superclass
+     * @param cl                consistency level the statements are executed with
+     */
+    public InJVMTokenAwareVisitExecutor(Run run,
+                                        OperationExecutor.RowVisitorFactory rowVisitorFactory,
+                                        SystemUnderTest.ConsistencyLevel cl)
+    {
+        super(run, rowVisitorFactory.make(run));
+        this.sut = (InJvmSut) run.sut;
+        this.schema = run.schemaSpec;
+        this.cl = cl;
+    }
+
+    @Override
+    protected void executeAsyncWithRetries(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement)
+    {
+        executeAsyncWithRetries(lts, pd, future, statement, 0, null);
+    }
+
+    /**
+     * Executes the statement on a replica of {@code pd} and completes {@code future} with the result.
+     *
+     * On the initial attempt this runs on the caller's thread, so exceptions thrown here reach the caller.
+     * Retries run as scheduled tasks, where a thrown exception would be captured by the discarded
+     * {@code ScheduledFuture} and leave {@code future} incomplete forever; {@link #retryLater} therefore
+     * converts such exceptions into an exceptional completion of {@code future}.
+     *
+     * @param retries     number of retries made so far (0 for the initial attempt)
+     * @param lastFailure failure that caused this retry, attached as the cause of any exception thrown here
+     */
+    private void executeAsyncWithRetries(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement, int retries, Throwable lastFailure)
+    {
+        if (sut.isShutdown())
+            throw new IllegalStateException("System under test is shut down", lastFailure);
+
+        if (retries > this.maxRetries)
+            throw new IllegalStateException(String.format("Can not execute statement %s after %d retries", statement, retries), lastFailure);
+
+        Object[] partitionKey = schema.inflatePartitionKey(pd);
+        int[] replicas = sut.getReplicasFor(partitionKey, schema.keyspace, schema.table);
+        // TODO: find a better source of entropy
+        int replica = replicas[new Random(lts).nextInt(replicas.length)];
+        if (cl == SystemUnderTest.ConsistencyLevel.NODE_LOCAL)
+        {
+            future.complete(sut.cluster.get(replica).executeInternal(statement.cql(), statement.bindings()));
+        }
+        else
+        {
+            CompletableFuture.supplyAsync(() -> sut.cluster.coordinator(replica).execute(statement.cql(), InJvmSut.toApiCl(cl), statement.bindings()), executor)
+                             .whenComplete((res, t) ->
+                                           {
+                                               if (t != null)
+                                                   retryLater(lts, pd, future, statement, retries + 1, t);
+                                               else
+                                                   future.complete(res);
+                                           });
+        }
+    }
+
+    /**
+     * Schedules another attempt in one second. Any failure of that attempt that is not reported through
+     * {@code future} already (shut-down SUT, exhausted retries, replica lookup errors, a rejected task)
+     * completes {@code future} exceptionally so that whoever waits on it is released.
+     */
+    private void retryLater(long lts, long pd, CompletableFuture<Object[][]> future, CompiledStatement statement, int retries, Throwable lastFailure)
+    {
+        try
+        {
+            executor.schedule(() -> {
+                try
+                {
+                    executeAsyncWithRetries(lts, pd, future, statement, retries, lastFailure);
+                }
+                catch (Throwable failure)
+                {
+                    future.completeExceptionally(failure);
+                }
+            }, 1, TimeUnit.SECONDS);
+        }
+        catch (RuntimeException rejected)
+        {
+            // The executor did not accept the task (for example because it has been shut down).
+            future.completeExceptionally(rejected);
+        }
+    }
+
+    @JsonTypeName("in_jvm_token_aware")
+    public static class Configuation implements Configuration.VisitorConfiguration
+    {
+        public final Configuration.RowVisitorConfiguration row_visitor;
+        public final SystemUnderTest.ConsistencyLevel consistency_level;
+
+        @JsonCreator
+        public Configuation(@JsonProperty("row_visitor") Configuration.RowVisitorConfiguration rowVisitor,
+                            @JsonProperty("consistency_level") SystemUnderTest.ConsistencyLevel consistencyLevel)
+        {
+            this.row_visitor = rowVisitor;
+            this.consistency_level = consistencyLevel;
+        }
+
+        @Override
+        public Visitor make(Run run)
+        {
+            return new GeneratingVisitor(run, new InJVMTokenAwareVisitExecutor(run, row_visitor, consistency_level));
+        }
+    }
+}
\ No newline at end of file
diff --git a/harry-integration/src/harry/model/sut/InJvmSut.java b/harry-integration/src/harry/model/sut/InJvmSut.java
index 930c4d4..f758ca5 100644
--- a/harry-integration/src/harry/model/sut/InJvmSut.java
+++ b/harry-integration/src/harry/model/sut/InJvmSut.java
@@ -20,7 +20,10 @@ package harry.model.sut;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,9 +32,15 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import harry.core.Configuration;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
 {
@@ -83,4 +92,25 @@ public class InJvmSut extends InJvmSutBase<IInvokableInstance, Cluster>
             return new InJvmSut(cluster);
         }
     }
+
+    /**
+     * Returns the numbers of the nodes that are natural replicas for the given partition key.
+     *
+     * The lookup is executed on node 1; the key is rendered by joining the {@code toString()} of its components
+     * with ':'. Each node number is read from the last octet of the replica's address.
+     * NOTE(review): presumably relies on node N of the in-JVM cluster listening on x.x.x.N — confirm.
+     */
+    public int[] getReplicasFor(Object[] partitionKey, String keyspace, String table)
+    {
+        return cluster.get(1).appliesOnInstance((Object[] pk, String ks) ->
+                                                {
+                                                    String pkString = Arrays.stream(pk).map(Object::toString).collect(Collectors.joining(":"));
+                                                    EndpointsForToken endpoints = StorageService.instance.getNaturalReplicasForToken(ks, table, pkString);
+                                                    int[] nodes = new int[endpoints.size()];
+                                                    // Java bytes are signed: mask so that an octet above 127 is not turned into a negative node number.
+                                                    for (int i = 0; i < endpoints.size(); i++)
+                                                        nodes[i] = endpoints.get(i).endpoint().address.getAddress()[3] & 0xFF;
+                                                    return nodes;
+                                                }).apply(partitionKey, keyspace);
+    }
 }
\ No newline at end of file
diff --git a/harry-core/src/harry/visitors/FaultInjectingPartitionVisitor.java b/harry-integration/src/harry/runner/FaultInjectingVisitor.java
similarity index 75%
rename from harry-core/src/harry/visitors/FaultInjectingPartitionVisitor.java
rename to harry-integration/src/harry/runner/FaultInjectingVisitor.java
index 6a03d35..4cca1b2 100644
--- a/harry-core/src/harry/visitors/FaultInjectingPartitionVisitor.java
+++ b/harry-integration/src/harry/runner/FaultInjectingVisitor.java
@@ -16,9 +16,11 @@
  *  limitations under the License.
  */
 
-package harry.visitors;
+package harry.runner;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -29,35 +31,39 @@ import harry.core.Configuration;
 import harry.core.Run;
 import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
+import harry.visitors.LoggingVisitor;
+import harry.visitors.OperationExecutor;
+import harry.visitors.Visitor;
 
-public class FaultInjectingPartitionVisitor extends LoggingPartitionVisitor
+public class FaultInjectingVisitor extends LoggingVisitor
 {
     public static void init()
     {
-        Configuration.registerSubtypes(FaultInjectingPartitionVisitorConfiguration.class);
+        Configuration.registerSubtypes(FaultInjectingVisitorConfiguration.class);
     }
 
     @JsonTypeName("fault_injecting")
-    public static class FaultInjectingPartitionVisitorConfiguration extends Configuration.MutatingPartitionVisitorConfiguation
+    public static class FaultInjectingVisitorConfiguration extends Configuration.MutatingVisitorConfiguation
     {
         @JsonCreator
-        public FaultInjectingPartitionVisitorConfiguration(@JsonProperty("row_visitor") Configuration.RowVisitorConfiguration row_visitor)
+        public FaultInjectingVisitorConfiguration(@JsonProperty("row_visitor") Configuration.RowVisitorConfiguration row_visitor)
         {
             super(row_visitor);
         }
 
         @Override
-        public PartitionVisitor make(Run run)
+        public Visitor make(Run run)
         {
-            return new FaultInjectingPartitionVisitor(run, row_visitor);
+            return new FaultInjectingVisitor(run, row_visitor);
         }
     }
 
     private final AtomicInteger cnt = new AtomicInteger();
 
     private final SystemUnderTest.FaultInjectingSut sut;
+    protected final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
 
-    public FaultInjectingPartitionVisitor(Run run, Operation.RowVisitorFactory rowVisitorFactory)
+    public FaultInjectingVisitor(Run run, OperationExecutor.RowVisitorFactory rowVisitorFactory)
     {
         super(run, rowVisitorFactory);
         this.sut = (SystemUnderTest.FaultInjectingSut) run.sut;
diff --git a/harry-integration/src/harry/runner/RepairingLocalStateValidator.java b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
index 41926f0..fe811e5 100644
--- a/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
+++ b/harry-integration/src/harry/runner/RepairingLocalStateValidator.java
@@ -19,6 +19,7 @@
 package harry.runner;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -34,7 +35,7 @@ import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
 import harry.operations.Query;
 import harry.visitors.AllPartitionsValidator;
-import harry.visitors.PartitionVisitor;
+import harry.visitors.Visitor;
 
 import static harry.model.SelectHelper.resultSetToRow;
 
@@ -56,19 +57,22 @@ public class RepairingLocalStateValidator extends AllPartitionsValidator
     }
 
     @Override
-    public void visitPartition(long lts)
+    public void visit(long lts)
     {
         if (lts > 0 && lts % triggerAfter == 0)
         {
             System.out.println("Starting repair...");
-            inJvmSut.cluster().get(1).nodetool("repair", "--full");
+
+            inJvmSut.cluster().stream().forEach((instance) -> {
+                instance.nodetool("repair", "--full");
+            });
             System.out.println("Validating partitions...");
-            validateAllPartitions(executor, concurrency);
+            super.visit(lts);
         }
     }
 
     @JsonTypeName("repair_and_validate_local_states")
-    public static class RepairingLocalStateValidatorConfiguration implements Configuration.PartitionVisitorConfiguration
+    public static class RepairingLocalStateValidatorConfiguration implements Configuration.VisitorConfiguration
     {
         private final int concurrency;
         private final int trigger_after;
@@ -84,7 +88,7 @@ public class RepairingLocalStateValidator extends AllPartitionsValidator
             this.modelConfiguration = model;
         }
 
-        public PartitionVisitor make(Run run)
+        public Visitor make(Run run)
         {
             return new RepairingLocalStateValidator(concurrency, trigger_after, run, modelConfiguration);
         }
@@ -106,9 +110,9 @@ public class RepairingLocalStateValidator extends AllPartitionsValidator
         public void validate(Query query)
         {
             CompiledStatement compiled = query.toSelectStatement();
-            for (int i = 1; i <= inJvmSut.cluster.size(); i++)
+            int[] replicas = inJvmSut.getReplicasFor(schemaSpec.inflatePartitionKey(query.pd), schemaSpec.keyspace, schemaSpec.table);
+            for (int node : replicas)
             {
-                int node = i;
                 validate(() -> {
                     Object[][] objects = inJvmSut.execute(compiled.cql(),
                                                           SystemUnderTest.ConsistencyLevel.NODE_LOCAL,
diff --git a/harry-integration/src/harry/runner/TrivialShrinker.java b/harry-integration/src/harry/runner/TrivialShrinker.java
index 23bdbcc..2879e0e 100644
--- a/harry-integration/src/harry/runner/TrivialShrinker.java
+++ b/harry-integration/src/harry/runner/TrivialShrinker.java
@@ -28,9 +28,9 @@ import java.util.function.Predicate;
 
 import harry.core.Configuration;
 import harry.core.Run;
-import harry.visitors.AbstractPartitionVisitor;
-import harry.visitors.PartitionVisitor;
-import harry.visitors.SkippingPartitionVisitor;
+import harry.visitors.DelegatingVisitor;
+import harry.visitors.Visitor;
+import harry.visitors.SkippingVisitor;
 
 /**
  * A most trivial imaginable shrinker: attempts to skip partitions and/or logical timestamps to see if the
@@ -62,15 +62,16 @@ public class TrivialShrinker
 
             Run run = configuration.createRun();
             Configuration.SequentialRunnerConfig config = (Configuration.SequentialRunnerConfig) configuration.runner;
-            List<PartitionVisitor> visitors = new ArrayList<>();
-            for (Configuration.PartitionVisitorConfiguration factory : config.partition_visitor_factories)
+            List<Visitor> visitors = new ArrayList<>();
+            for (Configuration.VisitorConfiguration factory : config.visitor_factories)
             {
-                PartitionVisitor visitor = factory.make(run);
-                if (visitor instanceof AbstractPartitionVisitor)
+                Visitor visitor = factory.make(run);
+                if (visitor instanceof DelegatingVisitor)
                 {
-                    visitors.add(new SkippingPartitionVisitor((AbstractPartitionVisitor) visitor,
-                                                              ltsToSkip,
-                                                              pdsToSkip));
+                    visitors.add(new SkippingVisitor(visitor,
+                                                     (lts) -> run.pdSelector.pd(lts, run.schemaSpec),
+                                                     ltsToSkip,
+                                                     pdsToSkip));
                 }
                 else
                 {
@@ -159,13 +160,13 @@ public class TrivialShrinker
         }
     }
 
-    public static void runOnce(Run run, List<PartitionVisitor> visitors, long maxLts)
+    public static void runOnce(Run run, List<Visitor> visitors, long maxLts)
     {
         for (long lts = 0; lts <= maxLts; lts++)
         {
-            for (PartitionVisitor visitor : visitors)
+            for (Visitor visitor : visitors)
             {
-                visitor.visitPartition(lts);
+                visitor.visit(lts);
             }
         }
     }
@@ -182,4 +183,4 @@ public class TrivialShrinker
         }
         return s.substring(0, s.length() - 1);
     }
-}
+}
\ No newline at end of file
diff --git a/harry-integration/src/harry/visitors/SkippingPartitionVisitor.java b/harry-integration/src/harry/visitors/SkippingPartitionVisitor.java
deleted file mode 100644
index fa910cb..0000000
--- a/harry-integration/src/harry/visitors/SkippingPartitionVisitor.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package harry.visitors;
-
-import java.util.Set;
-
-public class SkippingPartitionVisitor extends AbstractPartitionVisitor
-{
-    private final AbstractPartitionVisitor delegate;
-    private final Set<Long> ltsToSkip;
-    private final Set<Long> pdsToSkip;
-
-    public SkippingPartitionVisitor(AbstractPartitionVisitor delegate,
-                                    Set<Long> ltsToSkip,
-                                    Set<Long> pdsToSkip)
-    {
-        super(delegate);
-        this.delegate = delegate;
-        this.ltsToSkip = ltsToSkip;
-        this.pdsToSkip = pdsToSkip;
-    }
-
-    protected void beforeLts(long lts, long pd)
-    {
-        delegate.beforeLts(lts, pd);
-    }
-
-    protected void afterLts(long lts, long pd)
-    {
-        delegate.afterLts(lts, pd);
-    }
-
-    protected void beforeBatch(long lts, long pd, long m)
-    {
-        delegate.beforeBatch(lts, pd, m);
-    }
-
-    protected void operation(long lts, long pd, long cd, long m, long opId)
-    {
-        if (pdsToSkip.contains(pd) || ltsToSkip.contains(lts))
-            return;
-
-        delegate.operation(lts, pd, cd, m, opId);
-    }
-
-    protected void afterBatch(long lts, long pd, long m)
-    {
-        delegate.afterBatch(lts, pd, m);
-    }
-
-    public void shutdown() throws InterruptedException
-    {
-        delegate.shutdown();
-    }
-}
diff --git a/harry-core/src/harry/visitors/PartitionVisitor.java b/harry-integration/src/harry/visitors/SkippingVisitor.java
similarity index 51%
rename from harry-core/src/harry/visitors/PartitionVisitor.java
rename to harry-integration/src/harry/visitors/SkippingVisitor.java
index 77de711..8da0a89 100644
--- a/harry-core/src/harry/visitors/PartitionVisitor.java
+++ b/harry-integration/src/harry/visitors/SkippingVisitor.java
@@ -18,14 +18,36 @@
 
 package harry.visitors;
 
-import harry.core.Run;
+import java.util.Set;
 
-public interface PartitionVisitor
+public class SkippingVisitor implements Visitor
 {
-    void visitPartition(long lts);
-    public void shutdown() throws InterruptedException;
-    public interface PartitionVisitorFactory
+    private final Set<Long> ltsToSkip;
+    private final Set<Long> pdsToSkip;
+    private final LtsToPd ltsToPd;
+    private final Visitor delegate;
+
+    public SkippingVisitor(Visitor delegate,
+                           LtsToPd ltsToPd,
+                           Set<Long> ltsToSkip,
+                           Set<Long> pdsToSkip)
+    {
+        this.delegate = delegate;
+        this.ltsToSkip = ltsToSkip;
+        this.pdsToSkip = pdsToSkip;
+        this.ltsToPd = ltsToPd;
+    }
+
+    public void visit(long lts)
+    {
+        if (ltsToSkip.contains(lts) || pdsToSkip.contains(ltsToPd.convert(lts)))
+            return;
+
+        delegate.visit(lts);
+    }
+
+    public static interface LtsToPd
     {
-        public PartitionVisitor make(Run run);
+        public long convert(long lts);
     }
-}
\ No newline at end of file
+}
diff --git a/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java b/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
index b819eb5..33463b8 100644
--- a/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
+++ b/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java
@@ -1,21 +1,3 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
 package harry.generators;
 
 import java.util.Random;
@@ -32,10 +14,10 @@ import harry.generators.distribution.Distribution;
 import harry.model.NoOpChecker;
 import harry.model.OpSelectors;
 import harry.model.sut.SystemUnderTest;
-import harry.visitors.MutatingPartitionVisitor;
+import harry.visitors.MutatingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.PartitionVisitor;
-import harry.visitors.SinglePartitionValidator;
+import harry.visitors.Visitor;
+import harry.visitors.SingleValidator;
 import harry.util.TestRunner;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
@@ -110,16 +92,16 @@ public class DataGeneratorsIntegrationTest extends CQLTester
                                           .build()
                                           .createRun();
 
-                                PartitionVisitor visitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+                                Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
                                 for (int lts = 0; lts < 100; lts++)
-                                    visitor.visitPartition(lts);
+                                    visitor.visit(lts);
                             }
 
                             Run run = builder.build()
                                              .createRun();
-                            PartitionVisitor visitor = new SinglePartitionValidator(100, run, NoOpChecker::new);
+                            Visitor visitor = new SingleValidator(100, run, NoOpChecker::new);
                             for (int lts = 0; lts < 100; lts++)
-                                visitor.visitPartition(lts);
+                                visitor.visit(lts);
 
                         });
 
diff --git a/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java b/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
new file mode 100644
index 0000000..6c0ee24
--- /dev/null
+++ b/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java
@@ -0,0 +1,193 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.model;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.ddl.SchemaGenerators;
+import harry.ddl.SchemaSpec;
+import harry.dsl.HistoryBuilder;
+import harry.operations.CompiledStatement;
+import harry.operations.Query;
+import harry.operations.QueryGenerator;
+import harry.reconciler.Reconciler;
+import harry.util.TestRunner;
+import harry.visitors.MutatingVisitor;
+import harry.visitors.MutatingRowVisitor;
+import harry.visitors.ReplayingVisitor;
+
+/**
+ * Integration tests that build operation histories with the {@link HistoryBuilder} DSL, replay them against
+ * the system under test and validate the partitions written during the replay with a {@link QuiescentChecker}.
+ */
+public class HistoryBuilderIntegrationTest extends ModelTestBase
+{
+    public Configuration.ConfigurationBuilder configuration(long seed, SchemaSpec schema)
+    {
+        return super.configuration(seed, schema)
+                    .setPartitionDescriptorSelector((ignore) -> new HistoryBuilder.PdSelector())
+                    // TODO: ideally, we want a custom/tailored clustering descriptor selector
+                    .setClusteringDescriptorSelector((builder) -> builder.setNumberOfModificationsDistribution(new Configuration.ConstantDistributionConfig(100_000))
+                                                                         .setRowsPerModificationDistribution(new Configuration.ConstantDistributionConfig(100_000)));
+    }
+
+    @Test
+    public void simpleDSLTest()
+    {
+        Supplier<SchemaSpec> supplier = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
+        {
+            SchemaSpec schema = supplier.get();
+            Configuration config = configuration(i, schema).build();
+
+            Run run = config.createRun();
+            beforeEach();
+            run.sut.schemaChange(schema.compile().cql());
+            HistoryBuilder history = new HistoryBuilder(run);
+
+            // The history and the set of written partitions both accumulate across the rounds below.
+            Set<Long> pds = new HashSet<>();
+
+            for (int j = 0; j < 5; j++)
+            {
+                history.nextPartition()
+                       .simultaneously()
+                         .batch()
+                           .insert()
+                           .delete()
+                           .finish()
+                         .insert()
+                         .finish()
+                       .nextPartition()
+                       .sequentially()
+                       .randomOrder()
+                         .batch()
+                           .insert()
+                           .delete()
+                           .finish()
+                         .updates(5)
+                         .partitionDelete()
+                         .finish()
+                       .nextPartition()
+                       .sequentially()
+                       .randomOrder()
+                         .batch()
+                           .insert()
+                           .delete()
+                           .finish()
+                         .updates(10)
+                           .partitionDelete()
+                           .finish();
+
+                replayAndValidate(run, history, pds, false);
+            }
+        }
+    }
+
+    @Test
+    public void testHistoryBuilder() throws Throwable
+    {
+        Supplier<SchemaSpec> supplier = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
+        for (int i = 0; i < SchemaGenerators.DEFAULT_RUNS; i++)
+        {
+            SchemaSpec schema = supplier.get();
+            Configuration config = configuration(1L, schema).build();
+            Run run = config.createRun();
+
+            beforeEach();
+            run.sut.schemaChange(schema.compile().cql());
+
+            TestRunner.test((rng) -> {
+                                HistoryBuilderTest.Counts counts = new HistoryBuilderTest.Counts();
+                                counts.randomOrder = rng.nextBoolean();
+                                counts.simultaneously = rng.nextBoolean();
+                                counts.partitionDeletion = rng.nextInt(1, 10);
+                                counts.update = rng.nextInt(1, 10);
+                                counts.insert = rng.nextInt(1, 10);
+                                counts.delete = rng.nextInt(1, 10);
+                                counts.rangeDelete = rng.nextInt(1, 10);
+                                counts.sliceDelete = rng.nextInt(1, 10);
+                                counts.columnDelete = rng.nextInt(1, 10);
+                                return counts;
+                            },
+                            () -> new HistoryBuilder(run),
+                            (sut, counts) -> {
+                                counts.apply(sut);
+                                return sut;
+                            },
+                            (sut) -> {
+                                replayAndValidate(run, sut, new HashSet<>(), true);
+                            });
+        }
+    }
+
+    /**
+     * Replays everything recorded in {@code history} against the system under test, then validates every
+     * partition descriptor contained in {@code pds} with a {@link QuiescentChecker}.
+     *
+     * @param run          run providing the SUT, schema and clock
+     * @param history      history to replay; also supplies the visitor used by the {@link Reconciler}
+     * @param pds          receives the descriptor of each partition written during the replay; descriptors
+     *                     that are already present are validated as well
+     * @param alsoReversed whether to additionally validate the partition query built with {@code true} as the
+     *                     last argument of {@code Query.selectPartition} (presumably the reversed read)
+     */
+    private static void replayAndValidate(Run run, HistoryBuilder history, Set<Long> pds, boolean alsoReversed)
+    {
+        MutatingRowVisitor recordingRowVisitor = new MutatingRowVisitor(run)
+        {
+            public CompiledStatement perform(OpSelectors.OperationKind op, long lts, long pd, long cd, long opId)
+            {
+                pds.add(pd);
+                return super.perform(op, lts, pd, cd, opId);
+            }
+        };
+        ReplayingVisitor visitor = history.visitor(new MutatingVisitor.MutatingVisitExecutor(run, recordingRowVisitor));
+        visitor.replayAll(run);
+
+        Model model = new QuiescentChecker(run, new Reconciler(run, history::visitor));
+        QueryGenerator.TypedQueryGenerator queryGenerator = new QueryGenerator.TypedQueryGenerator(run);
+        for (Long pd : pds)
+        {
+            model.validate(Query.selectPartition(run.schemaSpec, pd, false));
+            if (alsoReversed)
+                model.validate(Query.selectPartition(run.schemaSpec, pd, true));
+
+            // Seeded by the partition descriptor so that a failing run can be reproduced; the bound is
+            // clamped because Random.nextInt rejects non-positive bounds.
+            int lts = new Random(pd).nextInt(Math.max(1, (int) run.clock.maxLts()));
+            // NOTE(review): the inflated queries are not passed to model.validate, so this only checks that
+            // query generation does not throw - confirm whether validation was intended.
+            for (int k = 0; k < 3; k++)
+                queryGenerator.inflate(lts, k);
+        }
+    }
+
+    Configuration.ModelConfiguration modelConfiguration()
+    {
+        return new Configuration.QuiescentCheckerConfig();
+    }
+}
\ No newline at end of file
diff --git a/harry-integration/test/harry/model/HistoryBuilderTest.java b/harry-integration/test/harry/model/HistoryBuilderTest.java
new file mode 100644
index 0000000..5ccca60
--- /dev/null
+++ b/harry-integration/test/harry/model/HistoryBuilderTest.java
@@ -0,0 +1,218 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.model;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.ddl.SchemaSpec;
+import harry.dsl.HistoryBuilder;
+import harry.util.TestRunner;
+import harry.visitors.ReplayingVisitor;
+import harry.visitors.VisitExecutor;
+
+/**
+ * Checks that a history assembled with {@link HistoryBuilder} replays, partition by partition,
+ * exactly the number of operations of each kind that was requested from the builder.
+ */
+public class HistoryBuilderTest
+{
+    @Test
+    public void testHistoryBuilder() throws Throwable
+    {
+        SchemaSpec schema = MockSchema.tbl1;
+
+        Configuration config = IntegrationTestBase.sharedConfiguration(1, schema)
+                                                  .setPartitionDescriptorSelector((ignore) -> new HistoryBuilder.PdSelector())
+                                                  .setClusteringDescriptorSelector((builder) -> builder.setNumberOfModificationsDistribution(new Configuration.ConstantDistributionConfig(100_000))
+                                                                                                       .setRowsPerModificationDistribution(new Configuration.ConstantDistributionConfig(100_000)))
+                                                  .build();
+
+        Run run = config.createRun();
+        TestRunner.test((rng) -> {
+                            Counts counts = new Counts();
+                            counts.randomOrder = rng.nextBoolean();
+                            counts.simultaneously = rng.nextBoolean();
+                            counts.partitionDeletion = rng.nextInt(1, 10);
+                            counts.update = rng.nextInt(1, 10);
+                            counts.insert = rng.nextInt(1, 10);
+                            counts.delete = rng.nextInt(1, 10);
+                            counts.rangeDelete = rng.nextInt(1, 10);
+                            counts.sliceDelete = rng.nextInt(1, 10);
+                            counts.columnDelete = rng.nextInt(1, 10);
+                            return counts;
+                        },
+                        () -> new ArrayList<Counts>(),
+                        () -> new HistoryBuilder(run),
+                        (model, counts) -> {
+                            model.add(counts);
+                            return model;
+                        },
+                        (sut, counts) -> {
+                            counts.apply(sut);
+                            return sut;
+                        },
+                        (model, sut) -> {
+                            Iterator<Counts> iter = model.iterator();
+                            ReplayingVisitor visitor = sut.visitor(new VisitExecutor()
+                            {
+                                // Expected counts for the partition that is currently being replayed
+                                Counts current = iter.next();
+                                long lastPd = Long.MIN_VALUE;
+
+                                public void beforeLts(long lts, long pd)
+                                {
+                                    // Very first LTS: remember its partition, there is nothing to compare against yet
+                                    if (lastPd == Long.MIN_VALUE)
+                                    {
+                                        lastPd = pd;
+                                        return;
+                                    }
+
+                                    // Every expected operation was observed, so replay must now be on a different partition
+                                    if (current.allDone())
+                                    {
+                                        Assert.assertNotEquals("Should have switched partition after finishing all operations",
+                                                               pd, lastPd);
+                                        Assert.assertTrue("System under test still has operations, while model expects none",
+                                                          iter.hasNext());
+                                        current = iter.next();
+                                        lastPd = pd;
+                                    }
+                                }
+
+                                public void operation(long lts, long pd, long cd, long m, long opId, OpSelectors.OperationKind kind)
+                                {
+                                    switch (kind)
+                                    {
+                                        case UPDATE:
+                                        case UPDATE_WITH_STATICS:
+                                            current.update--;
+                                            break;
+                                        case INSERT:
+                                        case INSERT_WITH_STATICS:
+                                            current.insert--;
+                                            break;
+                                        case DELETE_PARTITION:
+                                            current.partitionDeletion--;
+                                            break;
+                                        case DELETE_ROW:
+                                            current.delete--;
+                                            break;
+                                        case DELETE_RANGE:
+                                            current.rangeDelete--;
+                                            break;
+                                        case DELETE_SLICE:
+                                            current.sliceDelete--;
+                                            break;
+                                        case DELETE_COLUMN:
+                                        case DELETE_COLUMN_WITH_STATICS:
+                                            current.columnDelete--;
+                                            break;
+                                    }
+                                }
+
+                                public void afterLts(long lts, long pd){}
+                                public void beforeBatch(long lts, long pd, long m){}
+                                public void afterBatch(long lts, long pd, long m){}
+                            });
+                            visitor.replayAll(run);
+                            // After the replay, every counter of every partition must be back at zero
+                            for (Counts counts : model)
+                                Assert.assertTrue(counts.toString(), counts.allDone());
+                        });
+    }
+
+    /**
+     * Number of operations of each kind requested for a single partition. The replaying executor
+     * in the test decrements these counters as it observes the corresponding operations.
+     */
+    public static class Counts
+    {
+        boolean randomOrder;
+        boolean simultaneously;
+        int partitionDeletion;
+        int update;
+        int insert;
+        int delete;
+        int rangeDelete;
+        int sliceDelete;
+        int columnDelete;
+
+        /** Requests the configured operations for the next partition of the given builder. */
+        public void apply(HistoryBuilder sut)
+        {
+            HistoryBuilder.PartitionBuilder builder = sut.nextPartition();
+            if (randomOrder)
+                builder.randomOrder();
+            else
+                builder.strictOrder();
+
+            if (simultaneously)
+                builder.simultaneously();
+            else
+                builder.sequentially();
+
+            builder.deletes(delete);
+            builder.inserts(insert);
+            builder.updates(update);
+            builder.partitionDeletions(partitionDeletion);
+            builder.rangeDeletes(rangeDelete);
+            builder.sliceDeletes(sliceDelete);
+            builder.columnDeletes(columnDelete);
+
+            builder.finish();
+        }
+
+        /** @return true if every operation counter is exactly zero */
+        boolean allDone()
+        {
+            return partitionDeletion == 0 &&
+                   update == 0 &&
+                   insert == 0 &&
+                   delete == 0 &&
+                   rangeDelete == 0 &&
+                   sliceDelete == 0 &&
+                   columnDelete == 0;
+        }
+
+        /** Lists every field, so that a failed assertion shows which counters are off. */
+        @Override
+        public String toString()
+        {
+            return "Counts{" +
+                   "randomOrder=" + randomOrder +
+                   ", simultaneously=" + simultaneously +
+                   ", partitionDeletion=" + partitionDeletion +
+                   ", update=" + update +
+                   ", insert=" + insert +
+                   ", delete=" + delete +
+                   ", rangeDelete=" + rangeDelete +
+                   ", sliceDelete=" + sliceDelete +
+                   ", columnDelete=" + columnDelete +
+                   '}';
+        }
+    }
+
+}
diff --git a/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java b/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java
new file mode 100644
index 0000000..0f28005
--- /dev/null
+++ b/harry-integration/test/harry/model/InJVMTokenAwareExecutorTest.java
@@ -0,0 +1,95 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package harry.model;
+
+import java.util.function.Supplier;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.ddl.SchemaGenerators;
+import harry.ddl.SchemaSpec;
+import harry.model.sut.InJVMTokenAwareVisitExecutor;
+import harry.model.sut.InJvmSut;
+import harry.model.sut.SystemUnderTest;
+import harry.runner.RepairingLocalStateValidator;
+import harry.visitors.GeneratingVisitor;
+import harry.visitors.Visitor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+
+/**
+ * Writes data through {@link InJVMTokenAwareVisitExecutor} at NODE_LOCAL consistency on a
+ * five-node in-JVM cluster (keyspace replication factor 3), and then runs
+ * {@link RepairingLocalStateValidator} once all the visits are done.
+ */
+public class InJVMTokenAwareExecutorTest extends IntegrationTestBase
+{
+    @BeforeClass
+    public static void before() throws Throwable
+    {
+        cluster = init(Cluster.build()
+                              .withNodes(5)
+                              .withConfig((cfg) -> cfg.with(Feature.GOSSIP, Feature.NETWORK))
+                              .start());
+        sut = new InJvmSut(cluster, 1);
+    }
+
+    @Override
+    @Before
+    public void beforeEach()
+    {
+        cluster.schemaChange("DROP KEYSPACE IF EXISTS harry");
+        cluster.schemaChange("CREATE KEYSPACE harry WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};");
+    }
+
+    @Test
+    public void testRepair()
+    {
+        Supplier<SchemaSpec> schemaGen = SchemaGenerators.progression(1);
+        for (int cnt = 0; cnt < SchemaGenerators.DEFAULT_RUNS; cnt++)
+        {
+            SchemaSpec schema = schemaGen.get();
+            Configuration.ConfigurationBuilder builder = sharedConfiguration(cnt, schema);
+
+            Configuration configuration = builder.build();
+            Run run = configuration.createRun();
+            run.sut.schemaChange(run.schemaSpec.compile().cql());
+
+            Visitor visitor = new GeneratingVisitor(run, new InJVMTokenAwareVisitExecutor(run,
+                                                                                          new Configuration.MutatingRowVisitorConfiguration(),
+                                                                                          SystemUnderTest.ConsistencyLevel.NODE_LOCAL));
+
+            OpSelectors.MonotonicClock clock = run.clock;
+            // Visit 10000 consecutive logical timestamps
+            for (int i = 0; i < 10000; i++)
+            {
+                long lts = clock.nextLts();
+                visitor.visit(lts);
+            }
+
+            // NOTE(review): presumably repairs the cluster and then checks node-local state; confirm against RepairingLocalStateValidator
+            RepairingLocalStateValidator validator = new RepairingLocalStateValidator(5, 1, run, new Configuration.QuiescentCheckerConfig());
+            validator.visit(clock.maxLts());
+        }
+    }
+}
diff --git a/harry-integration/test/harry/model/ModelTestBase.java b/harry-integration/test/harry/model/ModelTestBase.java
index 70b9fb3..576a291 100644
--- a/harry-integration/test/harry/model/ModelTestBase.java
+++ b/harry-integration/test/harry/model/ModelTestBase.java
@@ -28,11 +28,11 @@ import harry.core.Configuration;
 import harry.core.Run;
 import harry.ddl.SchemaGenerators;
 import harry.ddl.SchemaSpec;
-import harry.visitors.LoggingPartitionVisitor;
+import harry.visitors.LoggingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.PartitionVisitor;
+import harry.visitors.Visitor;
 import harry.runner.Runner;
-import harry.visitors.SinglePartitionValidator;
+import harry.visitors.SingleValidator;
 
 public abstract class ModelTestBase extends IntegrationTestBase
 {
@@ -58,7 +58,7 @@ public abstract class ModelTestBase extends IntegrationTestBase
                    .setRunTime(1, TimeUnit.MINUTES)
                    .setCreateSchema(false)
                    .setDropSchema(false)
-                   .setRunner(new Configuration.SequentialRunnerConfig(Arrays.asList(new Configuration.LoggingPartitionVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration()),
+                   .setRunner(new Configuration.SequentialRunnerConfig(Arrays.asList(new Configuration.LoggingVisitorConfiguration(new Configuration.MutatingRowVisitorConfiguration()),
                                                                                      new Configuration.RecentPartitionsValidatorConfiguration(10, 10, 1, factory::make),
                                                                                      new Configuration.AllPartitionsValidatorConfiguration(10, 10, factory::make))));
             Runner runner = builder.build().createRunner();
@@ -83,9 +83,9 @@ public abstract class ModelTestBase extends IntegrationTestBase
 
     abstract Configuration.ModelConfiguration modelConfiguration();
 
-    protected PartitionVisitor validator(Run run)
+    protected Visitor validator(Run run)
     {
-        return new SinglePartitionValidator(100, run , modelConfiguration());
+        return new SingleValidator(100, run , modelConfiguration());
     }
 
     public Configuration.ConfigurationBuilder configuration(long seed, SchemaSpec schema)
@@ -103,16 +103,16 @@ public abstract class ModelTestBase extends IntegrationTestBase
         System.out.println(run.schemaSpec.compile().cql());
         OpSelectors.MonotonicClock clock = run.clock;
 
-        PartitionVisitor validator = validator(run);
-        PartitionVisitor partitionVisitor = new LoggingPartitionVisitor(run, MutatingRowVisitor::new);
+        Visitor validator = validator(run);
+        Visitor visitor = new LoggingVisitor(run, MutatingRowVisitor::new);
 
         for (int i = 0; i < 20000; i++)
         {
             long lts = clock.nextLts();
-            partitionVisitor.visitPartition(lts);
+            visitor.visit(lts);
         }
 
-        validator.visitPartition(0);
+        validator.visit(0);
 
         if (!corrupt.apply(run))
         {
@@ -122,7 +122,7 @@ public abstract class ModelTestBase extends IntegrationTestBase
 
         try
         {
-            validator.visitPartition(0);
+            validator.visit(0);
         }
         catch (Throwable t)
         {
diff --git a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
index 928e4e9..92498dd 100644
--- a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.function.Supplier;
 
-import harry.operations.Query;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -40,9 +39,10 @@ import harry.corruptor.HideValueCorruptor;
 import harry.corruptor.QueryResponseCorruptor;
 import harry.corruptor.ShowValueCorruptor;
 import harry.ddl.SchemaGenerators;
-import harry.visitors.MutatingPartitionVisitor;
+import harry.visitors.MutatingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.PartitionVisitor;
+import harry.visitors.Visitor;
+import harry.operations.Query;
 import harry.operations.QueryGenerator;
 
 import static harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
@@ -111,7 +111,7 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
             System.out.println(run.schemaSpec.compile().cql());
             OpSelectors.MonotonicClock clock = run.clock;
 
-            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+            Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
             Model model = new QuiescentChecker(run);
 
             QueryResponseCorruptor corruptor = this.corruptorFactory.create(run);
@@ -119,7 +119,7 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase
             for (int i = 0; i < CYCLES; i++)
             {
                 long lts = clock.nextLts();
-                partitionVisitor.visitPartition(lts);
+                visitor.visit(lts);
             }
 
             while (true)
diff --git a/harry-integration/test/harry/model/QuerySelectorTest.java b/harry-integration/test/harry/model/QuerySelectorTest.java
index 3436809..6d8dc5c 100644
--- a/harry-integration/test/harry/model/QuerySelectorTest.java
+++ b/harry-integration/test/harry/model/QuerySelectorTest.java
@@ -31,9 +31,9 @@ import harry.ddl.SchemaGenerators;
 import harry.ddl.SchemaSpec;
 import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
-import harry.visitors.MutatingPartitionVisitor;
+import harry.visitors.MutatingVisitor;
 import harry.visitors.MutatingRowVisitor;
-import harry.visitors.PartitionVisitor;
+import harry.visitors.Visitor;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
 
@@ -72,12 +72,12 @@ public class QuerySelectorTest extends IntegrationTestBase
             run.sut.schemaChange(run.schemaSpec.compile().cql());
             OpSelectors.MonotonicClock clock = run.clock;
 
-            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+            Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
 
             for (int i = 0; i < CYCLES; i++)
             {
                 long lts = clock.nextLts();
-                partitionVisitor.visitPartition(lts);
+                visitor.visit(lts);
             }
 
             QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
@@ -146,12 +146,12 @@ public class QuerySelectorTest extends IntegrationTestBase
             Run run = config.createRun();
             run.sut.schemaChange(run.schemaSpec.compile().cql());
             OpSelectors.MonotonicClock clock = run.clock;
-            PartitionVisitor partitionVisitor = new MutatingPartitionVisitor(run, MutatingRowVisitor::new);
+            Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new);
 
             for (int i = 0; i < CYCLES; i++)
             {
                 long lts = clock.nextLts();
-                partitionVisitor.visitPartition(lts);
+                visitor.visit(lts);
             }
 
             QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run);
diff --git a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
index 55fb00b..945dce2 100644
--- a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
+++ b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java
@@ -29,16 +29,16 @@ import harry.corruptor.HideValueCorruptor;
 import harry.corruptor.QueryResponseCorruptor;
 import harry.corruptor.QueryResponseCorruptor.SimpleQueryResponseCorruptor;
 import harry.ddl.SchemaSpec;
-import harry.visitors.PartitionVisitor;
+import harry.visitors.Visitor;
 import harry.operations.Query;
-import harry.visitors.SinglePartitionValidator;
+import harry.visitors.SingleValidator;
 
 public class QuiescentCheckerIntegrationTest extends ModelTestBase
 {
     @Override
-    protected PartitionVisitor validator(Run run)
+    protected Visitor validator(Run run)
     {
-        return new SinglePartitionValidator(100, run, modelConfiguration());
+        return new SingleValidator(100, run, modelConfiguration());
     }
 
     @Test
diff --git a/harry-integration/test/harry/model/TestEveryClustering.java b/harry-integration/test/harry/model/TestEveryClustering.java
deleted file mode 100644
index f3a2ba8..0000000
--- a/harry-integration/test/harry/model/TestEveryClustering.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package harry.model;
-
-import harry.core.Configuration;
-import harry.core.Run;
-import harry.ddl.SchemaGenerators;
-import harry.ddl.SchemaSpec;
-import harry.generators.distribution.Distribution;
-import harry.operations.CompiledStatement;
-import harry.operations.Query;
-import harry.operations.Relation;
-import harry.visitors.LoggingPartitionVisitor;
-import harry.visitors.MutatingRowVisitor;
-import harry.visitors.PartitionVisitor;
-import org.apache.cassandra.distributed.api.IInvokableInstance;
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.Supplier;
-
-public class TestEveryClustering extends IntegrationTestBase
-{
-    int CYCLES = 1000;
-
-    @Test
-    public void basicQuerySelectorTest()
-    {
-        Supplier<SchemaSpec> schemaGen = SchemaGenerators.progression(SchemaGenerators.DEFAULT_SWITCH_AFTER);
-        for (int cnt = 0; cnt < Integer.MAX_VALUE; cnt++)
-        {
-            beforeEach();
-            SchemaSpec schemaSpec = schemaGen.get();
-
-            System.out.println(schemaSpec.compile().cql());
-            int partitionSize = 1000;
-
-            Configuration config = sharedConfiguration(cnt, schemaSpec)
-                                   .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(1, partitionSize))
-                                   .setClusteringDescriptorSelector(sharedCDSelectorConfiguration()
-                                                                    .setNumberOfModificationsDistribution(() -> new Distribution.ConstantDistribution(1L))
-                                                                    .setRowsPerModificationDistribution(() -> new Distribution.ConstantDistribution(1L))
-                                                                    .setMaxPartitionSize(250)
-                                                                    .build())
-                                   .build();
-
-            Run run = config.createRun();
-            run.sut.schemaChange(run.schemaSpec.compile().cql());
-            OpSelectors.MonotonicClock clock = run.clock;
-
-            Set<Long> visitedCds = new HashSet<>();
-            PartitionVisitor partitionVisitor = new LoggingPartitionVisitor(run, (r) -> {
-                return new MutatingRowVisitor(r) {
-                    public CompiledStatement perform(OpSelectors.OperationKind op, long lts, long pd, long cd, long opId)
-                    {
-                        visitedCds.add(cd);
-                        return super.perform(op, lts, pd, cd, opId);
-                    }
-                };
-            });
-            sut.cluster().stream().forEach((IInvokableInstance node) -> node.nodetool("disableautocompaction"));
-            for (int i = 0; i < CYCLES; i++)
-            {
-                long lts = clock.nextLts();
-                partitionVisitor.visitPartition(lts);
-
-                if (i > 0 && i % 250 == 0)
-                    sut.cluster().stream().forEach((IInvokableInstance node) -> node.nodetool("flush", schemaSpec.keyspace, schemaSpec.table));
-            }
-
-            for (Long cd : visitedCds)
-            {
-                Query query = new Query.SingleClusteringQuery(Query.QueryKind.SINGLE_CLUSTERING,
-                                                              run.pdSelector.pd(0),
-                                                              cd,
-                                                              false,
-                                                              Relation.eqRelations(run.schemaSpec.ckGenerator.slice(cd), run.schemaSpec.clusteringKeys),
-                                                              run.schemaSpec);
-                Model model = new QuiescentChecker(run);
-                model.validate(query);
-            }
-        }
-    }
-}
diff --git a/harry-integration/test/resources/single_partition_test.yml b/harry-integration/test/resources/single_partition_test.yml
index 0ebe2aa..8ec4c4e 100644
--- a/harry-integration/test/resources/single_partition_test.yml
+++ b/harry-integration/test/resources/single_partition_test.yml
@@ -49,7 +49,7 @@ data_tracker:
 
 runner:
   sequential:
-    partition_visitors: []
+    visitors: []
 
 metric_reporter:
   no_op: {}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org