You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2019/08/27 08:35:24 UTC

[cassandra-diff] branch master created (now c95404e)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-diff.git.


      at c95404e  initial commit

This branch includes the following new commits:

     new c95404e  initial commit

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra-diff] 01/01: initial commit

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-diff.git

commit c95404e6de080d33af0c7ea3a7d9a1aed13e0383
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed Aug 21 08:58:27 2019 +0200

    initial commit
    
    Co-authored-by: Bhaskar Muppana <mb...@apple.com>
    Co-authored-by: Jeff Jirsa <jj...@apple.com>
    Co-authored-by: Marcus Eriksson <ma...@apache.org>
    Co-authored-by: Michael Kjellman <kj...@apple.com>
    Co-authored-by: Nate McCall <zz...@gmail.com>
    Co-authored-by: Sam Tunnicliffe <sa...@beobal.com>
---
 README.md                                          |  86 +++
 api-server/README.md                               | 117 ++++
 api-server/pom.xml                                 | 119 ++++
 .../apache/cassandra/diff/api/DiffAPIServer.java   |  57 ++
 .../diff/api/resources/DiffJobsResource.java       | 228 ++++++++
 .../diff/api/resources/HealthResource.java         |  75 +++
 .../cassandra/diff/api/services/DBService.java     | 546 ++++++++++++++++++
 api-server/src/main/resources/log4j2.xml           |  32 +
 common/pom.xml                                     |  54 ++
 .../org/apache/cassandra/diff/ClusterProvider.java |  46 ++
 .../diff/ContactPointsClusterProvider.java         |  80 +++
 .../apache/cassandra/diff/JobConfiguration.java    |  60 ++
 .../cassandra/diff/MetadataKeyspaceOptions.java    |  29 +
 .../org/apache/cassandra/diff/SpecificTokens.java  |  61 ++
 .../cassandra/diff/YamlJobConfiguration.java       | 154 +++++
 pom.xml                                            | 108 ++++
 spark-job/localconfig.yaml                         |  51 ++
 spark-job/pom.xml                                  | 101 ++++
 .../apache/cassandra/diff/ComparisonExecutor.java  | 116 ++++
 .../org/apache/cassandra/diff/DiffCluster.java     | 245 ++++++++
 .../org/apache/cassandra/diff/DiffContext.java     |  75 +++
 .../java/org/apache/cassandra/diff/DiffJob.java    | 296 ++++++++++
 .../java/org/apache/cassandra/diff/Differ.java     | 326 +++++++++++
 .../org/apache/cassandra/diff/JobMetadataDb.java   | 567 ++++++++++++++++++
 .../org/apache/cassandra/diff/MismatchType.java    |  28 +
 .../apache/cassandra/diff/PartitionComparator.java | 114 ++++
 .../org/apache/cassandra/diff/PartitionKey.java    |  85 +++
 .../org/apache/cassandra/diff/PartitionStats.java  |  28 +
 .../org/apache/cassandra/diff/RangeComparator.java | 232 ++++++++
 .../java/org/apache/cassandra/diff/RangeStats.java | 286 +++++++++
 .../java/org/apache/cassandra/diff/TableSpec.java  | 121 ++++
 .../org/apache/cassandra/diff/TokenHelper.java     |  69 +++
 spark-job/src/main/resources/log4j2.xml            |  38 ++
 .../datastax/driver/core/ColumnMetadataHelper.java |  27 +
 .../cassandra/diff/ComparisonExecutorTest.java     | 277 +++++++++
 .../org/apache/cassandra/diff/DiffJobTest.java     |  60 ++
 .../java/org/apache/cassandra/diff/DifferTest.java | 111 ++++
 .../cassandra/diff/PartitionComparatorTest.java    | 524 +++++++++++++++++
 .../apache/cassandra/diff/RangeComparatorTest.java | 642 +++++++++++++++++++++
 .../java/org/apache/cassandra/diff/TestUtils.java  |  40 ++
 .../src/test/resources/cql-stress-narrow1.yaml     |  62 ++
 spark-job/src/test/resources/cql-stress-wide1.yaml |  69 +++
 42 files changed, 6442 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..7b48652
--- /dev/null
+++ b/README.md
@@ -0,0 +1,86 @@
+# Cassandra diff
+
+## Configuration
+See `spark-job/localconfig.yaml` for an example config.
+
+## Custom cluster providers
+To make it easy to run in any environment the cluster providers are pluggable - there are two interfaces to implement.
+First, the `ClusterProvider` interface is used to create a connection to the clusters, and it is configured using
+`JobConfiguration#clusterConfig` (see below).
+### cluster_config
+This section has 3 parts - `source`, `target` and `metadata` where source and target describes the clusters that should
+be compared and metadata describes where we store information about any mismatches and the progress the job has done. 
+Metadata can be stored in one of the source/target clusters or in a separate cluster.
+
+The fields under source/target/metadata are passed in to the `ClusterProvider` (described by `impl`) as a map, so any
+custom cluster providers can be configured here.
+
+## Setting up clusters for diff
+One way of setting up clusters for diff is to restore a snapshot to two different clusters and then modifying one 
+of the clusters to be able to make sure that the queries still return the same results. This could include 
+upgrades/replacements/bounces/decommission/expansion. 
+
+## Environment variables
+Currently usernames and passwords are set as environment variables when running the diff tool and the api server:
+
+* `diff.cluster.<identifier>.cql_user` - the user name to use
+* `diff.cluster.<identifier>.cql_password` - password
+
+where `<identifier>` should be `source`, `target` and `metadata` for the username/password combinations for the
+matching clusters in the configuration.
+
+## Example
+This example starts two cassandra single-node clusters in docker, runs stress to populate them and then runs diff 
+to make sure the data matches;
+
+You need to have docker and spark setup.
+
+```shell script
+$ git clone <wherever>/cassandra-diff.git
+$ cd cassandra-diff
+$ mvn package
+$ docker run --name cas-src -d  -p 9042:9042 cassandra:3.0.18
+$ docker run --name cas-tgt -d  -p 9043:9042 cassandra:latest
+$ docker exec cas-src cassandra-stress write n=1k
+$ docker exec cas-tgt cassandra-stress write n=1k
+$ spark-submit --verbose --files ./spark-job/localconfig.yaml --class org.apache.cassandra.diff.DiffJob spark-job/target/spark-job-0.1-SNAPSHOT.jar localconfig.yaml
+# ... logs
+INFO  DiffJob:124 - FINISHED: {standard1=Matched Partitions - 1000, Mismatched Partitions - 0, Partition Errors - 0, Partitions Only In Source - 0, Partitions Only In Target - 0, Skipped Partitions - 0, Matched Rows - 1000, Matched Values - 6000, Mismatched Values - 0 }
+## start api-server:
+$ mvn install
+$ cd api-server
+$ mvn exec:java
+$ curl -s localhost:8089/jobs/recent | python -mjson.tool
+  [
+      {
+          "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+          "buckets": 100,
+          "keyspace": "keyspace1",
+          "tables": [
+              "standard1"
+          ],
+          "sourceClusterName": "local_test_1",
+          "sourceClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+          "targetClusterName": "local_test_2",
+          "targetClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+          "tasks": 10000,
+          "start": "2019-08-16T11:47:36.123Z"
+      }
+  ]
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/results | python -mjson.tool
+  [
+      {
+          "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+          "table": "standard1",
+          "matchedPartitions": 1000,
+          "mismatchedPartitions": 0,
+          "matchedRows": 1000,
+          "matchedValues": 6000,
+          "mismatchedValues": 0,
+          "onlyInSource": 0,
+          "onlyInTarget": 0,
+          "skippedPartitions": 0
+      }
+  ]
+
+```
diff --git a/api-server/README.md b/api-server/README.md
new file mode 100644
index 0000000..254601c
--- /dev/null
+++ b/api-server/README.md
@@ -0,0 +1,117 @@
+# Diff API Server
+## Configuration
+See main project README - the api server reads the `metadata_options` and `cluster_config.metadata` to connect
+
+## Running locally
+`mvn exec:java`
+
+## Endpoints
+### `/jobs/running/id`
+Returns the ids of currently running jobs
+
+### `/jobs/running`
+Summaries of all currently running jobs
+
+### `/jobs/recent`
+Summaries of recently run jobs
+Example:
+```shell script
+$ curl -s localhost:8089/jobs/recent | python -mjson.tool
+  [
+      {
+          "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+          "buckets": 100,
+          "keyspace": "keyspace1",
+          "tables": [
+              "standard1"
+          ],
+          "sourceClusterName": "local_test_1",
+          "sourceClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+          "targetClusterName": "local_test_2",
+          "targetClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+          "tasks": 10000,
+          "start": "2019-08-16T11:47:36.123Z"
+      }
+  ]
+
+```
+
+### `/jobs/{jobid}`
+Summary about a single job
+Example:
+```shell script
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a | python -mjson.tool
+  {
+      "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+      "buckets": 100,
+      "keyspace": "keyspace1",
+      "tables": [
+          "standard1"
+      ],
+      "sourceClusterName": "local_test_1",
+      "sourceClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+      "targetClusterName": "local_test_2",
+      "targetClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+      "tasks": 10000,
+      "start": "2019-08-16T11:47:36.123Z"
+  }
+```
+
+### `/jobs/{jobid}/results`
+The results for the given job.
+Example:
+```shell script
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/results | python -mjson.tool
+[
+    {
+        "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+        "table": "standard1",
+        "matchedPartitions": 1000,
+        "mismatchedPartitions": 0,
+        "matchedRows": 1000,
+        "matchedValues": 6000,
+        "mismatchedValues": 0,
+        "onlyInSource": 0,
+        "onlyInTarget": 0,
+        "skippedPartitions": 0
+    }
+]
+```
+
+### `/jobs/{jobid}/status`
+Current status for a job, shows how many splits have been finished.
+Example:
+```shell script
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/status | python -mjson.tool
+{
+    "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+    "completedByTable": {
+        "standard1": 10000
+    }
+}
+```
+
+### `/jobs/{jobid}/mismatches`
+Number of mismatches for the job.
+```shell script
+$  curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/mismatches | python -mjson.tool
+  {
+      "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+      "mismatchesByTable": {}
+  }
+```
+
+### `/jobs/{jobid}/errors/summary`
+Summary of the number errors for the job. 
+
+### `/jobs/{jobid}/errors/ranges`
+Lists failed ranges for the job.
+
+### `/jobs/{jobid}/errors`
+Details about the job errors.
+
+### `/jobs/by-start-date/{started-after}`
+### `/jobs/by-start-date/{started-after}/{started-before}`
+### `/jobs/by-source-cluster/{source}`
+### `/jobs/by-target-cluster/{target}`
+### `/jobs/by-keyspace/{keyspace}`
diff --git a/api-server/pom.xml b/api-server/pom.xml
new file mode 100644
index 0000000..ab431da
--- /dev/null
+++ b/api-server/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+		 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+      <groupId>org.apache.cassandra.diff</groupId>
+      <artifactId>diff</artifactId>
+      <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>api-server</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+      <!-- compile dependencies -->
+      <dependency>
+        <groupId>org.apache.cassandra.diff</groupId>
+        <artifactId>common</artifactId>
+        <version>${project.parent.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.cxf</groupId>
+        <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+        <version>3.1.7</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.cxf</groupId>
+        <artifactId>cxf-rt-transports-http-jetty</artifactId>
+        <version>3.1.7</version>
+      </dependency>
+
+      <dependency>
+        <groupId>joda-time</groupId>
+        <artifactId>joda-time</artifactId>
+        <version>2.9.9</version>
+      </dependency>
+
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>2.9.9.2</version>
+      </dependency>
+
+      <!-- provided dependencies -->
+      <dependency>
+        <groupId>org.jetbrains</groupId>
+        <artifactId>annotations</artifactId>
+      </dependency>
+
+      <!-- runtime dependencies -->
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-slf4j-impl</artifactId>
+      </dependency>
+
+      <!-- test dependencies -->
+      <dependency>
+        <groupId>junit</groupId>
+        <artifactId>junit</artifactId>
+      </dependency>
+
+      <dependency>
+        <groupId>org.assertj</groupId>
+        <artifactId>assertj-core</artifactId>
+        <version>3.4.1</version>
+        <scope>test</scope>
+      </dependency>
+
+    </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.6.0</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>java</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <mainClass>org.apache.cassandra.diff.api.DiffAPIServer</mainClass>
+          <arguments>
+            <argument>
+              ../spark-job/localconfig.yaml
+            </argument>
+          </arguments>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java b/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java
new file mode 100644
index 0000000..6f14128
--- /dev/null
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff.api;
+
+import java.io.IOException;
+
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.diff.YamlJobConfiguration;
+import org.apache.cassandra.diff.api.resources.DiffJobsResource;
+import org.apache.cassandra.diff.api.resources.HealthResource;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+
+public class DiffAPIServer {
+    public static void main(String[] args) throws IOException {
+        String filename = args[0];
+        JAXRSServerFactoryBean factoryBean = new JAXRSServerFactoryBean();
+
+        DiffJobsResource diffResource = new DiffJobsResource(YamlJobConfiguration.load(filename));
+        factoryBean.setResourceProviders(Lists.newArrayList(new SingletonResourceProvider(diffResource),
+                                                            new SingletonResourceProvider(new HealthResource())));
+        factoryBean.setAddress("http://localhost:8089/");
+        Server server = factoryBean.create();
+
+        try {
+            server.start();
+            System.in.read();
+        } catch (Throwable t) {
+            t.printStackTrace(System.out);
+            throw t;
+        } finally {
+            diffResource.close();
+            if (server.isStarted())
+                server.stop();
+            System.exit(0);
+        }
+    }
+}
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/resources/DiffJobsResource.java b/api-server/src/main/java/org/apache/cassandra/diff/api/resources/DiffJobsResource.java
new file mode 100644
index 0000000..564f960
--- /dev/null
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/resources/DiffJobsResource.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff.api.resources;
+
+import java.util.SortedSet;
+import java.util.UUID;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.diff.JobConfiguration;
+import org.apache.cassandra.diff.api.services.DBService;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+@Path("/jobs")
+public class DiffJobsResource {
+
+    private static final Logger logger = LoggerFactory.getLogger(DiffJobsResource.class);
+    private static final String DATE_FORMAT = "yyyy-MM-dd";
+    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+    private static final ObjectMapper MAPPER = new ObjectMapper().setVisibility(PropertyAccessor.FIELD,
+                                                                                JsonAutoDetect.Visibility.NON_PRIVATE);
+    private final DBService dbService;
+
+    public DiffJobsResource(JobConfiguration conf) {
+        dbService = new DBService(conf);
+    }
+
+    @GET
+    @Path("/running/id")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getRunningJobIds() {
+        return response(dbService.fetchRunningJobs());
+    }
+
+    @GET
+    @Path("/running")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getRunningJobs() {
+        return response(dbService.fetchJobSummaries(dbService.fetchRunningJobs()));
+    }
+
+    @GET
+    @Path("/recent")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getRecentJobs() {
+
+        SortedSet<DBService.JobSummary> recentJobs = Sets.newTreeSet(DBService.JobSummary.COMPARATOR.reversed());
+        DateTime now = DateTime.now(DateTimeZone.UTC);
+        DateTime maxStartDate = now;
+        DateTime minStartDate = maxStartDate.minusDays(30);
+        recentJobs.addAll(dbService.fetchJobsStartedBetween(minStartDate, maxStartDate));
+
+        while (recentJobs.size() < 10 && (maxStartDate.compareTo(now.minusDays(90)) >= 0)) {
+            maxStartDate = minStartDate;
+            minStartDate = minStartDate.minusDays(30);
+            recentJobs.addAll(dbService.fetchJobsStartedBetween(minStartDate, maxStartDate));
+        }
+
+        return response(recentJobs);
+    }
+
+    @GET
+    @Path("/{jobid}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJob(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchJobSummary(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/{jobid}/results")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobResults(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchJobResults(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/{jobid}/status")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobStatus(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchJobStatus(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/{jobid}/mismatches")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobMismatches(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchMismatches(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/{jobid}/errors/summary")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobErrorSummary(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchErrorSummary(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/{jobid}/errors/ranges")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobErrorRanges(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchErrorRanges(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/{jobid}/errors")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobErrorDetail(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchErrorDetail(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/by-start-date/{started-after}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobsStartedSince(@PathParam("started-after") final String minStart) {
+
+        DateTime minStartDate = parseDate(minStart);
+        if (minStartDate == null)
+            return Response.status(400).entity("Invalid date, please supply in the format yyyy-MM-dd").build();
+
+        DateTime maxStartDate = DateTime.now(DateTimeZone.UTC);
+
+        return getJobsStartedBetween(minStartDate, maxStartDate);
+    }
+
+    @GET
+    @Path("/by-start-date/{started-after}/{started-before}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobsStartedSince(@PathParam("started-after") final String minStart,
+                                        @PathParam("started-before") final String maxStart) {
+
+        DateTime minStartDate = parseDate(minStart);
+        if (minStartDate == null)
+            return Response.status(400).entity("Invalid date, please supply in the format yyyy-MM-dd").build();
+
+        DateTime maxStartDate = null;
+        if (isNullOrEmpty(maxStart)) {
+            DateTime.now(DateTimeZone.UTC);
+        }
+        else {
+            maxStartDate = parseDate(maxStart);
+            if (maxStartDate == null)
+                return Response.status(400).entity("Invalid date, please supply in the format yyyy-MM-dd").build();
+            if (maxStartDate.compareTo(minStartDate) < 0)
+                return Response.status(400).entity("Invalid date range, started-before cannot be earlier than started-after").build();
+        }
+
+        return getJobsStartedBetween(minStartDate, maxStartDate);
+    }
+
+    @GET
+    @Path("/by-source-cluster/{source}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobsBySourceCluster(@PathParam("source") final String sourceClusterName) {
+        return response(dbService.fetchJobsForSourceCluster(sourceClusterName));
+    }
+
+    @GET
+    @Path("/by-target-cluster/{target}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobsByTargetCluster(@PathParam("target") final String targetClusterName) {
+        return response(dbService.fetchJobsForTargetCluster(targetClusterName));
+    }
+
+    @GET
+    @Path("/by-keyspace/{keyspace}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobsByKeyspace(@PathParam("keyspace") final String keyspace) {
+        return response(dbService.fetchJobsForKeyspace(keyspace));
+    }
+
+    private Response response(Object data) {
+        try {
+            return Response.ok().entity(MAPPER.writer().writeValueAsString(data)).build();
+        }
+        catch (JsonProcessingException e) {
+            logger.error("JSON Processing error", e);
+            return Response.serverError().entity("Error constructing JSON response").build();
+        }
+    }
+
+    private Response getJobsStartedBetween(DateTime minStartDate, DateTime maxStartDate) {
+        return response(dbService.fetchJobsStartedBetween(minStartDate, maxStartDate));
+    }
+
+    private DateTime parseDate(String s) {
+        try {
+            return DATE_FORMATTER.parseDateTime(s);
+        } catch (IllegalArgumentException e) {
+            return null;
+        }
+    }
+
+    public void close() {
+        dbService.close();
+    }
+}
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/resources/HealthResource.java b/api-server/src/main/java/org/apache/cassandra/diff/api/resources/HealthResource.java
new file mode 100644
index 0000000..d101615
--- /dev/null
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/resources/HealthResource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff.api.resources;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+/**
+ * Health Check: not terribly useful yet.
+ * TODO: add DB connection check
+ */
+@Path("/__health")
+public class HealthResource {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HealthResource.class);
+
+  private static Boolean overrideHealthStatus = null;
+
+  public HealthResource() {
+    LOGGER.debug("New HealthResource created.");
+  }
+
+  public static Boolean overrideHealthStatusTo(Boolean newStatus) {
+    Boolean oldStatus = overrideHealthStatus;
+    overrideHealthStatus = newStatus;
+    LOGGER.debug("Changed overrideHealthStatus from {} to {}", oldStatus, overrideHealthStatus);
+    return oldStatus;
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response health() {
+    if (isHealthy()) {
+      return Response
+          .ok("All is good.")
+          .build();
+    } else {
+      return Response
+          .serverError()
+          .entity("Not Healthy.")
+          .build();
+    }
+  }
+
+  private boolean isHealthy() {
+    if (overrideHealthStatus == null) {
+      return true;
+    }
+    return overrideHealthStatus;
+  }
+
+}
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java b/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
new file mode 100644
index 0000000..1bf1f88
--- /dev/null
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff.api.services;
+
+import java.io.Closeable;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.diff.ClusterProvider;
+import org.apache.cassandra.diff.JobConfiguration;
+import org.jetbrains.annotations.NotNull;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+
+// TODO cache jobsummary
+// TODO fix exception handling
+public class DBService implements Closeable {
+    private static final Logger logger = LoggerFactory.getLogger(DBService.class);
+    private static final long QUERY_TIMEOUT_MS = 3000;
+
+    private final Cluster cluster;
+    private final Session session;
+    private final String diffKeyspace;
+
+    private final PreparedStatement runningJobsStatement;
+    private final PreparedStatement jobSummaryStatement;
+    private final PreparedStatement jobResultStatement;
+    private final PreparedStatement jobStatusStatement;
+    private final PreparedStatement jobMismatchesStatement;
+    private final PreparedStatement jobErrorSummaryStatement;
+    private final PreparedStatement jobErrorRangesStatement;
+    private final PreparedStatement jobErrorDetailStatement;
+    private final PreparedStatement jobsStartDateStatement;
+    private final PreparedStatement jobsForSourceStatement;
+    private final PreparedStatement jobsForTargetStatement;
+    private final PreparedStatement jobsForKeyspaceStatement;
+
+    public DBService(JobConfiguration config) {
+        logger.info("Initializing DBService");
+        ClusterProvider provider = ClusterProvider.getProvider(config.clusterConfig("metadata"), "metadata");
+        cluster = provider.getCluster();
+
+        session = cluster.connect();
+        diffKeyspace = config.metadataOptions().keyspace;
+        runningJobsStatement = session.prepare(String.format(
+            " SELECT job_id " +
+            " FROM %s.running_jobs", diffKeyspace));
+        jobSummaryStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id," +
+            "   job_start_time," +
+            "   buckets," +
+            "   keyspace_name, " +
+            "   table_names, " +
+            "   source_cluster_name," +
+            "   source_cluster_desc," +
+            "   target_cluster_name," +
+            "   target_cluster_desc," +
+            "   total_tasks " +
+            " FROM %s.job_summary" +
+            " WHERE job_id = ?", diffKeyspace));
+        jobResultStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id," +
+            "   table_name, " +
+            "   matched_partitions," +
+            "   mismatched_partitions,"   +
+            "   matched_rows,"   +
+            "   matched_values,"   +
+            "   mismatched_values,"   +
+            "   partitions_only_in_source,"   +
+            "   partitions_only_in_target,"   +
+            "   skipped_partitions"   +
+            " FROM %s.job_results" +
+            " WHERE job_id = ? AND table_name = ?", diffKeyspace));
+        jobStatusStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id,"   +
+            "   bucket,"   +
+            "   table_name,"   +
+            "   completed "   +
+            " FROM %s.job_status" +
+            " WHERE job_id = ? AND bucket = ?", diffKeyspace));
+        jobMismatchesStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id," +
+            "   bucket," +
+            "   table_name," +
+            "   mismatching_token," +
+            "   mismatch_type" +
+            " FROM %s.mismatches" +
+            " WHERE job_id = ? AND bucket = ?", diffKeyspace));
+        jobErrorSummaryStatement = session.prepare(String.format(
+            " SELECT " +
+            "   count(start_token) AS error_count," +
+            "   table_name" +
+            " FROM %s.task_errors" +
+            " WHERE job_id = ? AND bucket = ?",
+            diffKeyspace));
+        jobErrorRangesStatement = session.prepare(String.format(
+            " SELECT " +
+            "   bucket,"   +
+            "   table_name,"   +
+            "   start_token,"   +
+            "   end_token"   +
+            " FROM %s.task_errors" +
+            " WHERE job_id = ? AND bucket = ?",
+            diffKeyspace));
+        jobErrorDetailStatement = session.prepare(String.format(
+            " SELECT " +
+            "   table_name,"   +
+            "   error_token"   +
+            " FROM %s.partition_errors" +
+            " WHERE job_id = ? AND bucket = ? AND table_name = ? AND start_token = ? AND end_token = ?", diffKeyspace));
+        jobsStartDateStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id" +
+            " FROM %s.job_start_index" +
+            " WHERE job_start_date = ? AND job_start_hour = ?", diffKeyspace));
+        jobsForSourceStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id" +
+            " FROM %s.source_cluster_index" +
+            " WHERE source_cluster_name = ?", diffKeyspace));
+        jobsForTargetStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id" +
+            " FROM %s.target_cluster_index" +
+            " WHERE target_cluster_name = ?", diffKeyspace));
+        jobsForKeyspaceStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id" +
+            " FROM %s.keyspace_index" +
+            " WHERE keyspace_name = ?", diffKeyspace));
+    }
+
+    public List<UUID> fetchRunningJobs() {
+        List<UUID> jobs = new ArrayList<>();
+        ResultSet rs = session.execute(runningJobsStatement.bind());
+        rs.forEach(row -> jobs.add(row.getUUID("job_id")));
+        return jobs;
+    }
+
+    public Collection<JobSummary> fetchJobSummaries(List<UUID> jobIds) {
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(jobIds.size());
+        jobIds.forEach(id -> futures.add(session.executeAsync(jobSummaryStatement.bind(id))));
+
+        // Oldest first
+        SortedSet<JobSummary> summaries = Sets.newTreeSet(JobSummary.COMPARATOR);
+        processFutures(futures, JobSummary::fromRow, summaries::add);
+        return summaries;
+    }
+
+    public JobSummary fetchJobSummary(UUID jobId) {
+        Row row = session.execute(jobSummaryStatement.bind(jobId)).one();
+        if (row == null)
+            throw new RuntimeException(String.format("Job %s not found", jobId));
+        return JobSummary.fromRow(row);
+    }
+
+    public Collection<JobResult> fetchJobResults(UUID jobId) {
+        JobSummary summary = fetchJobSummary(jobId);
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.tables.size());
+        for (String table : summary.tables)
+            futures.add(session.executeAsync(jobResultStatement.bind(jobId, table)));
+
+        SortedSet<JobResult> results = Sets.newTreeSet();
+        processFutures(futures, JobResult::fromRow, results::add);
+        return results;
+    }
+
+    public JobStatus fetchJobStatus(UUID jobId) {
+        JobSummary summary = fetchJobSummary(jobId);
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.buckets);
+
+        for (int i = 0; i < summary.buckets; i++)
+            futures.add(session.executeAsync(jobStatusStatement.bind(jobId, i)));
+
+        Map<String, Long> completedByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+        processFutures(futures, row -> completedByTable.merge(row.getString("table_name"),
+                                                              row.getLong("completed"),
+                                                              Long::sum));
+        return new JobStatus(jobId, completedByTable);
+    }
+
+    public JobMismatches fetchMismatches(UUID jobId) {
+        JobSummary summary = fetchJobSummary(jobId);
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.buckets);
+
+        for (int i = 0; i < summary.buckets; i++)
+            futures.add(session.executeAsync(jobMismatchesStatement.bind(jobId, i)));
+
+        Map<String, List<Mismatch>> mismatchesByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+        processFutures(futures, row -> mismatchesByTable.merge(row.getString("table_name"),
+                                                               Lists.newArrayList(new Mismatch(row.getString("mismatching_token"),
+                                                                                               row.getString("mismatch_type"))),
+                                                               (l1, l2) -> { l1.addAll(l2); return l1;}));
+        return new JobMismatches(jobId, mismatchesByTable);
+    }
+
+    public JobErrorSummary fetchErrorSummary(UUID jobId) {
+        JobSummary summary = fetchJobSummary(jobId);
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.buckets);
+
+        for (int i = 0; i < summary.buckets; i++)
+            futures.add(session.executeAsync(jobErrorSummaryStatement.bind(jobId, i)));
+
+        Map<String, Long> errorCountByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+        processFutures(futures, row -> {
+            String table = row.getString("table_name");
+            if (null != table) {
+                errorCountByTable.merge(row.getString("table_name"),
+                                        row.getLong("error_count"),
+                                        Long::sum);
+            }
+        });
+        return new JobErrorSummary(jobId, errorCountByTable);
+    }
+
+    public JobErrorRanges fetchErrorRanges(UUID jobId) {
+        JobSummary summary = fetchJobSummary(jobId);
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.buckets);
+
+        for (int i = 0; i < summary.buckets; i++)
+            futures.add(session.executeAsync(jobErrorRangesStatement.bind(jobId, i)));
+
+        Map<String, List<Range>> errorRangesByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+        processFutures(futures, row -> errorRangesByTable.merge(row.getString("table_name"),
+                                                                Lists.newArrayList(new Range(row.getString("start_token"),
+                                                                                             row.getString("end_token"))),
+                                                                (l1, l2) -> { l1.addAll(l2); return l1;}));
+        return new JobErrorRanges(jobId, errorRangesByTable);
+    }
+
+    public JobErrorDetail fetchErrorDetail(UUID jobId) {
+        JobSummary summary = fetchJobSummary(jobId);
+        List<ResultSetFuture> rangeFutures = Lists.newArrayListWithCapacity(summary.buckets);
+
+        for (int i = 0; i < summary.buckets; i++ )
+            rangeFutures.add(session.executeAsync(jobErrorRangesStatement.bind(jobId, i)));
+
+        List<ResultSetFuture> errorFutures = Lists.newArrayList();
+        processFutures(rangeFutures,
+                       row -> session.executeAsync(jobErrorDetailStatement.bind(jobId,
+                                                                                row.getInt("bucket"),
+                                                                                row.getString("table_name"),
+                                                                                row.getString("start_token"),
+                                                                                row.getString("end_token"))),
+                       errorFutures::add);
+        Map<String, List<String>> errorsByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+        processFutures(errorFutures,
+                       row -> errorsByTable.merge(row.getString("table_name"),
+                                                  Lists.newArrayList(row.getString("error_token")),
+                                                  (l1, l2) -> { l1.addAll(l2); return l1;}));
+        return new JobErrorDetail(jobId, errorsByTable);
+    }
+
+    public Collection<JobSummary> fetchJobsStartedBetween(DateTime start, DateTime end) {
+        int days = Days.daysBetween(start, end).getDays();
+        List<ResultSetFuture> idFutures = Lists.newArrayListWithCapacity(days * 24);
+        for (int i = 0; i <= days; i++) {
+            DateTime date = start.plusDays(i);
+            LocalDate ld = LocalDate.fromYearMonthDay(date.getYear(), date.getMonthOfYear(), date.getDayOfMonth());
+            for (int j = 0; j <= 23; j++) {
+                idFutures.add(session.executeAsync(jobsStartDateStatement.bind(ld, j)));
+            }
+        }
+
+        List<ResultSetFuture> jobFutures = Lists.newArrayList();
+        processFutures(idFutures,
+                       row -> session.executeAsync(jobSummaryStatement.bind(row.getUUID("job_id"))),
+                       jobFutures::add);
+
+        SortedSet<JobSummary> jobs = Sets.newTreeSet(JobSummary.COMPARATOR.reversed());
+        processFutures(jobFutures, JobSummary::fromRow, jobs::add);
+        return jobs;
+    }
+
+    public Collection<JobSummary> fetchJobsForSourceCluster(String sourceClusterName) {
+        ResultSet jobIds = session.execute(jobsForSourceStatement.bind(sourceClusterName));
+        List<ResultSetFuture> futures = Lists.newArrayList();
+        jobIds.forEach(row -> futures.add(session.executeAsync(jobSummaryStatement.bind(row.getUUID("job_id")))));
+
+
+        SortedSet<JobSummary> jobs = Sets.newTreeSet(JobSummary.COMPARATOR.reversed());
+        processFutures(futures, JobSummary::fromRow, jobs::add);
+        return jobs;
+    }
+
+    public Collection<JobSummary> fetchJobsForTargetCluster(String targetClusterName) {
+        ResultSet jobIds = session.execute(jobsForTargetStatement.bind(targetClusterName));
+        List<ResultSetFuture> futures = Lists.newArrayList();
+        jobIds.forEach(row -> futures.add(session.executeAsync(jobSummaryStatement.bind(row.getUUID("job_id")))));
+
+        // most recent first
+        SortedSet<JobSummary> jobs = Sets.newTreeSet(JobSummary.COMPARATOR.reversed());
+        processFutures(futures, JobSummary::fromRow, jobs::add);
+        return jobs;
+    }
+
+    public Collection<JobSummary> fetchJobsForKeyspace(String keyspace) {
+        ResultSet jobIds = session.execute(jobsForKeyspaceStatement.bind(keyspace));
+        List<ResultSetFuture> futures = Lists.newArrayList();
+        jobIds.forEach(row -> futures.add(session.executeAsync(jobSummaryStatement.bind(row.getUUID("job_id")))));
+
+        // most recent first
+        SortedSet<JobSummary> jobs = Sets.newTreeSet(JobSummary.COMPARATOR.reversed());
+        processFutures(futures, JobSummary::fromRow, jobs::add);
+        return jobs;
+    }
+
+    private void processFutures(List<ResultSetFuture> futures, Consumer<Row> consumer) {
+        processFutures(futures, Function.identity(), consumer);
+    }
+
+    private <T> void processFutures(List<ResultSetFuture> futures, Function<Row, T> transform, Consumer<T> consumer) {
+        Consumer<ResultSet> resultConsumer = resultSet -> resultSet.forEach(row -> consumer.accept(transform.apply(row)));
+        futures.forEach(f -> getResultsSafely(f).ifPresent(resultConsumer));
+    }
+
+    private Optional<ResultSet> getResultsSafely(ResultSetFuture future) {
+        try {
+            return Optional.ofNullable(future.get(QUERY_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+        } catch (Exception e) {
+            logger.warn("Error getting result from async query", e);
+            return Optional.empty();
+        }
+    }
+
+    public void close()
+    {
+        session.close();
+        cluster.close();
+    }
+
+    public static class Range {
+        final String start;
+        final String end;
+
+        public Range(String start, String end) {
+            this.start = start;
+            this.end = end;
+        }
+    }
+
+    public static class JobErrorSummary {
+        final UUID jobId;
+        final Map<String, Long> errorsByTable;
+
+        public JobErrorSummary(UUID jobId, Map<String, Long> errorsByTable) {
+            this.jobId = jobId;
+            this.errorsByTable = errorsByTable;
+        }
+    }
+
+    public static class JobErrorRanges {
+        final UUID jobId;
+        final Map<String, List<Range>> rangesByTable;
+
+        public JobErrorRanges(UUID jobId, Map<String, List<Range>> rangesByTable) {
+            this.jobId = jobId;
+            this.rangesByTable = rangesByTable;
+        }
+    }
+
+    public static class JobErrorDetail {
+        final UUID jobId;
+        final Map<String, List<String>> errorsByTable;
+
+        public JobErrorDetail(UUID jobId, Map<String, List<String>> errorsByTable) {
+            this.jobId = jobId;
+            this.errorsByTable = errorsByTable;
+        }
+    }
+
+    public static class JobMismatches {
+
+        final UUID jobId;
+        final Map<String, List<Mismatch>> mismatchesByTable;
+
+        public JobMismatches(UUID jobId, Map<String, List<Mismatch>> mismatchesByTable) {
+            this.jobId = jobId;
+            this.mismatchesByTable = mismatchesByTable;
+        }
+    }
+
+    public static class Mismatch {
+        final String token;
+        final String type;
+
+        public Mismatch(String token, String type) {
+            this.token = token;
+            this.type = type;
+        }
+    }
+
+    public static class JobStatus {
+
+        final UUID jobId;
+        final Map<String, Long> completedByTable;
+
+        public JobStatus(UUID jobId, Map<String, Long> completedByTable) {
+            this.jobId = jobId;
+            this.completedByTable = completedByTable;
+        }
+    }
+
+    public static class JobResult implements Comparable<JobResult> {
+
+        final UUID jobId;
+        final String table;
+        final long matchedPartitions;
+        final long mismatchedPartitions;
+        final long matchedRows;
+        final long matchedValues;
+        final long mismatchedValues;
+        final long onlyInSource;
+        final long onlyInTarget;
+        final long skippedPartitions;
+
+        public JobResult(UUID jobId,
+                         String table,
+                         long matchedPartitions,
+                         long mismatchedPartitions,
+                         long matchedRows,
+                         long matchedValues,
+                         long mismatchedValues,
+                         long onlyInSource,
+                         long onlyInTarget,
+                         long skippedPartitions) {
+            this.jobId = jobId;
+            this.table = table;
+            this.matchedPartitions = matchedPartitions;
+            this.mismatchedPartitions = mismatchedPartitions;
+            this.matchedRows = matchedRows;
+            this.matchedValues = matchedValues;
+            this.mismatchedValues = mismatchedValues;
+            this.onlyInSource = onlyInSource;
+            this.onlyInTarget = onlyInTarget;
+            this.skippedPartitions = skippedPartitions;
+        }
+
+        static JobResult fromRow(Row row) {
+            return new JobResult(row.getUUID("job_id"),
+                                 row.getString("table_name"),
+                                 row.getLong("matched_partitions"),
+                                 row.getLong("mismatched_partitions"),
+                                 row.getLong("matched_rows"),
+                                 row.getLong("matched_values"),
+                                 row.getLong("mismatched_values"),
+                                 row.getLong("partitions_only_in_source"),
+                                 row.getLong("partitions_only_in_target"),
+                                 row.getLong("skipped_partitions"));
+        }
+
+        public int compareTo(@NotNull JobResult other) {
+            return this.table.compareTo(other.table);
+        }
+    }
+
+    public static class JobSummary {
+
+        public static final Comparator<JobSummary> COMPARATOR = Comparator.comparing(j -> j.startTime);
+
+        final UUID jobId;
+        final int buckets;
+        final String keyspace;
+        final List<String> tables;
+        final String sourceClusterName;
+        final String sourceClusterDesc;
+        final String targetClusterName;
+        final String targetClusterDesc;
+        final int tasks;
+        final String start;
+
+        // private so it isn't included in json serialization
+        private final DateTime startTime;
+
+        private JobSummary(UUID jobId,
+                           DateTime startTime,
+                           int buckets,
+                           String keyspace,
+                           List<String> tables,
+                           String sourceClusterName,
+                           String sourceClusterDesc,
+                           String targetClusterName,
+                           String targetClusterDesc,
+                           int tasks)
+        {
+            this.jobId = jobId;
+            this.startTime = startTime;
+            this.start = startTime.toString();
+            this.buckets = buckets;
+            this.keyspace = keyspace;
+            this.tables = tables;
+            this.sourceClusterName = sourceClusterName;
+            this.sourceClusterDesc = sourceClusterDesc;
+            this.targetClusterName = targetClusterName;
+            this.targetClusterDesc = targetClusterDesc;
+            this.tasks = tasks;
+        }
+
+        static JobSummary fromRow(Row row) {
+            return new JobSummary(row.getUUID("job_id"),
+                                  new DateTime(UUIDs.unixTimestamp(row.getUUID("job_start_time")), DateTimeZone.UTC),
+                                  row.getInt("buckets"),
+                                  row.getString("keyspace_name"),
+                                  row.getList("table_names", String.class),
+                                  row.getString("source_cluster_name"),
+                                  row.getString("source_cluster_desc"),
+                                  row.getString("target_cluster_name"),
+                                  row.getString("target_cluster_desc"),
+                                  row.getInt("total_tasks"));
+        }
+    }
+}
diff --git a/api-server/src/main/resources/log4j2.xml b/api-server/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..9e401d6
--- /dev/null
+++ b/api-server/src/main/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<Configuration>
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/common/pom.xml b/common/pom.xml
new file mode 100644
index 0000000..f73c4f5
--- /dev/null
+++ b/common/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+         http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+      <groupId>org.apache.cassandra.diff</groupId>
+      <artifactId>diff</artifactId>
+      <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>common</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-core</artifactId>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/cassandra/diff/ClusterProvider.java b/common/src/main/java/org/apache/cassandra/diff/ClusterProvider.java
new file mode 100644
index 0000000..6c4e3e4
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/ClusterProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import com.datastax.driver.core.Cluster;
+
+public interface ClusterProvider extends Serializable {
+    String CLUSTER_PROVIDER_CLASS = "impl";
+
+    void initialize(Map<String, String> conf, String identifier);
+    Cluster getCluster();
+    String getClusterName();
+
+    static ClusterProvider getProvider(Map<String, String> conf, String identifier) {
+        String providerImpl = conf.get(CLUSTER_PROVIDER_CLASS);
+        ClusterProvider provider;
+        try {
+            provider = (ClusterProvider)(Class.forName(providerImpl)).newInstance();
+            provider.initialize(conf, identifier);
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate ClusterProvider class: " + providerImpl +" " + conf, e);
+        }
+        return provider;
+    }
+
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/ContactPointsClusterProvider.java b/common/src/main/java/org/apache/cassandra/diff/ContactPointsClusterProvider.java
new file mode 100644
index 0000000..b633feb
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/ContactPointsClusterProvider.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.util.Map;
+
+import com.datastax.driver.core.Cluster;
+
+public class ContactPointsClusterProvider implements ClusterProvider {
+    private static final String PREFIX = "diff.cluster";
+    private static final String USERNAME_KEY = "cql_user";
+    private static final String PASSWORD_KEY = "cql_password";
+    private static final String CONTACT_POINTS_KEY = "contact_points";
+    private static final String PORT_KEY = "port";
+    private static final String CLUSTER_KEY = "name";
+    private static final String DC_KEY = "dc";
+
+    private String user;
+    private String password;
+    private String name;
+    private String dc;
+    private String[] contactPoints;
+    private int port;
+
+    public Cluster getCluster() {
+        return newCluster();
+    }
+
+    public void initialize(Map<String, String> conf, String identifier) {
+        user = getEnv(identifier, USERNAME_KEY);
+        password = getEnv(identifier, PASSWORD_KEY);
+        name = conf.get(CLUSTER_KEY);
+        dc = conf.get(DC_KEY);
+        contactPoints = conf.get(CONTACT_POINTS_KEY).split(",");
+        port = Integer.parseInt(conf.getOrDefault(PORT_KEY, "9042"));
+    }
+
+    private synchronized Cluster newCluster() {
+        // TODO add policies etc
+        Cluster.Builder builder = Cluster.builder()
+                                         .addContactPoints(contactPoints)
+                                         .withPort(port);
+
+        if (user != null)
+            builder.withCredentials(user, password);
+
+        return builder.build();
+    }
+
+    public String getClusterName() {
+        return name;
+    }
+
+    public String toString() {
+        return String.format("ContactPoints Cluster: name=%s, dc=%s, contact points= [%s]",
+                             "name", dc, String.join(",'", contactPoints));
+    }
+
+    static String getEnv(String identifier, String propName) {
+        return System.getenv(String.format("%s.%s.%s", PREFIX, identifier, propName));
+    }
+
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
new file mode 100644
index 0000000..f0cbb36
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+public interface JobConfiguration extends Serializable {
+    String keyspace();
+
+    List<String> tables();
+
+    int splits();
+
+    int buckets();
+
+    // rate limit is provided as a global limit - this is how many q/s we guess that the src clusters can take in total
+    int rateLimit();
+
+    default SpecificTokens specificTokens() {
+        return SpecificTokens.NONE;
+    }
+
+    Optional<UUID> jobId();
+
+    int tokenScanFetchSize();
+
+    int partitionReadFetchSize();
+
+    int readTimeoutMillis();
+
+    double reverseReadProbability();
+
+    String consistencyLevel();
+
+    MetadataKeyspaceOptions metadataOptions();
+
+    Map<String, String> clusterConfig(String identifier);
+
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/MetadataKeyspaceOptions.java b/common/src/main/java/org/apache/cassandra/diff/MetadataKeyspaceOptions.java
new file mode 100644
index 0000000..b3e9eb0
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/MetadataKeyspaceOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+
+public class MetadataKeyspaceOptions implements Serializable {
+    public String keyspace = "cassandradiff";
+    public String replication = "{'class':'SimpleStrategy', 'replication_factor':'1'}";
+    public int ttl = 60 * 60 * 24 * 365;
+    public boolean should_init = false;
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/SpecificTokens.java b/common/src/main/java/org/apache/cassandra/diff/SpecificTokens.java
new file mode 100644
index 0000000..5400a54
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/SpecificTokens.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import com.google.common.collect.ImmutableSet;
+
+public class SpecificTokens implements Serializable, Predicate<BigInteger> {
+
+    public static final SpecificTokens NONE = new SpecificTokens(Collections.emptySet(), Modifier.REJECT);
+
+    public enum Modifier {REJECT, ACCEPT}
+
+    public final ImmutableSet<BigInteger> tokens;
+    public final Modifier modifier;
+
+    public SpecificTokens(Set<BigInteger> tokens, Modifier modifier) {
+        this.tokens = ImmutableSet.copyOf(tokens);
+        this.modifier = modifier;
+    }
+
+    public boolean test(BigInteger token) {
+        if (tokens.isEmpty())
+            return true;
+
+        // if this represents a list of tokens to accept, then this token is allowed
+        // only if it is present in the list. If this is a list of tokens to reject,
+        // then the opposite is true, only allow the token if it is not in the list.
+        return (modifier == Modifier.ACCEPT) == tokens.contains(token);
+    }
+
+    public boolean isEmpty() {
+        return tokens.isEmpty();
+    }
+
+    public String toString() {
+        return String.format("SpecificTokens: [ %s, %s ]", modifier, tokens);
+    }
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
new file mode 100644
index 0000000..69fc28c
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.math.BigInteger;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.CustomClassLoaderConstructor;
+
+public class YamlJobConfiguration implements JobConfiguration {
+    public int splits = 10000;
+    public String keyspace;
+    public List<String> tables;
+    public int buckets = 100;
+    public int rate_limit = 10000;
+    public String job_id = null;
+    public int token_scan_fetch_size;
+    public int partition_read_fetch_size;
+    public int read_timeout_millis;
+    public double reverse_read_probability;
+    public String consistency_level = "ALL";
+    public MetadataKeyspaceOptions metadata_options;
+    public Map<String, Map<String, String>> cluster_config;
+    public String specific_tokens = null;
+    public String disallowed_tokens = null;
+
+    public static YamlJobConfiguration load(String file) {
+        Yaml yaml = new Yaml(new CustomClassLoaderConstructor(YamlJobConfiguration.class,
+                                                              Thread.currentThread().getContextClassLoader()));
+        try {
+            return yaml.loadAs(new FileInputStream(file), YamlJobConfiguration.class);
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String keyspace() {
+        return keyspace;
+    }
+
+    public List<String> tables() {
+        return tables;
+    }
+
+    public int splits() {
+        return splits;
+    }
+
+    public int buckets() {
+        return buckets;
+    }
+
+    public int rateLimit() {
+        return rate_limit;
+    }
+
+    public Optional<UUID> jobId() {
+        return job_id == null ? Optional.empty() : Optional.of(UUID.fromString(job_id));
+    }
+
+    public int tokenScanFetchSize() {
+        return token_scan_fetch_size;
+    }
+
+    public int partitionReadFetchSize() {
+        return partition_read_fetch_size;
+    }
+
+    public int readTimeoutMillis() {
+        return read_timeout_millis;
+    }
+
+    public double reverseReadProbability() {
+        return reverse_read_probability;
+    }
+
+    public String consistencyLevel() {
+        return consistency_level;
+    }
+
+    public MetadataKeyspaceOptions metadataOptions() {
+        return metadata_options;
+    }
+
+    public Map<String, String> clusterConfig(String identifier) {
+        return cluster_config.get(identifier);
+    }
+
+    public SpecificTokens specificTokens() {
+
+        if (disallowed_tokens != null && specific_tokens != null)
+            throw new RuntimeException("Cannot specify both disallowed and specific tokens");
+
+        if (disallowed_tokens != null) {
+            return new SpecificTokens(toTokens(disallowed_tokens), SpecificTokens.Modifier.REJECT);
+        } else if (specific_tokens != null) {
+            return new SpecificTokens(toTokens(specific_tokens), SpecificTokens.Modifier.ACCEPT);
+        }
+        return SpecificTokens.NONE;
+    }
+
+    public String toString() {
+        return "YamlJobConfiguration{" +
+               "splits=" + splits +
+               ", keyspace='" + keyspace + '\'' +
+               ", tables=" + tables +
+               ", buckets=" + buckets +
+               ", rate_limit=" + rate_limit +
+               ", job_id='" + job_id + '\'' +
+               ", token_scan_fetch_size=" + token_scan_fetch_size +
+               ", partition_read_fetch_size=" + partition_read_fetch_size +
+               ", read_timeout_millis=" + read_timeout_millis +
+               ", reverse_read_probability=" + reverse_read_probability +
+               ", consistency_level='" + consistency_level + '\'' +
+               ", metadata_options=" + metadata_options +
+               ", cluster_config=" + cluster_config +
+               '}';
+    }
+
+    private static Set<BigInteger> toTokens(String str) {
+        Set<BigInteger> tokens = new HashSet<>();
+        for (String token : str.split(",")) {
+            token = token.trim();
+            tokens.add(new BigInteger(token));
+        }
+        return tokens;
+    }
+
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..fce7cc1
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
+		 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.cassandra.diff</groupId>
+    <artifactId>diff</artifactId>
+    <packaging>pom</packaging>
+    <version>0.1-SNAPSHOT</version>
+    <modules>
+      <module>common</module>
+      <module>spark-job</module>
+      <module>api-server</module>
+    </modules>
+
+    <properties>
+      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencyManagement>
+      <dependencies>
+
+        <dependency>
+          <groupId>com.datastax.cassandra</groupId>
+          <artifactId>cassandra-driver-core</artifactId>
+          <version>3.7.1</version>
+          <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.yaml</groupId>
+          <artifactId>snakeyaml</artifactId>
+          <version>1.24</version>
+          <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+          <version>28.0-jre</version>
+          <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+          <version>1.7.28</version>
+          <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.jetbrains</groupId>
+          <artifactId>annotations</artifactId>
+          <version>17.0.0</version>
+          <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+          <version>2.12.1</version>
+          <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+          <version>4.12</version>
+          <scope>test</scope>
+        </dependency>
+
+      </dependencies>
+    </dependencyManagement>
+    <build>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-compiler-plugin</artifactId>
+          <version>3.6.1</version>
+          <configuration>
+            <source>1.8</source>
+            <target>1.8</target>
+          </configuration>
+        </plugin>
+      </plugins>
+    </build>
+</project>
diff --git a/spark-job/localconfig.yaml b/spark-job/localconfig.yaml
new file mode 100644
index 0000000..d4f5630
--- /dev/null
+++ b/spark-job/localconfig.yaml
@@ -0,0 +1,51 @@
+# Keyspace to diff
+keyspace: keyspace1
+# List of tables to diff
+tables:
+  - standard1
+
+# This is how many parts we split the full token range in.
+# Each of these splits is then compared between the clusters
+splits: 10000
+
+# Number of buckets - splits / buckets should be under 100k to avoid wide partitions when storing the metadata
+buckets: 100
+
+# global rate limit - this is how many q/s you think the target clusters can handle
+rate_limit: 10000
+
+# optional job id - if restarting a job, set the correct job_id here to avoid re-diffing old splits
+# job_id: 4e2c6c6b-bed7-4c4e-bd4c-28bef89c3cef
+
+# Fetch size to use for the query fetching the tokens in the cluster
+token_scan_fetch_size: 1000
+# Fetch size to use for the queries fetching the rows of each partition
+partition_read_fetch_size: 1000
+
+read_timeout_millis: 10000
+reverse_read_probability: 0.5
+consistency_level: ALL
+metadata_options:
+  keyspace: cassandradiff
+  replication: "{'class':'SimpleStrategy', 'replication_factor':'1'}"
+  ttl: 31536000
+  should_init: true
+cluster_config:
+  source:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test_1"
+    contact_points: "127.0.0.1"
+    port: "9042"
+    dc: "datacenter1"
+  target:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test_2"
+    contact_points: "127.0.0.1"
+    port: "9043"
+    dc: "datacenter1"
+  metadata:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test"
+    contact_points: "127.0.0.1"
+    port: "9042"
+    dc: "datacenter1"
diff --git a/spark-job/pom.xml b/spark-job/pom.xml
new file mode 100644
index 0000000..4d4c8fc
--- /dev/null
+++ b/spark-job/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+		 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.cassandra.diff</groupId>
+        <artifactId>diff</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>spark-job</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <!-- compile dependencies -->
+        <dependency>
+            <groupId>org.apache.cassandra.diff</groupId>
+            <artifactId>common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <!-- provided dependencies -->
+        <dependency>
+          <groupId>org.jetbrains</groupId>
+          <artifactId>annotations</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <version>2.3.3</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+            <version>2.3.2</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.1</version>
+                <executions>
+                    <!-- Run shade goal on package phase -->
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <relocations>
+                        <relocation>
+                            <pattern>com.google</pattern>
+                            <shadedPattern>relocated.com.google</shadedPattern>
+                        </relocation>
+                    </relocations>
+                </configuration>
+            </plugin>
+
+        </plugins>
+    </build>
+
+</project>
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java b/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
new file mode 100644
index 0000000..bdd6488
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.*;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Wrapper for an ExecutorService which provides backpressure by blocking on submission when its
+ * task queue is full.
+ *
+ * The internal ListeningExecutorService is instantiated with an unbounded work queue, but
+ * this class uses a Semaphore to ensure that this queue cannot grow unreasonably. By default,
+ * the Semaphore has 2x as many permits as the executor thread pool has threads, so at most
+ * 2 x maxConcurrentTasks may be submitted before producers are blocked. The submit method
+ * also adds success/failure callbacks which return the permits, enabling producers to make
+ * progress at a manageable rate.
+ *
+ * Callers of the submit method also provide a Phaser, which they can use to ensure that any
+ * tasks *they themselves have submitted* are completed before they proceed. This allows multiple
+ * callers to submit tasks to the same ComparisonExecutor, but only wait for their own to complete
+ * before moving onto the next stage of processing. Managing the increment and decrement of pending
+ * tasks via the Phaser is handled transparently by ComparisonExecutor, so callers should not do
+ * this externally.
+ *
+ * Submitters also provide callbacks to be run on either successful execution or failure of the
+ * task. These callbacks are executed on the same thread as the task itself, which callers should bear
+ * in mind when constructing them.
+ *
+ */
+public class ComparisonExecutor {
+
+    private final ListeningExecutorService executor;
+    private final Semaphore semaphore;
+
+    static ComparisonExecutor newExecutor(int maxConcurrentTasks, MetricRegistry metricRegistry) {
+        return new ComparisonExecutor(
+            MoreExecutors.listeningDecorator(
+                Executors.newFixedThreadPool(maxConcurrentTasks,
+                                             new ThreadFactoryBuilder().setNameFormat("partition-comparison-%d")
+                                                                       .setDaemon(true)
+                                                                       .build())),
+            maxConcurrentTasks * 2,
+            metricRegistry);
+    }
+
+    @VisibleForTesting
+    ComparisonExecutor(ListeningExecutorService executor, int maxTasks, MetricRegistry metrics) {
+        this.executor = executor;
+        this.semaphore = new Semaphore(maxTasks);
+        if (metrics != null) {
+            metrics.register("BlockedTasks", (Gauge) semaphore::getQueueLength);
+            metrics.register("AvailableSlots", (Gauge) semaphore::availablePermits);
+        }
+    }
+
+    public <T> void submit(final Callable<T> callable,
+                           final Consumer<T> onSuccess,
+                           final Consumer<Throwable> onError,
+                           final Phaser phaser) {
+
+        phaser.register();
+        semaphore.acquireUninterruptibly();
+        try {
+            Futures.addCallback(executor.submit(callable), new FutureCallback<T>() {
+                public void onSuccess(T result) {
+                    fireThenReleaseAndArrive(onSuccess, result, phaser);
+                }
+
+                public void onFailure(Throwable t) {
+                    fireThenReleaseAndArrive(onError, t, phaser);
+                }
+            }, MoreExecutors.directExecutor());
+
+        } catch (RejectedExecutionException e) {
+            fireThenReleaseAndArrive(onError, e, phaser);
+        }
+
+    }
+
+    private <T> void fireThenReleaseAndArrive(Consumer<T> callback, T argument, Phaser phaser) {
+        try {
+            callback.accept(argument);
+        } finally {
+            semaphore.release();
+            phaser.arriveAndDeregister();
+        }
+    }
+
+    public void shutdown() {
+        executor.shutdown();
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
new file mode 100644
index 0000000..4e108e3
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.*;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.diff.DiffContext.cqlizedString;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
+
+public class DiffCluster implements AutoCloseable
+{
+    private final static Logger logger = LoggerFactory.getLogger(DiffCluster.class);
+
+    public enum Type {SOURCE, TARGET}
+
+    private final Map<String, PreparedStatement[]> preparedStatements = new HashMap<>();
+    private final ConsistencyLevel consistencyLevel;
+    public final Cluster cluster;
+    private final Session session;
+    private final TokenHelper tokenHelper;
+    public final String keyspace;
+    public final List<BigInteger> tokenList;
+
+    public final RateLimiter getPartitionRateLimiter;
+    public final Type clusterId;
+    private final int tokenScanFetchSize;
+    private final int partitionReadFetchSize;
+    private final int readTimeoutMillis;
+
+    private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+    public DiffCluster(Type clusterId,
+                       Cluster cluster,
+                       String keyspace,
+                       ConsistencyLevel consistencyLevel,
+                       RateLimiter getPartitionRateLimiter,
+                       int tokenScanFetchSize,
+                       int partitionReadFetchSize,
+                       int readTimeoutMillis)
+
+    {
+        this.keyspace = keyspace;
+        this.consistencyLevel = consistencyLevel;
+        this.cluster = cluster;
+        this.tokenHelper = TokenHelper.forPartitioner(cluster.getMetadata().getPartitioner());
+        this.clusterId = clusterId;
+        this.tokenList = Collections.emptyList();
+        this.getPartitionRateLimiter = getPartitionRateLimiter;
+        this.session = cluster.connect();
+        this.tokenScanFetchSize = tokenScanFetchSize;
+        this.partitionReadFetchSize = partitionReadFetchSize;
+        this.readTimeoutMillis = readTimeoutMillis;
+    }
+
+    public Iterator<PartitionKey> getPartitionKeys(String table, final BigInteger prevToken, final BigInteger token) {
+        try {
+            return Uninterruptibles.getUninterruptibly(fetchPartitionKeys(table, prevToken, token));
+        }
+        catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private ListenableFuture<Iterator<PartitionKey>> fetchPartitionKeys(String table, final BigInteger prevToken, final BigInteger token) {
+        BoundStatement statement = keyReader(table).bind(tokenHelper.forBindParam(prevToken),
+                                                         tokenHelper.forBindParam(token));
+        statement.setFetchSize(tokenScanFetchSize);
+        statement.setReadTimeoutMillis(readTimeoutMillis);
+        return Futures.transform(session.executeAsync(statement),
+                                 this::toPartitionKeys,
+                                 MoreExecutors.directExecutor());
+    }
+
+    private AbstractIterator<PartitionKey> toPartitionKeys(ResultSet resultSet) {
+        return new AbstractIterator<PartitionKey>() {
+            Iterator<Row> rows = resultSet.iterator();
+
+            protected PartitionKey computeNext() {
+                if (session.isClosed())
+                    throw new RuntimeException("Session was closed, cannot get next partition key");
+
+                if (stopped.get())
+                    throw new RuntimeException("Job was stopped, cannot get next partition key");
+
+                return rows.hasNext() ? new PartitionKey(rows.next()) : endOfData();
+            }
+        };
+    }
+
+    public Iterator<Row> getPartition(TableSpec table, PartitionKey key, boolean shouldReverse) {
+        return readPartition(table.getTable(), key, shouldReverse)
+               .getUninterruptibly()
+               .iterator();
+    }
+
+    private ResultSetFuture readPartition(String table, final PartitionKey key, boolean shouldReverse) {
+        BoundStatement statement = shouldReverse
+                                   ? reverseReader(table).bind(key.getComponents().toArray())
+                                   : forwardReader(table).bind(key.getComponents().toArray());
+        statement.setFetchSize(partitionReadFetchSize);
+        statement.setReadTimeoutMillis(readTimeoutMillis);
+        getPartitionRateLimiter.acquire();
+        return session.executeAsync(statement);
+    }
+
+    public void stop() {
+        stopped.set(true);
+    }
+
+    public void close()
+    {
+        logger.info("Closing cluster {}", this.clusterId);
+        session.closeAsync();
+        cluster.closeAsync();
+    }
+
+    private PreparedStatement keyReader(String table) {
+        return getStatementForTable(table, 0);
+    }
+
+    private PreparedStatement forwardReader(String table) {
+        return getStatementForTable(table, 1);
+    }
+
+    private PreparedStatement reverseReader(String table) {
+        return getStatementForTable(table, 2);
+    }
+
+    private PreparedStatement getStatementForTable(String table, int index) {
+        if (!preparedStatements.containsKey(table)) {
+            synchronized (this) {
+                if (!preparedStatements.containsKey(table)) {
+                    PreparedStatement keyStatement = getKeyStatement(table);
+                    PreparedStatement[] partitionReadStmts = getFullStatement(table);
+                    preparedStatements.put(table, new PreparedStatement[]{ keyStatement ,
+                                                                           partitionReadStmts[0],
+                                                                           partitionReadStmts[1] });
+                }
+            }
+        }
+        return preparedStatements.get(table)[index];
+    }
+
+    private PreparedStatement getKeyStatement(@NotNull String table) {
+        final TableMetadata tableMetadata = session.getCluster()
+                                                   .getMetadata()
+                                                   .getKeyspace(cqlizedString(keyspace))
+                                                   .getTable(cqlizedString(table));
+        String[] partitionKeyColumns = columnNames(tableMetadata.getPartitionKey());
+
+        Select.Selection selection = QueryBuilder.select().distinct().column(token(partitionKeyColumns));
+        for (String column : partitionKeyColumns)
+            selection = selection.column(column);
+
+        BuiltStatement select = selection.from(tableMetadata)
+                                         .where(gt(token(partitionKeyColumns), bindMarker()))
+                                         .and(lte(token(partitionKeyColumns), bindMarker()));
+
+        logger.debug("Partition key/token read CQL : {}", select.toString());
+        return session.prepare(select).setConsistencyLevel(consistencyLevel);
+    }
+
+    private PreparedStatement[] getFullStatement(@NotNull String table) {
+        final TableMetadata tableMetadata = session.getCluster()
+                                                   .getMetadata()
+                                                   .getKeyspace(cqlizedString(keyspace))
+                                                   .getTable(cqlizedString(table));
+        String[] partitionKeyColumns = columnNames(tableMetadata.getPartitionKey());
+        String[] allColumns = columnNames(tableMetadata.getColumns());
+
+        Select.Selection selection = QueryBuilder.select().column(token(partitionKeyColumns));
+        for (String column : allColumns)
+            selection = selection.column(column);
+
+        Select select = selection.from(tableMetadata);
+
+        for (String column : partitionKeyColumns)
+            select.where().and(eq(column, bindMarker()));
+
+        logger.info("Partition forward read CQL : {}", select.toString());
+        PreparedStatement forwardRead = session.prepare(select).setConsistencyLevel(consistencyLevel);
+
+        List<ColumnMetadata> clusteringColumns = tableMetadata.getClusteringColumns();
+        // if the table has no clustering columns a reverse read doesn't make sense
+        // and will never be executed, so just skip preparing the reverse query
+        if (clusteringColumns.isEmpty())
+            return new PreparedStatement[] {forwardRead, null};
+
+        // Depending on DiffContext.reverseReadProbability, we may attempt to read the
+        // partition in reverse order, so prepare a statement for that
+        List<ClusteringOrder> clusteringOrders = tableMetadata.getClusteringOrder();
+        Ordering[] reverseOrdering = new Ordering[clusteringColumns.size()];
+        for (int i=0; i<clusteringColumns.size(); i++) {
+            reverseOrdering[i] = clusteringOrders.get(i) == ClusteringOrder.ASC
+                                 ? desc(clusteringColumns.get(i).getName())
+                                 : asc(clusteringColumns.get(i).getName());
+        }
+
+        select.orderBy(reverseOrdering);
+        logger.info("Partition reverse read CQL : {}", select.toString());
+
+        PreparedStatement reverseRead = session.prepare(select).setConsistencyLevel(consistencyLevel);
+
+        return new PreparedStatement[] {forwardRead, reverseRead};
+    }
+
+    private static String[] columnNames(List<ColumnMetadata> columns) {
+        return columns.stream().map(ColumnMetadata::getName).map(DiffCluster::columnToString).toArray(String[]::new);
+    }
+
+    private static String columnToString(String name)
+    {
+        return '"'+name+'"';
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffContext.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffContext.java
new file mode 100644
index 0000000..5a1a354
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffContext.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DiffContext {
+
+    public final String keyspace;
+    public final TableSpec table;
+    public final BigInteger startToken;
+    public final BigInteger endToken;
+    public final DiffCluster source;
+    public final DiffCluster target;
+    private final SpecificTokens specificTokens;
+    private final double reverseReadProbability;
+
+    public DiffContext(final DiffCluster source,
+                       final DiffCluster target,
+                       final String keyspace,
+                       final TableSpec table,
+                       final BigInteger startToken,
+                       final BigInteger endToken,
+                       final SpecificTokens specificTokens,
+                       final double reverseReadProbability) {
+        this.keyspace = keyspace;
+        this.table = table;
+        this.startToken = startToken;
+        this.endToken = endToken;
+        this.source = source;
+        this.target = target;
+        this.specificTokens = specificTokens;
+        this.reverseReadProbability = reverseReadProbability;
+    }
+
+    public boolean shouldReverse() {
+        return ! table.getClusteringColumns().isEmpty()
+               && reverseReadProbability > ThreadLocalRandom.current().nextDouble();
+    }
+
+    public boolean isTokenAllowed(BigInteger token) {
+        return specificTokens.test(token);
+    }
+
+    public static String cqlizedString(final String str) {
+        if (str.toLowerCase().equals(str)) {
+            return str;
+        } else {
+            return "\"" + str + "\"";
+        }
+    }
+
+    public String toString() {
+        return String.format("DiffContext: [keyspace: %s, start_token: %s, end_token: %s]",
+                             keyspace, startToken, endToken);
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
new file mode 100644
index 0000000..bb14c25
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkFiles;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * To run this, something like this should be executed for local runs
+ *
+ * spark-submit --files ./spark-job/localconfig.yaml
+ *              --master "local[2]"
+ *              --class org.apache.cassandra.DiffJob spark-job/target/spark-job-0.1-SNAPSHOT.jar
+ *              localconfig.yaml
+ */
+
+public class DiffJob {
+    private static final Logger logger = LoggerFactory.getLogger(DiffJob.class);
+
+    public static void main(String ... args) {
+        if (args.length == 0) {
+            System.exit(-1);
+        }
+        SparkSession spark = SparkSession.builder().appName("cassandra-diff").getOrCreate();
+        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        String configFile = SparkFiles.get(args[0]);
+        YamlJobConfiguration configuration = YamlJobConfiguration.load(configFile);
+        DiffJob diffJob = new DiffJob();
+        diffJob.run(configuration, sc);
+        spark.stop();
+    }
+
+    public void run(JobConfiguration configuration, JavaSparkContext sc) {
+        SparkConf conf = sc.getConf();
+        // get partitioner from both clusters and verify that they match
+        ClusterProvider sourceProvider = ClusterProvider.getProvider(configuration.clusterConfig("source"), "source");
+        ClusterProvider targetProvider = ClusterProvider.getProvider(configuration.clusterConfig("target"), "target");
+        String sourcePartitioner;
+        String targetPartitioner;
+        try (Cluster sourceCluster = sourceProvider.getCluster();
+             Cluster targetCluster = targetProvider.getCluster()) {
+            sourcePartitioner = sourceCluster.getMetadata().getPartitioner();
+            targetPartitioner = targetCluster.getMetadata().getPartitioner();
+        }
+        if (!sourcePartitioner.equals(targetPartitioner)) {
+            throw new IllegalStateException(String.format("Cluster partitioners do not match; Source: %s, Target: %s,",
+                                                          sourcePartitioner, targetPartitioner));
+        }
+        TokenHelper tokenHelper = TokenHelper.forPartitioner(sourcePartitioner);
+
+        logger.info("Configuring job metadata store");
+        ClusterProvider metadataProvider = ClusterProvider.getProvider(configuration.clusterConfig("metadata"), "metadata");
+        JobMetadataDb.JobLifeCycle job = null;
+        UUID jobId = null;
+        try (Cluster metadataCluster = metadataProvider.getCluster();
+             Session metadataSession = metadataCluster.connect()) {
+
+            MetadataKeyspaceOptions metadataOptions = configuration.metadataOptions();
+            JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions);
+
+            // Job params, which once a job is created cannot be modified in subsequent re-runs
+            logger.info("Creating or retrieving job parameters");
+            job = new JobMetadataDb.JobLifeCycle(metadataSession, metadataOptions.keyspace);
+            Params params = getJobParams(job, configuration);
+            logger.info("Job Params: {}", params);
+            if (null == params)
+                throw new RuntimeException("Unable to initialize job params");
+
+            jobId = params.jobId;
+            List<Split> splits = getSplits(configuration, TokenHelper.forPartitioner(sourcePartitioner));
+
+            // Job options, which may be modified per-run
+            int instances = Integer.parseInt(conf.get("spark.executor.instances", "4"));
+            int cores = Integer.parseInt(conf.get("spark.executor.cores", "2"));
+            int executors = instances * cores;
+            // according to https://spark.apache.org/docs/latest/rdd-programming-guide.html#parallelized-collections we should
+            // have 2-4 partitions per cpu in the cluster:
+            int slices = Math.min(4 * executors, splits.size());
+            int perExecutorRateLimit = configuration.rateLimit() / executors;
+
+            // Record the high level job summary info
+            job.initializeJob(params,
+                              sourceProvider.getClusterName(),
+                              sourceProvider.toString(),
+                              targetProvider.getClusterName(),
+                              targetProvider.toString());
+
+            logger.info("DiffJob {} comparing [{}] in keyspace {} on {} and {}",
+                        jobId,
+                        String.join(",", params.tables),
+                        params.keyspace,
+                        sourceProvider,
+                        targetProvider);
+
+            // Run the distributed diff and collate results
+            Map<String, RangeStats> diffStats = sc.parallelize(splits, slices)
+                                                  .map((split) -> new Differ(configuration,
+                                                                             params,
+                                                                             perExecutorRateLimit,
+                                                                             split,
+                                                                             tokenHelper,
+                                                                             sourceProvider,
+                                                                             targetProvider,
+                                                                             metadataProvider,
+                                                                             new TrackerProvider(configuration.metadataOptions().keyspace))
+                                                                  .run())
+                                                  .reduce(Differ::accumulate);
+            // Publish results. This also removes the job from the currently running list
+            job.finalizeJob(params.jobId, diffStats);
+            logger.info("FINISHED: {}", diffStats);
+        } catch (Exception e) {
+            // If the job errors out, try and mark the job as not running, so it can be restarted.
+            // If the error was thrown from JobMetadataDb.finalizeJob *after* the job had already
+            // been marked not running, this will log a warning, but is not fatal.
+            if (job != null && jobId != null)
+                job.markNotRunning(jobId);
+            throw new RuntimeException("Diff job failed", e);
+        } finally {
+            if (sc.isLocal())
+                Differ.shutdown();
+        }
+    }
+
+    private static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf) {
+        if (conf.jobId().isPresent()) {
+            return job.getJobParams(conf.jobId().get());
+        } else {
+            return new Params(UUID.randomUUID(),
+                              conf.keyspace(),
+                              conf.tables(),
+                              conf.buckets(),
+                              conf.splits());
+        }
+    }
+
+    private static List<Split> getSplits(JobConfiguration config, TokenHelper tokenHelper) {
+        logger.info("Initializing splits");
+        List<Split> splits = calculateSplits(config.splits(), config.buckets(), tokenHelper);
+        logger.info("All Splits: {}", splits);
+        if (!config.specificTokens().isEmpty() && config.specificTokens().modifier == SpecificTokens.Modifier.ACCEPT) {
+            splits = getSplitsForTokens(config.specificTokens().tokens, splits);
+            logger.info("Splits for specific tokens ONLY: {}", splits);
+        }
+        // shuffle the splits to make sure the work is spread over the workers,
+        // important if it isn't a full cluster is being compared
+        Collections.shuffle(splits);
+        return splits;
+    }
+
+    @VisibleForTesting
+    static List<Split> calculateSplits(int numSplits, int numBuckets, TokenHelper tokenHelper) {
+        List<Split> splits = new ArrayList<>(numSplits);
+        BigInteger minToken = tokenHelper.min();
+        BigInteger maxToken = tokenHelper.max();
+
+        BigInteger totalTokens = maxToken.subtract(minToken);
+        BigInteger segmentSize = totalTokens.divide(BigInteger.valueOf(numSplits));
+
+        // add the first split starting at minToken without adding BigInt.ONE below
+        // Splits are grouped into buckets so we can shard the journal info across
+        // C* partitions
+        splits.add(new Split(0,  0, minToken, minToken.add(segmentSize)));
+        BigInteger prev = minToken.add(segmentSize);
+        for (int i = 1; i < numSplits - 1; i++) {
+            BigInteger next = prev.add(segmentSize);
+            // add ONE to avoid split overlap
+            splits.add(new Split(i, i % numBuckets, prev.add(BigInteger.ONE), next));
+            prev = next;
+        }
+        splits.add(new Split(numSplits - 1, (numSplits - 1) % numBuckets,  prev.add(BigInteger.ONE), maxToken)); // make sure we cover the whole range
+        return splits;
+    }
+
+    @VisibleForTesting
+    static List<Split> getSplitsForTokens(Set<BigInteger> tokens, List<Split> splits) {
+        return splits.stream().filter(split -> split.containsAny(tokens)).collect(Collectors.toList());
+    }
+
+    @VisibleForTesting
+    static class Split implements Serializable {
+        final int splitNumber;
+        final int bucket;
+        final BigInteger start;
+        final BigInteger end;
+
+        Split(int splitNumber, int bucket, BigInteger start, BigInteger end) {
+            this.splitNumber = splitNumber;
+            this.bucket = bucket;
+            this.start = start;
+            this.end = end;
+        }
+
+        public String toString() {
+            return "Split [" +
+                   start +
+                   ", " +
+                   end +
+                   ']';
+        }
+
+        public boolean containsAny(Set<BigInteger> specificTokens) {
+            for (BigInteger specificToken : specificTokens) {
+                if (specificToken.compareTo(start) >= 0 && specificToken.compareTo(end) <= 0)
+                    return true;
+            }
+            return false;
+        }
+    }
+
+    static class Params implements Serializable {
+        public final UUID jobId;
+        public final String keyspace;
+        public final ImmutableList<String> tables;
+        public final int buckets;
+        public final int tasks;
+
+        Params(UUID jobId, String keyspace, List<String> tables, int buckets, int tasks) {
+            this.jobId = jobId;
+            this.keyspace = keyspace;
+            this.tables = ImmutableList.copyOf(tables);
+            this.buckets = buckets;
+            this.tasks = tasks;
+        }
+
+        public String toString() {
+            return String.format("Params: [jobId: %s, keyspace: %s, tables: %s, buckets: %s, tasks: %s]",
+                                 jobId, keyspace, tables.stream().collect(Collectors.joining(",")), buckets, tasks);
+        }
+    }
+
+    static class TaskStatus {
+        public static final TaskStatus EMPTY = new TaskStatus(null, null);
+        public final BigInteger lastToken;
+        public final RangeStats stats;
+
+        TaskStatus(BigInteger lastToken, RangeStats stats) {
+            this.lastToken = lastToken;
+            this.stats = stats;
+        }
+
+        public String toString() {
+            return "TaskStatus{" +
+                   "lastToken=" + lastToken +
+                   ", stats=" + stats +
+                   '}';
+        }
+    }
+
+    public static class TrackerProvider implements Serializable {
+        private final String metadataKeyspace;
+
+        TrackerProvider(String metadataKeyspace) {
+            this.metadataKeyspace = metadataKeyspace;
+        }
+
+        public void initializeStatements(Session session) {
+            JobMetadataDb.ProgressTracker.initializeStatements(session, metadataKeyspace);
+        }
+
+        public JobMetadataDb.ProgressTracker getTracker(Session session, UUID jobId, Split split) {
+            return new JobMetadataDb.ProgressTracker(jobId, split.bucket, split.start, split.end, metadataKeyspace, session);
+        }
+    }
+ }
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
new file mode 100644
index 0000000..49576a2
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.function.BiConsumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Verify;
+import com.google.common.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+
+public class Differ implements Serializable
+{
+    private static final Logger logger = LoggerFactory.getLogger(Differ.class);
+
+    private static final MetricRegistry metrics = new MetricRegistry();
+
+    private static final int COMPARISON_THREADS = 8;
+    private static final ComparisonExecutor COMPARISON_EXECUTOR = ComparisonExecutor.newExecutor(COMPARISON_THREADS, metrics);
+
+    private final UUID jobId;
+    private final DiffJob.Split split;
+    private final TokenHelper tokenHelper;
+    private final String keyspace;
+    private final List<String> tables;
+    private final RateLimiter rateLimiter;
+    private final DiffJob.TrackerProvider trackerProvider;
+    private final double reverseReadProbability;
+    private final SpecificTokens specificTokens;
+
+    private static DiffCluster srcDiffCluster;
+    private static DiffCluster targetDiffCluster;
+    private static Session journalSession;
+
+    static
+    {
+        Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
+            StringWriter stackTrace = new StringWriter();
+            e.printStackTrace(new PrintWriter(stackTrace));
+            System.out.println("UNCAUGHT EXCEPTION: " + stackTrace.toString());
+            throw new RuntimeException(e);
+        });
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            logger.info("In shutdown hook");
+            shutdown();
+        }));
+    }
+
+    public Differ(JobConfiguration config,
+                  DiffJob.Params params,
+                  int perExecutorRateLimit,
+                  DiffJob.Split split,
+                  TokenHelper tokenHelper,
+                  ClusterProvider sourceProvider,
+                  ClusterProvider targetProvider,
+                  ClusterProvider metadataProvider,
+                  DiffJob.TrackerProvider trackerProvider)
+    {
+        logger.info("Creating Differ for {}", split);
+        this.jobId = params.jobId;
+        this.split = split;
+        this.tokenHelper = tokenHelper;
+        this.keyspace = params.keyspace;
+        this.tables = params.tables;
+        this.trackerProvider = trackerProvider;
+        rateLimiter = RateLimiter.create(perExecutorRateLimit);
+        this.reverseReadProbability = config.reverseReadProbability();
+        this.specificTokens = config.specificTokens();
+        synchronized (Differ.class)
+        {
+            /*
+            Spark runs jobs on each worker in the same JVM, we need to initialize these only once, otherwise
+            we run OOM with health checker threads
+             */
+            // yes we could have JobConfiguration return this directly, but snakeyaml doesn't like relocated classes and the driver has to be shaded
+            ConsistencyLevel cl = ConsistencyLevel.valueOf(config.consistencyLevel());
+            if (srcDiffCluster == null)
+            {
+                srcDiffCluster = new DiffCluster(DiffCluster.Type.SOURCE,
+                                                 sourceProvider.getCluster(),
+                                                 params.keyspace,
+                                                 cl,
+                                                 rateLimiter,
+                                                 config.tokenScanFetchSize(),
+                                                 config.partitionReadFetchSize(),
+                                                 config.readTimeoutMillis());
+            }
+
+            if (targetDiffCluster == null)
+            {
+                targetDiffCluster = new DiffCluster(DiffCluster.Type.TARGET,
+                                                    targetProvider.getCluster(),
+                                                    params.keyspace,
+                                                    cl,
+                                                    rateLimiter,
+                                                    config.tokenScanFetchSize(),
+                                                    config.partitionReadFetchSize(),
+                                                    config.readTimeoutMillis());
+            }
+
+            if (journalSession == null)
+            {
+                journalSession = metadataProvider.getCluster().connect();
+                trackerProvider.initializeStatements(journalSession);
+            }
+        }
+    }
+
+    public Map<String, RangeStats> run() {
+        JobMetadataDb.ProgressTracker journal = trackerProvider.getTracker(journalSession, jobId, split);
+        Map<String, DiffJob.TaskStatus> tablesToDiff = filterTables(tables,
+                                                                    split,
+                                                                    journal::getLastStatus,
+                                                                    !specificTokens.isEmpty());
+
+        String metricsPrefix = String.format("%s.%s", srcDiffCluster.clusterId.name(), srcDiffCluster.keyspace);
+        logger.info("Diffing {} for tables {}", split, tablesToDiff);
+
+        for (Map.Entry<String, DiffJob.TaskStatus> tableStatus : tablesToDiff.entrySet()) {
+            final String table = tableStatus.getKey();
+            DiffJob.TaskStatus status = tableStatus.getValue();
+            RangeStats diffStats = status.stats;
+
+            // if this split has already been fully processed, it's being re-run to check
+            // partitions with errors. In that case, we don't want to adjust the split
+            // start and we don't want to update the completed count when we're finished.
+            boolean isRerun = split.end.equals(status.lastToken);
+            BigInteger startToken = status.lastToken == null || isRerun ? split.start : status.lastToken;
+            validateRange(startToken, split.end, tokenHelper);
+
+            TableSpec sourceTable = TableSpec.make(table, srcDiffCluster);
+            TableSpec targetTable = TableSpec.make(table, targetDiffCluster);
+            validateTableSpecs(sourceTable, targetTable);
+
+            DiffContext ctx = new DiffContext(srcDiffCluster,
+                                              targetDiffCluster,
+                                              keyspace,
+                                              sourceTable,
+                                              startToken,
+                                              split.end,
+                                              specificTokens,
+                                              reverseReadProbability);
+
+            String timerName = String.format("%s.%s.split_times", metricsPrefix, table);
+            try (@SuppressWarnings("unused") Timer.Context timer = metrics.timer(timerName).time()) {
+                diffStats.accumulate(diffTable(ctx,
+                                               (error, token) -> journal.recordError(table, token, error),
+                                               (type, token) -> journal.recordMismatch(table, type, token),
+                                               (stats, token) -> journal.updateStatus(table, stats, token)));
+
+                // update the journal with the final state for the table. Use the split's ending token
+                // as the last seen token (even though we may not have actually read any partition for
+                // that token) as this effectively marks the split as done.
+                journal.finishTable(table, diffStats, !isRerun);
+            }
+        }
+
+        Map<String, RangeStats> statsByTable = tablesToDiff.entrySet()
+                                                           .stream()
+                                                           .collect(Collectors.toMap(Map.Entry::getKey,
+                                                                                     e -> e.getValue().stats));
+        updateMetrics(metricsPrefix, statsByTable);
+        return statsByTable;
+    }
+
+    public RangeStats diffTable(final DiffContext context,
+                                final BiConsumer<Throwable, BigInteger> partitionErrorReporter,
+                                final BiConsumer<MismatchType, BigInteger> mismatchReporter,
+                                final BiConsumer<RangeStats, BigInteger> journal) {
+
+        final Iterator<PartitionKey> sourceKeys = context.source.getPartitionKeys(context.table.getTable(),
+                                                                                  context.startToken,
+                                                                                  context.endToken);
+        final Iterator<PartitionKey> targetKeys = context.target.getPartitionKeys(context.table.getTable(),
+                                                                                  context.startToken,
+                                                                                  context.endToken);
+        final Function<PartitionKey, PartitionComparator> partitionTaskProvider =
+            (key) -> {
+                boolean reverse = context.shouldReverse();
+                return new PartitionComparator(context.table,
+                                               context.source.getPartition(context.table, key, reverse),
+                                               context.target.getPartition(context.table, key, reverse));
+            };
+
+        RangeComparator rangeComparator = new RangeComparator(context,
+                                                              partitionErrorReporter,
+                                                              mismatchReporter,
+                                                              journal,
+                                                              COMPARISON_EXECUTOR);
+
+        final RangeStats tableStats = rangeComparator.compare(sourceKeys, targetKeys, partitionTaskProvider);
+        logger.debug("Table [{}] stats - ({})", context.table.getTable(), tableStats);
+        return tableStats;
+    }
+
+    @VisibleForTesting
+    static Map<String, DiffJob.TaskStatus> filterTables(Iterable<String> tables,
+                                                        DiffJob.Split split,
+                                                        Function<String, DiffJob.TaskStatus> journal,
+                                                        boolean includeCompleted) {
+        Map<String, DiffJob.TaskStatus> tablesToProcess = new HashMap<>();
+        for (String table : tables) {
+            DiffJob.TaskStatus taskStatus = journal.apply(table);
+            RangeStats diffStats = taskStatus.stats;
+            BigInteger lastToken = taskStatus.lastToken;
+
+            // When we finish processing a split for a given table, we update the task status in journal
+            // to set the last seen token to the split's end token, to indicate that the split is complete.
+            if (!includeCompleted && lastToken != null && lastToken.equals(split.end)) {
+                logger.info("Found finished table {} for split {}", table, split);
+            }
+            else {
+                tablesToProcess.put(table, diffStats != null
+                                            ? taskStatus
+                                            : new DiffJob.TaskStatus(taskStatus.lastToken, RangeStats.newStats()));
+            }
+        }
+        return tablesToProcess;
+    }
+
+    static void validateTableSpecs(TableSpec source, TableSpec target) {
+        Verify.verify(source.equalsNamesOnly(target),
+                      "Source and target table definitions do not match (Source: %s Target: %s)",
+                      source, target);
+    }
+
+    @VisibleForTesting
+    static void validateRange(BigInteger start, BigInteger end, TokenHelper tokens) {
+
+        Verify.verify(start != null && end != null, "Invalid token range [%s,%s]", start, end);
+
+        Verify.verify(start.compareTo(tokens.min()) >= 0 && end.compareTo(tokens.max()) <= 0 && start.compareTo(end) < 0,
+                      "Invalid token range [%s,%s] for partitioner range [%s,%s]",
+                       start, end, tokens.min(), tokens.max());
+    }
+
+    @VisibleForTesting
+    static Map<String, RangeStats> accumulate(Map<String, RangeStats> stats, Map<String, RangeStats> otherStats)
+    {
+        for (Map.Entry<String, RangeStats> otherEntry : otherStats.entrySet())
+        {
+            if (stats.containsKey(otherEntry.getKey()))
+                stats.get(otherEntry.getKey()).accumulate(otherEntry.getValue());
+            else
+                stats.put(otherEntry.getKey(), otherEntry.getValue());
+        }
+        return stats;
+    }
+
+    private static void updateMetrics(String prefix, Map<String, RangeStats> statsMap)
+    {
+        for (Map.Entry<String, RangeStats> entry : statsMap.entrySet())
+        {
+            String qualifier = String.format("%s.%s", prefix, entry.getKey());
+            RangeStats stats = entry.getValue();
+
+            metrics.meter(qualifier + ".partitions_read").mark(stats.getMatchedPartitions() + stats.getOnlyInSource() + stats.getOnlyInTarget() + stats.getMismatchedPartitions());
+            metrics.counter(qualifier + ".matched_partitions").inc(stats.getMatchedPartitions());
+            metrics.counter(qualifier + ".mismatched_partitions").inc(stats.getMismatchedPartitions());
+
+            metrics.counter(qualifier + ".partitions_only_in_source").inc(stats.getOnlyInSource());
+            metrics.counter(qualifier + ".partitions_only_in_target").inc(stats.getOnlyInTarget());
+            metrics.counter(qualifier + ".skipped_partitions").inc(stats.getSkippedPartitions());
+
+            metrics.counter(qualifier + ".matched_rows").inc(stats.getMatchedRows());
+            metrics.counter(qualifier + ".matched_values").inc(stats.getMatchedValues());
+            metrics.counter(qualifier + ".mismatched_values").inc(stats.getMismatchedValues());
+        }
+    }
+
+    public static void shutdown()
+    {
+        try
+        {
+            if (srcDiffCluster != null) {
+                srcDiffCluster.stop();
+                srcDiffCluster.close();
+            }
+            if (targetDiffCluster != null) {
+                targetDiffCluster.stop();
+                targetDiffCluster.close();
+            }
+            if (journalSession != null) {
+                journalSession.close();
+                journalSession.getCluster().close();
+            }
+            COMPARISON_EXECUTOR.shutdown();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
new file mode 100644
index 0000000..0ac6521
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.utils.UUIDs;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+public class JobMetadataDb {
+    private static final Logger logger = LoggerFactory.getLogger(JobMetadataDb.class);
+
+    static class ProgressTracker {
+
+        private final UUID jobId;
+        private final int bucket;
+        private final String startToken;
+        private final String endToken;
+        private final String keyspace;
+        private Session session;
+
+        private static PreparedStatement updateStmt;
+        private static PreparedStatement mismatchStmt;
+        private static PreparedStatement errorSummaryStmt;
+        private static PreparedStatement errorDetailStmt;
+        private static PreparedStatement updateCompleteStmt;
+
+        public ProgressTracker(UUID jobId,
+                               int bucket,
+                               BigInteger startToken,
+                               BigInteger endToken,
+                               String keyspace,
+                               Session session) {
+            this.jobId = jobId;
+            this.bucket = bucket;
+            this.startToken = startToken.toString();
+            this.endToken = endToken.toString();
+            this.keyspace = keyspace;
+            this.session = session;
+        }
+
+        /**
+         * Runs on each executor to prepare statements shared across all instances
+         */
+        public static void initializeStatements(Session session, String keyspace) {
+            if (updateStmt == null) {
+                updateStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
+                                                           " job_id," +
+                                                           " bucket," +
+                                                           " table_name," +
+                                                           " start_token," +
+                                                           " end_token," +
+                                                           " matched_partitions," +
+                                                           " mismatched_partitions," +
+                                                           " partitions_only_in_source," +
+                                                           " partitions_only_in_target," +
+                                                           " matched_rows," +
+                                                           " matched_values," +
+                                                           " mismatched_values," +
+                                                           " skipped_partitions," +
+                                                           " last_token )" +
+                                                           "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+                                                           keyspace, Schema.TASK_STATUS));
+            }
+            if (mismatchStmt == null) {
+                mismatchStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
+                                                             " job_id," +
+                                                             " bucket," +
+                                                             " table_name," +
+                                                             " mismatching_token," +
+                                                             " mismatch_type )" +
+                                                             "VALUES (?, ?, ?, ?, ?)",
+                                                             keyspace, Schema.MISMATCHES));
+            }
+            if (updateCompleteStmt == null) {
+                updateCompleteStmt = session.prepare(String.format("UPDATE %s.%s " +
+                                                                   " SET completed = completed + 1" +
+                                                                   " WHERE job_id = ? " +
+                                                                   " AND bucket = ? " +
+                                                                   " AND table_name = ? ",
+                                                                   keyspace, Schema.JOB_STATUS))
+                                            .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+            }
+            if (errorSummaryStmt == null) {
+                errorSummaryStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
+                                                                 " job_id," +
+                                                                 " bucket," +
+                                                                 " table_name," +
+                                                                 " start_token," +
+                                                                 " end_token)" +
+                                                                 " VALUES (?, ?, ?, ?, ?)",
+                                                                 keyspace, Schema.ERROR_SUMMARY));
+            }
+            if (errorDetailStmt == null) {
+                errorDetailStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
+                                                                " job_id," +
+                                                                " bucket," +
+                                                                " table_name," +
+                                                                " start_token," +
+                                                                " end_token," +
+                                                                " error_token)" +
+                                                                " VALUES (?, ?, ?, ?, ?, ?)",
+                                                                keyspace, Schema.ERROR_DETAIL));
+            }
+
+        }
+
+        /**
+         *
+         * @param table
+         * @return
+         */
+        public DiffJob.TaskStatus getLastStatus(String table) {
+            ResultSet rs = session.execute(String.format("SELECT last_token, " +
+                                                         "       matched_partitions, " +
+                                                         "       mismatched_partitions, " +
+                                                         "       partitions_only_in_source, " +
+                                                         "       partitions_only_in_target, " +
+                                                         "       matched_rows," +
+                                                         "       matched_values," +
+                                                         "       mismatched_values," +
+                                                         "       skipped_partitions " +
+                                                         " FROM %s.%s " +
+                                                         " WHERE job_id = ? " +
+                                                         " AND   bucket = ? " +
+                                                         " AND   table_name = ? " +
+                                                         " AND   start_token = ? " +
+                                                         " AND   end_token = ?",
+                                                         keyspace, Schema.TASK_STATUS),
+                                           jobId, bucket, table, startToken, endToken);
+            Row row = rs.one();
+            if (null == row)
+                return DiffJob.TaskStatus.EMPTY;
+
+            RangeStats stats = RangeStats.withValues(getOrDefaultLong(row, "matched_partitions"),
+                                                     getOrDefaultLong(row, "mismatched_partitions"),
+                                                     0L, // error counts are per-run and not persisted in the metadata db
+                                                     getOrDefaultLong(row, "skipped_partitions"),
+                                                     getOrDefaultLong(row, "partitions_only_in_source"),
+                                                     getOrDefaultLong(row, "partitions_only_in_target"),
+                                                     getOrDefaultLong(row, "matched_rows"),
+                                                     getOrDefaultLong(row, "matched_values"),
+                                                     getOrDefaultLong(row, "mismatched_values"));
+
+            BigInteger lastToken = row.isNull("last_token") ? null : new BigInteger(row.getString("last_token"));
+            return new DiffJob.TaskStatus(lastToken, stats);
+        }
+
+        /**
+         *
+         * @param table
+         * @param diffStats
+         * @param latestToken
+         */
+        public void updateStatus(String table, RangeStats diffStats, BigInteger latestToken) {
+            session.execute(bindUpdateStatement(table, diffStats, latestToken));
+        }
+
+        public void recordMismatch(String table, MismatchType type, BigInteger token) {
+            logger.info("Detected mismatch in table {}; partition with token {} is {}",
+                        table, token, type == MismatchType.PARTITION_MISMATCH
+                                      ? " different in source and target clusters"
+                                      : type == MismatchType.ONLY_IN_SOURCE ? "only present in source cluster"
+                                                                            : "only present in target cluster");
+            session.execute(bindMismatchesStatement(table, token, type.name()));
+        }
+
+        /**
+         *
+         * @param table
+         * @param token
+         * @param error
+         */
+        public void recordError(String table, BigInteger token, Throwable error) {
+            logger.error(String.format("Encountered error during partition comparison in table %s; " +
+                                       "error for partition with token %s", table, token), error);
+            BatchStatement batch = new BatchStatement();
+            batch.add(bindErrorSummaryStatement(table));
+            batch.add(bindErrorDetailStatement(table, token));
+            batch.setIdempotent(true);
+            session.execute(batch);
+        }
+
+        /**
+         *
+         * @param table
+         * @param stats
+         */
+        public void finishTable(String table, RangeStats stats, boolean updateCompletedCount) {
+            logger.info("Finishing range [{}, {}] for table {}", startToken, endToken, table);
+            // first flush out the last status.
+            session.execute(bindUpdateStatement(table, stats, endToken));
+            // then update the count of completed tasks
+            if (updateCompletedCount)
+                session.execute(updateCompleteStmt.bind(jobId, bucket, table));
+        }
+
+        private Statement bindMismatchesStatement(String table, BigInteger token, String type) {
+            return mismatchStmt.bind(jobId, bucket, table, token.toString(), type)
+                               .setIdempotent(true);
+        }
+
+        private Statement bindErrorSummaryStatement(String table) {
+            return errorSummaryStmt.bind(jobId, bucket, table, startToken, endToken)
+                                   .setIdempotent(true);
+        }
+
+        private Statement bindErrorDetailStatement(String table, BigInteger errorToken) {
+            return errorDetailStmt.bind(jobId, bucket, table, startToken, endToken, errorToken.toString())
+                                  .setIdempotent(true);
+        }
+
+        private Statement bindUpdateStatement(String table, RangeStats stats, BigInteger token) {
+           return bindUpdateStatement(table, stats, token.toString());
+        }
+
+        private Statement bindUpdateStatement(String table, RangeStats stats, String token) {
+            // We don't persist the partition error count from RangeStats as errors
+            // are likely to be transient and not data related, so we don't want to
+            // accumulate them across runs.
+            return updateStmt.bind(jobId,
+                                   bucket,
+                                   table,
+                                   startToken,
+                                   endToken,
+                                   stats.getMatchedPartitions(),
+                                   stats.getMismatchedPartitions(),
+                                   stats.getOnlyInSource(),
+                                   stats.getOnlyInTarget(),
+                                   stats.getMatchedRows(),
+                                   stats.getMatchedValues(),
+                                   stats.getMismatchedValues(),
+                                   stats.getSkippedPartitions(),
+                                   token)
+                             .setIdempotent(true);
+        }
+
+        private static long getOrDefaultLong(Row row, String column) {
+            return (null == row || row.isNull(column)) ? 0L : row.getLong(column);
+        }
+    }
+
+    static class JobLifeCycle {
+        final Session session;
+        final String keyspace;
+
+        public JobLifeCycle(Session session, String metadataKeyspace) {
+            this.session = session;
+            this.keyspace = metadataKeyspace;
+        }
+
+        public DiffJob.Params getJobParams(UUID jobId) {
+            ResultSet rs = session.execute(String.format("SELECT keyspace_name, " +
+                                                         "       table_names," +
+                                                         "       buckets," +
+                                                         "       total_tasks " +
+                                                         "FROM %s.%s " +
+                                                         "WHERE job_id = ?",
+                                                         keyspace, Schema.JOB_SUMMARY),
+                                           jobId);
+            Row row = rs.one();
+            if (null == row)
+                return null;
+
+            return new DiffJob.Params(jobId,
+                                      row.getString("keyspace_name"),
+                                      row.getList("table_names", String.class),
+                                      row.getInt("buckets"),
+                                      row.getInt("total_tasks"));
+        }
+
+
+        // Runs on Driver to insert top level job info
+        public void initializeJob(DiffJob.Params params,
+                                  String sourceClusterName,
+                                  String sourceClusterDesc,
+                                  String targetClusterName,
+                                  String targetClusterDesc) {
+
+            logger.info("Initializing job status");
+            // The job was previously run, so this could be a re-run to
+            // mop up any failed splits so mark it in progress.
+            ResultSet rs = session.execute(String.format("INSERT INTO %s.%s (job_id) VALUES (?) IF NOT EXISTS",
+                                                         keyspace, Schema.RUNNING_JOBS),
+                                           params.jobId);
+            if (!rs.one().getBool("[applied]")) {
+                logger.info("Aborting due to inability to mark job as running. " +
+                            "Did a previous run of job id {} fail non-gracefully?",
+                            params.jobId);
+                throw new RuntimeException("Unable to mark job running, aborting");
+            }
+
+            UUID timeUUID = UUIDs.timeBased();
+            DateTime startDateTime = new DateTime(UUIDs.unixTimestamp(timeUUID), DateTimeZone.UTC);
+
+            rs = session.execute(String.format("INSERT INTO %s.%s (" +
+                                               " job_id," +
+                                               " job_start_time," +
+                                               " buckets," +
+                                               " keyspace_name," +
+                                               " table_names," +
+                                               " source_cluster_name," +
+                                               " source_cluster_desc," +
+                                               " target_cluster_name," +
+                                               " target_cluster_desc," +
+                                               " total_tasks)" +
+                                               " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" +
+                                               " IF NOT EXISTS",
+                                               keyspace, Schema.JOB_SUMMARY),
+                                 params.jobId,
+                                 timeUUID,
+                                 params.buckets,
+                                 params.keyspace,
+                                 params.tables,
+                                 sourceClusterName,
+                                 sourceClusterDesc,
+                                 targetClusterName,
+                                 targetClusterDesc,
+                                 params.tasks);
+
+            // This is a brand new job, index its details including start time
+            if (rs.one().getBool("[applied]")) {
+                BatchStatement batch = new BatchStatement();
+                batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (source_cluster_name, job_id) VALUES (?, ?)",
+                                                            keyspace, Schema.SOURCE_CLUSTER_INDEX),
+                                              sourceClusterName, params.jobId));
+                batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (target_cluster_name, job_id) VALUES (?, ?)",
+                                                            keyspace, Schema.TARGET_CLUSTER_INDEX),
+                                              targetClusterName, params.jobId));
+                batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (keyspace_name, job_id) VALUES (?, ?)",
+                                                            keyspace, Schema.KEYSPACE_INDEX),
+                                              keyspace, params.jobId));
+                batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (job_start_date, job_start_hour, job_start_time, job_id) " +
+                                                            "VALUES ('%s', ?, ?, ?)",
+                                                            keyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")),
+                                              startDateTime.getHourOfDay(), timeUUID, params.jobId));
+                session.execute(batch);
+            }
+        }
+
+        public void finalizeJob(UUID jobId, Map<String, RangeStats> results) {
+            logger.info("Finalizing job status");
+
+            markNotRunning(jobId);
+
+            BatchStatement batch = new BatchStatement();
+            for (Map.Entry<String, RangeStats> result : results.entrySet()) {
+                String table = result.getKey();
+                RangeStats stats = result.getValue();
+                session.execute(String.format("INSERT INTO %s.%s (" +
+                                              "  job_id," +
+                                              "  table_name," +
+                                              "  matched_partitions," +
+                                              "  mismatched_partitions," +
+                                              "  partitions_only_in_source," +
+                                              "  partitions_only_in_target," +
+                                              "  matched_rows," +
+                                              "  matched_values," +
+                                              "  mismatched_values," +
+                                              "  skipped_partitions) " +
+                                              "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+                                              keyspace, Schema.JOB_RESULTS),
+                                jobId,
+                                table,
+                                stats.getMatchedPartitions(),
+                                stats.getMismatchedPartitions(),
+                                stats.getOnlyInSource(),
+                                stats.getOnlyInTarget(),
+                                stats.getMatchedRows(),
+                                stats.getMatchedValues(),
+                                stats.getMismatchedValues(),
+                                stats.getSkippedPartitions());
+            }
+            session.execute(batch);
+        }
+
+
+        public void markNotRunning(UUID jobId) {
+            logger.info("Marking job {} as not running", jobId);
+
+            ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS",
+                                                         keyspace, Schema.RUNNING_JOBS),
+                                           jobId);
+            if (!rs.one().getBool("[applied]")) {
+                logger.warn("Non-fatal: Unable to mark job %s as not running, check logs for errors " +
+                            "during initialization as there may be no entry for this job in the {} table",
+                            jobId, Schema.RUNNING_JOBS);
+            }
+        }
+    }
+
+    static class Schema {
+
+        public static final String TASK_STATUS = "task_status";
+        private static final String TASK_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                         " job_id uuid," +
+                                                         " bucket int," +
+                                                         " table_name text," +
+                                                         " start_token varchar," +
+                                                         " end_token varchar," +
+                                                         " matched_partitions bigint," +
+                                                         " mismatched_partitions bigint, " +
+                                                         " partitions_only_in_source bigint," +
+                                                         " partitions_only_in_target bigint," +
+                                                         " matched_rows bigint," +
+                                                         " matched_values bigint," +
+                                                         " mismatched_values bigint," +
+                                                         " skipped_partitions bigint," +
+                                                         " last_token varchar," +
+                                                         " PRIMARY KEY((job_id, bucket), table_name, start_token, end_token))" +
+                                                         " WITH default_time_to_live = %s";
+
+        public static final String JOB_SUMMARY = "job_summary";
+        private static final String JOB_SUMMARY_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                         " job_id uuid," +
+                                                         " job_start_time timeuuid," +
+                                                         " buckets int," +
+                                                         " keyspace_name text," +
+                                                         " table_names frozen<list<text>>," +
+                                                         " source_cluster_name text," +
+                                                         " source_cluster_desc text," +
+                                                         " target_cluster_name text," +
+                                                         " target_cluster_desc text," +
+                                                         " total_tasks int," +
+                                                         " PRIMARY KEY(job_id))" +
+                                                         " WITH default_time_to_live = %s";
+
+        public static final String JOB_RESULTS = "job_results";
+        private static final String JOB_RESULTS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                         " job_id uuid," +
+                                                         " table_name text," +
+                                                         " matched_partitions bigint," +
+                                                         " mismatched_partitions bigint," +
+                                                         " partitions_only_in_source bigint," +
+                                                         " partitions_only_in_target bigint," +
+                                                         " matched_rows bigint," +
+                                                         " matched_values bigint," +
+                                                         " mismatched_values bigint," +
+                                                         " skipped_partitions bigint," +
+                                                         " PRIMARY KEY(job_id, table_name))" +
+                                                         " WITH default_time_to_live = %s";
+
+        public static final String JOB_STATUS = "job_status";
+        private static final String JOB_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                        " job_id uuid," +
+                                                        " bucket int," +
+                                                        " table_name text," +
+                                                        " completed counter," +
+                                                        " PRIMARY KEY ((job_id, bucket), table_name))";
+
+        public static final String MISMATCHES = "mismatches";
+        private static final String MISMATCHES_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                        " job_id uuid," +
+                                                        " bucket int," +
+                                                        " table_name text, " +
+                                                        " mismatching_token varchar, " +
+                                                        " mismatch_type text, " +
+                                                        " PRIMARY KEY ((job_id, bucket), table_name, mismatching_token))" +
+                                                        " WITH default_time_to_live = %s";
+
+        public static final String ERROR_SUMMARY = "task_errors";
+        private static final String ERROR_SUMMARY_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                           " job_id uuid," +
+                                                           " bucket int," +
+                                                           " table_name text," +
+                                                           " start_token varchar," +
+                                                           " end_token varchar," +
+                                                           " PRIMARY KEY ((job_id, bucket), table_name, start_token, end_token))" +
+                                                           " WITH default_time_to_live = %s";
+
+        public static final String ERROR_DETAIL = "partition_errors";
+        private static final String ERROR_DETAIL_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                          " job_id uuid," +
+                                                          " bucket int," +
+                                                          " table_name text," +
+                                                          " start_token varchar," +
+                                                          " end_token varchar," +
+                                                          " error_token varchar," +
+                                                          " PRIMARY KEY ((job_id, bucket, table_name, start_token, end_token), error_token))" +
+                                                          " WITH default_time_to_live = %s";
+
+        public static final String SOURCE_CLUSTER_INDEX = "source_cluster_index";
+        private static final String SOURCE_CLUSTER_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                                  " source_cluster_name text," +
+                                                                  " job_id uuid," +
+                                                                  " PRIMARY KEY (source_cluster_name, job_id))" +
+                                                                  " WITH default_time_to_live = %s";
+
+        public static final String TARGET_CLUSTER_INDEX = "target_cluster_index";
+        private static final String TARGET_CLUSTER_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                                  " target_cluster_name text," +
+                                                                  " job_id uuid," +
+                                                                  " PRIMARY KEY (target_cluster_name, job_id))" +
+                                                                  " WITH default_time_to_live = %s";
+
+        public static final String KEYSPACE_INDEX = "keyspace_index";
+        private static final String KEYSPACE_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                            " keyspace_name text," +
+                                                            " job_id uuid," +
+                                                            " PRIMARY KEY(keyspace_name, job_id))" +
+                                                            " WITH default_time_to_live = %s";
+
+        public static final String JOB_START_INDEX = "job_start_index";
+        private static final String JOB_START_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                             " job_start_date date," +
+                                                             " job_start_hour int," +
+                                                             " job_start_time timeuuid," +
+                                                             " job_id uuid," +
+                                                             " PRIMARY KEY ((job_start_date, job_start_hour), job_start_time))" +
+                                                             " WITH default_time_to_live = %s";
+
+        public static final String RUNNING_JOBS = "running_jobs";
+        private static final String RUNNING_JOBS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                          " job_id uuid," +
+                                                          " PRIMARY KEY (job_id))" +
+                                                          " WITH default_time_to_live = %s";
+
+        private static final String KEYSPACE_SCHEMA = "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = %s";
+
+
+        public static void maybeInitialize(Session session, MetadataKeyspaceOptions options) {
+            if (!options.should_init)
+                return;
+
+            logger.info("Initializing cassandradiff journal schema in \"{}\" keyspace", options.keyspace);
+            session.execute(String.format(KEYSPACE_SCHEMA, options.keyspace, options.replication));
+            session.execute(String.format(JOB_SUMMARY_SCHEMA, options.keyspace, JOB_SUMMARY, options.ttl));
+            session.execute(String.format(JOB_STATUS_SCHEMA, options.keyspace, JOB_STATUS));
+            session.execute(String.format(JOB_RESULTS_SCHEMA, options.keyspace, JOB_RESULTS, options.ttl));
+            session.execute(String.format(TASK_STATUS_SCHEMA, options.keyspace, TASK_STATUS, options.ttl));
+            session.execute(String.format(MISMATCHES_SCHEMA, options.keyspace, MISMATCHES, options.ttl));
+            session.execute(String.format(ERROR_SUMMARY_SCHEMA, options.keyspace, ERROR_SUMMARY, options.ttl));
+            session.execute(String.format(ERROR_DETAIL_SCHEMA, options.keyspace, ERROR_DETAIL, options.ttl));
+            session.execute(String.format(SOURCE_CLUSTER_INDEX_SCHEMA, options.keyspace, SOURCE_CLUSTER_INDEX, options.ttl));
+            session.execute(String.format(TARGET_CLUSTER_INDEX_SCHEMA, options.keyspace, TARGET_CLUSTER_INDEX, options.ttl));
+            session.execute(String.format(KEYSPACE_INDEX_SCHEMA, options.keyspace, KEYSPACE_INDEX, options.ttl));
+            session.execute(String.format(JOB_START_INDEX_SCHEMA, options.keyspace, JOB_START_INDEX, options.ttl));
+            session.execute(String.format(RUNNING_JOBS_SCHEMA, options.keyspace, RUNNING_JOBS, options.ttl));
+            logger.info("Schema initialized");
+        }
+    }
+}
+
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/MismatchType.java b/spark-job/src/main/java/org/apache/cassandra/diff/MismatchType.java
new file mode 100644
index 0000000..b23ac35
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/MismatchType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+public enum MismatchType {
+
+    ONLY_IN_SOURCE,
+    ONLY_IN_TARGET,
+    PARTITION_MISMATCH
+
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
new file mode 100644
index 0000000..6434dc8
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.util.Iterator;
+import java.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.*;
+
+public class PartitionComparator implements Callable<PartitionStats> {
+
+    private static final Logger logger = LoggerFactory.getLogger(PartitionComparator.class);
+
+    private final TableSpec tableSpec;
+    private final Iterator<Row> source;
+    private final Iterator<Row> target;
+
+    public PartitionComparator(TableSpec tableSpec,
+                               Iterator<Row> source,
+                               Iterator<Row> target) {
+        this.tableSpec = tableSpec;
+        this.source = source;
+        this.target = target;
+    }
+
+    public PartitionStats call() {
+        PartitionStats partitionStats = new PartitionStats();
+
+        if (source == null || target == null) {
+            logger.error("Skipping partition because one result was null (timeout despite retries)");
+            partitionStats.skipped = true;
+            return partitionStats;
+        }
+
+        while (source.hasNext() && target.hasNext()) {
+
+            Row sourceRow = source.next();
+            Row targetRow = target.next();
+
+            // if primary keys don't match don't proceed any further, just mark the
+            // partition as mismatched and be done
+            if (!clusteringsEqual(sourceRow, targetRow)) {
+                partitionStats.allClusteringsMatch = false;
+                return partitionStats;
+            }
+
+            partitionStats.matchedRows++;
+
+            // if the rows match, but there are mismatching values in the regular columns
+            // we can continue processing the partition, so just flag it as mismatched and continue
+            checkRegularColumnEquality(partitionStats, sourceRow, targetRow);
+        }
+
+        // if one of the iterators isn't exhausted, then there's a mismatch at the partition level
+        if (source.hasNext() || target.hasNext())
+            partitionStats.allClusteringsMatch = false;
+
+        return partitionStats;
+    }
+
+    private boolean clusteringsEqual(Row source, Row target) {
+        for (ColumnMetadata column : tableSpec.getClusteringColumns()) {
+            Object fromSource = source.getObject(column.getName());
+            Object fromTarget = target.getObject(column.getName());
+
+            if ((fromSource == null) != (fromTarget == null))
+                return false;
+
+            if (fromSource != null && !fromSource.equals(fromTarget))
+                return false;
+        }
+        return true;
+    }
+
+    private void checkRegularColumnEquality(PartitionStats stats, Row source, Row target) {
+        for (ColumnMetadata column : tableSpec.getRegularColumns()) {
+            Object fromSource = source.getObject(column.getName());
+            Object fromTarget = target.getObject(column.getName());
+            if (fromSource == null) {
+                if (fromTarget == null) {
+                    stats.matchedValues++;
+                } else {
+                    stats.mismatchedValues++;
+                }
+            } else {
+                if (fromSource.equals(fromTarget)) {
+                    stats.matchedValues++;
+                } else {
+                    stats.mismatchedValues++;
+                }
+            }
+        }
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
new file mode 100644
index 0000000..d31da4f
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datastax.driver.core.*;
+import org.jetbrains.annotations.NotNull;
+
+public class PartitionKey implements Comparable<PartitionKey> {
+
+    private final Row row;
+
+    public PartitionKey(Row row) {
+        this.row = row;
+    }
+
+    public BigInteger getTokenAsBigInteger(){
+        Token token = getToken();
+        if (token.getType() == DataType.bigint()) {
+            return BigInteger.valueOf((Long) token.getValue());
+        } else {
+            return (BigInteger) token.getValue();
+        }
+    }
+
+    public List<Object> getComponents() {
+        int cols = row.getColumnDefinitions().size();
+        List<Object> columns = new ArrayList<>(cols);
+        // Note we start at index=1, because index=0 is the token
+        for (int i = 1; i < cols; i++)
+            columns.add(row.getObject(i));
+        return columns;
+    }
+
+    @VisibleForTesting
+    protected Token getToken() {
+        return row.getToken(0);
+    }
+
+    public int compareTo(@NotNull PartitionKey o) {
+        return getToken().compareTo(o.getToken());
+    }
+
+    public boolean equals(Object obj) {
+        return this == obj || (obj instanceof PartitionKey &&  this.compareTo((PartitionKey)obj) == 0);
+    }
+
+    public int hashCode() {
+        return Objects.hash(getTokenAsBigInteger());
+    }
+
+    public String toString() {
+        return StreamSupport.stream(row.getColumnDefinitions().spliterator(), false)
+                            .map(ColumnDefinitions.Definition::getName)
+                            .map(row::getObject)
+                            .filter(Objects::nonNull)
+                            .map(Object::toString)
+                            .collect(Collectors.joining(":"));
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionStats.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionStats.java
new file mode 100644
index 0000000..7f020ff
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionStats.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+public class PartitionStats {
+    public boolean skipped = false;
+    public boolean allClusteringsMatch = true;
+    public long matchedRows;
+    public long matchedValues;
+    public long mismatchedValues;
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
new file mode 100644
index 0000000..36ce2b5
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.*;
+
+import com.google.common.base.Verify;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RangeComparator {
+
+    private static final Logger logger = LoggerFactory.getLogger(RangeComparator.class);
+
+    private final DiffContext context;
+    private final BiConsumer<Throwable, BigInteger> errorReporter;
+    private final BiConsumer<MismatchType, BigInteger> mismatchReporter;
+    private final BiConsumer<RangeStats, BigInteger> journal;
+    private final ComparisonExecutor comparisonExecutor;
+
+    public RangeComparator(DiffContext context,
+                           BiConsumer<Throwable, BigInteger> errorReporter,
+                           BiConsumer<MismatchType,BigInteger> mismatchReporter,
+                           BiConsumer<RangeStats, BigInteger> journal,
+                           ComparisonExecutor comparisonExecutor) {
+        this.context = context;
+        this.errorReporter = errorReporter;
+        this.mismatchReporter = mismatchReporter;
+        this.journal = journal;
+        this.comparisonExecutor = comparisonExecutor;
+    }
+
+    public RangeStats compare(Iterator<PartitionKey> sourceKeys,
+                              Iterator<PartitionKey> targetKeys,
+                              Function<PartitionKey, PartitionComparator> partitionTaskProvider) {
+
+        final RangeStats rangeStats = RangeStats.newStats();
+        // We can catch this condition earlier, but it doesn't hurt to also check here
+        if (context.startToken.equals(context.endToken))
+            return rangeStats;
+
+        Phaser phaser = new Phaser(1);
+        AtomicLong partitionCount = new AtomicLong(0);
+        AtomicReference<BigInteger> highestTokenSeen = new AtomicReference<>(context.startToken);
+
+        logger.info("Comparing range [{},{}]", context.startToken, context.endToken);
+        try {
+            PartitionKey sourceKey = nextKey(sourceKeys);
+            PartitionKey targetKey = nextKey(targetKeys);
+
+            // special case for start of range - handles one cluster supplying an empty range
+            if ((sourceKey == null) != (targetKey == null)) {
+                if (sourceKey == null) {
+                    logger.info("First in range, source iter is empty {}", context);
+                    onlyInTarget(rangeStats, targetKey);
+                    targetKeys.forEachRemaining(key -> onlyInTarget(rangeStats, key));
+                } else {
+                    logger.info("First in range, target iter is empty {}", context);
+                    onlyInSource(rangeStats, sourceKey);
+                    sourceKeys.forEachRemaining(key -> onlyInSource(rangeStats, key));
+                }
+                return rangeStats;
+            }
+
+            while (sourceKey != null && targetKey != null) {
+
+                int ret = sourceKey.compareTo(targetKey);
+                if (ret > 0) {
+                    onlyInTarget(rangeStats, targetKey);
+                    targetKey = nextKey(targetKeys);
+                } else if (ret < 0) {
+                    onlyInSource(rangeStats, sourceKey);
+                    sourceKey = nextKey(sourceKeys);
+                } else {
+
+                    Verify.verify(sourceKey.equals(targetKey),
+                                  "Can only compare partitions with identical keys: (%s, %s)",
+                                  sourceKey, targetKey);
+
+                    // For results where the key exists in both, we'll fire off an async task to walk the
+                    // partition and compare all the rows. The result of that comparison is added to the
+                    // totals for the range and the highest seen token updated in the onSuccess callback
+
+                    if (!context.isTokenAllowed(sourceKey.getTokenAsBigInteger())) {
+                        logger.debug("Skipping disallowed token {}", sourceKey.getTokenAsBigInteger());
+                        rangeStats.skipPartition();
+                        sourceKey = nextKey(sourceKeys);
+                        targetKey = nextKey(targetKeys);
+                        continue;
+                    }
+
+                    BigInteger token = sourceKey.getTokenAsBigInteger();
+                    try {
+                        PartitionComparator comparisonTask = partitionTaskProvider.apply(sourceKey);
+                        comparisonExecutor.submit(comparisonTask,
+                                                  onSuccess(rangeStats, partitionCount, token, highestTokenSeen, mismatchReporter, journal),
+                                                  onError(rangeStats, token, errorReporter),
+                                                  phaser);
+                    } catch (Throwable t) {
+                        // Handle errors thrown when creating the comparison task. This should trap timeouts and
+                        // unavailables occurring when performing the initial query to read the full partition.
+                        // Errors thrown when paging through the partition in comparisonTask will be handled by
+                        // the onError callback.
+                        rangeStats.partitionError();
+                        errorReporter.accept(t, token);
+                    } finally {
+                        // if the cluster has been shutdown because the task failed the underlying iterators
+                        // of partition keys will return hasNext == false
+                        sourceKey = nextKey(sourceKeys);
+                        targetKey = nextKey(targetKeys);
+                    }
+                }
+            }
+
+            // handle case where only one iterator is exhausted
+            if (sourceKey != null)
+                onlyInSource(rangeStats, sourceKey);
+            else if (targetKey != null)
+                onlyInTarget(rangeStats, targetKey);
+
+            drain(sourceKeys, targetKeys, rangeStats);
+
+        } catch (Exception e) {
+            // Handles errors thrown by iteration of underlying resultsets of partition keys by
+            // calls to nextKey(). Such errors should cause the overall range comparison to fail,
+            // but we must ensure that any in-flight partition comparisons complete so that either
+            // the onSuccess or onError callback is fired for each one. This is necessary to ensure
+            // that we record the highest seen token and any failed partitions and can safely re-run.
+            logger.debug("Waiting for {} in flight tasks before propagating error", phaser.getUnarrivedParties());
+            phaser.arriveAndAwaitAdvance();
+            throw new RuntimeException(String.format("Error encountered during range comparison for [%s:%s]",
+                                       context.startToken, context.endToken), e);
+        }
+
+        logger.debug("Waiting for {} in flight tasks before returning", phaser.getUnarrivedParties());
+        phaser.arriveAndAwaitAdvance();
+
+        if (!rangeStats.allMatches())
+            logger.info("Segment [{}:{}] stats - ({})", context.startToken, context.endToken, rangeStats);
+
+        return rangeStats;
+    }
+
+    private void drain(Iterator<PartitionKey> sourceKeys,
+                             Iterator<PartitionKey> targetKeys,
+                             RangeStats rangeStats) {
+        if (sourceKeys.hasNext()) {
+            logger.info("Source keys not exhausted {}", context);
+            sourceKeys.forEachRemaining(key -> onlyInSource(rangeStats, key));
+        } else if (targetKeys.hasNext()) {
+            logger.info("Target keys not exhausted: {}", context);
+            targetKeys.forEachRemaining(key -> onlyInTarget(rangeStats, key));
+        }
+    }
+
+    private void onlyInTarget(RangeStats stats, PartitionKey key) {
+        stats.onlyInTarget();
+        mismatchReporter.accept(MismatchType.ONLY_IN_TARGET, key.getTokenAsBigInteger());
+    }
+
+    private void onlyInSource(RangeStats stats, PartitionKey key) {
+        stats.onlyInSource();
+        mismatchReporter.accept(MismatchType.ONLY_IN_SOURCE, key.getTokenAsBigInteger());
+    }
+
+    private PartitionKey nextKey(Iterator<PartitionKey> keys) {
+        return keys.hasNext() ? keys.next() : null;
+    }
+
+    private Consumer<PartitionStats> onSuccess(final RangeStats rangeStats,
+                                               final AtomicLong partitionCount,
+                                               final BigInteger currentToken,
+                                               final AtomicReference<BigInteger> highestSeenToken,
+                                               final BiConsumer<MismatchType, BigInteger> mismatchReporter,
+                                               final BiConsumer<RangeStats, BigInteger> journal) {
+        return (result) -> {
+
+            rangeStats.accumulate(result);
+            if (!result.allClusteringsMatch || result.mismatchedValues > 0) {
+                mismatchReporter.accept(MismatchType.PARTITION_MISMATCH, currentToken);
+                rangeStats.mismatchedPartition();
+            } else {
+                rangeStats.matchedPartition();
+            }
+
+            BigInteger highest = highestSeenToken.get();
+            while (currentToken.compareTo(highest) > 0) {
+                if (highestSeenToken.compareAndSet(highest, currentToken))
+                    break;
+
+                highest = highestSeenToken.get();
+            }
+
+            // checkpoint ever 10 partitions
+            if (partitionCount.incrementAndGet() % 10 == 0)
+                journal.accept(rangeStats, highestSeenToken.get());
+        };
+    }
+
+    private Consumer<Throwable> onError(final RangeStats rangeStats,
+                                        final BigInteger currentToken,
+                                        final BiConsumer<Throwable, BigInteger> errorReporter) {
+        return (error) -> {
+            rangeStats.partitionError();
+            errorReporter.accept(error, currentToken);
+        };
+    }
+}
+
+
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
new file mode 100644
index 0000000..a0f8043
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.*;
+import java.util.Objects;
+import java.util.concurrent.atomic.LongAdder;
+
+public class RangeStats implements Serializable {
+
+    private transient LongAdder matchedPartitions;
+    private transient LongAdder mismatchedPartitions;
+    private transient LongAdder errorPartitions;
+    private transient LongAdder skippedPartitions;
+    private transient LongAdder onlyInSource;
+    private transient LongAdder onlyInTarget;
+    private transient LongAdder matchedRows;
+    private transient LongAdder matchedValues;
+    private transient LongAdder mismatchedValues;
+
+    public static RangeStats newStats() {
+        return new RangeStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
+    }
+
+    public static RangeStats withValues(long matchedPartitions,
+                                        long mismatchedPartitions,
+                                        long errorPartitions,
+                                        long skippedPartitions,
+                                        long onlyInSource,
+                                        long onlyInTarget,
+                                        long matchedRows,
+                                        long matchedValues,
+                                        long mismatchedValues) {
+
+        return new RangeStats(matchedPartitions,
+                              mismatchedPartitions,
+                              errorPartitions,
+                              skippedPartitions,
+                              onlyInSource,
+                              onlyInTarget,
+                              matchedRows,
+                              matchedValues,
+                              mismatchedValues);
+    }
+
+    private RangeStats(long matchedPartitions,
+                       long mismatchedPartitions,
+                       long errorPartitions,
+                       long skippedPartitions,
+                       long onlyInSource,
+                       long onlyInTarget,
+                       long matchedRows,
+                       long matchedValues,
+                       long mismatchedValues) {
+
+        this.matchedPartitions      = new LongAdder();
+        this.mismatchedPartitions   = new LongAdder();
+        this.errorPartitions        = new LongAdder();
+        this.skippedPartitions      = new LongAdder();
+        this.onlyInSource           = new LongAdder();
+        this.onlyInTarget           = new LongAdder();
+        this.matchedRows            = new LongAdder();
+        this.matchedValues          = new LongAdder();
+        this.mismatchedValues       = new LongAdder();
+
+        this.matchedPartitions.add(matchedPartitions);
+        this.mismatchedPartitions.add(mismatchedPartitions);
+        this.errorPartitions.add(errorPartitions);
+        this.skippedPartitions.add(skippedPartitions);
+        this.onlyInSource.add(onlyInSource);
+        this.onlyInTarget.add(onlyInTarget);
+        this.matchedRows.add(matchedRows);
+        this.matchedValues.add(matchedValues);
+        this.mismatchedValues.add(mismatchedValues);
+    }
+
+    public void matchedPartition() {
+        matchedPartitions.add(1L);
+    }
+
+    public long getMatchedPartitions() {
+        return matchedPartitions.sum();
+    }
+
+    public void onlyInSource() {
+        onlyInSource.add(1L);
+    }
+
+    public long getOnlyInSource() {
+        return onlyInSource.sum();
+    }
+
+    public void onlyInTarget() {
+        onlyInTarget.add(1L);
+    }
+
+    public long getOnlyInTarget() {
+        return onlyInTarget.sum();
+    }
+
+    public long getMatchedRows() {
+        return matchedRows.sum();
+    }
+
+    public long getMatchedValues() {
+        return matchedValues.sum();
+    }
+
+    public long getMismatchedValues() {
+        return mismatchedValues.sum();
+    }
+
+    public void mismatchedPartition() {
+        mismatchedPartitions.add(1L);
+    }
+
+    public long getMismatchedPartitions() {
+        return mismatchedPartitions.sum();
+    }
+
+    public void skipPartition() {
+        skippedPartitions.add(1L);
+    }
+
+    public long getSkippedPartitions() {
+        return skippedPartitions.sum();
+    }
+
+    public void partitionError() {
+        errorPartitions.add(1L);
+    }
+
+    public long getErrorPartitions() {
+        return errorPartitions.sum();
+    }
+
+    public RangeStats accumulate(final PartitionStats partitionStats) {
+        this.matchedRows.add(partitionStats.matchedRows);
+        this.matchedValues.add(partitionStats.matchedValues);
+        this.mismatchedValues.add(partitionStats.mismatchedValues);
+        if (partitionStats.skipped)
+            this.skippedPartitions.add(1L);
+
+        return this;
+    }
+
+    public RangeStats accumulate(final RangeStats rangeStats) {
+        this.matchedPartitions.add(rangeStats.matchedPartitions.sum());
+        this.mismatchedPartitions.add(rangeStats.mismatchedPartitions.sum());
+        this.errorPartitions.add(rangeStats.errorPartitions.sum());
+        this.skippedPartitions.add(rangeStats.skippedPartitions.sum());
+        this.onlyInSource.add(rangeStats.onlyInSource.sum());
+        this.onlyInTarget.add(rangeStats.onlyInTarget.sum());
+        this.matchedRows.add(rangeStats.matchedRows.sum());
+        this.matchedValues.add(rangeStats.matchedValues.sum());
+        this.mismatchedValues.add(rangeStats.mismatchedValues.sum());
+        return this;
+    }
+
+    public boolean allMatches () {
+        return onlyInSource.sum() == 0
+               && errorPartitions.sum() == 0
+               && onlyInTarget.sum() == 0
+               && mismatchedValues.sum() == 0
+               && skippedPartitions.sum() == 0;
+    }
+
+    public boolean isEmpty() {
+        return matchedPartitions.sum() == 0
+                && mismatchedPartitions.sum() == 0
+                && errorPartitions.sum() == 0
+                && skippedPartitions.sum() == 0
+                && onlyInSource.sum() == 0
+                && onlyInTarget.sum() == 0
+                && matchedRows.sum() == 0
+                && matchedValues.sum() == 0
+                && mismatchedValues.sum() == 0;
+    }
+
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof RangeStats))
+            return false;
+
+        RangeStats other = (RangeStats)o;
+        return this.matchedPartitions.sum() == other.matchedPartitions.sum()
+               && mismatchedPartitions.sum() == other.mismatchedPartitions.sum()
+               && errorPartitions.sum() == other.errorPartitions.sum()
+               && skippedPartitions.sum() == other.skippedPartitions.sum()
+               && onlyInSource.sum() == other.onlyInSource.sum()
+               && onlyInTarget.sum() == other.onlyInTarget.sum()
+               && matchedRows.sum() == other.matchedRows.sum()
+               && matchedValues.sum() == other.matchedValues.sum()
+               && mismatchedValues.sum() == other.mismatchedValues.sum();
+    }
+
+    public int hashCode() {
+        return Objects.hash(matchedPartitions.sum(),
+                            mismatchedPartitions.sum(),
+                            errorPartitions.sum(),
+                            skippedPartitions.sum(),
+                            onlyInSource.sum(),
+                            onlyInTarget.sum(),
+                            matchedRows.sum(),
+                            matchedValues.sum(),
+                            mismatchedValues.sum());
+    }
+
+    public String toString() {
+        return String.format("Matched Partitions - %d, " +
+                             "Mismatched Partitions - %d, " +
+                             "Partition Errors - %d, " +
+                             "Partitions Only In Source - %d, " +
+                             "Partitions Only In Target - %d, " +
+                             "Skipped Partitions - %d, " +
+                             "Matched Rows - %d, " +
+                             "Matched Values - %d, " +
+                             "Mismatched Values - %d ",
+                             matchedPartitions.sum(),
+                             mismatchedPartitions.sum(),
+                             errorPartitions.sum(),
+                             onlyInSource.sum(),
+                             onlyInTarget.sum(),
+                             skippedPartitions.sum(),
+                             matchedRows.sum(),
+                             matchedValues.sum(),
+                             mismatchedValues.sum());
+    }
+
+    // For serialization
+
+    private RangeStats() {}
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.writeLong(matchedPartitions.sum());
+        out.writeLong(mismatchedPartitions.sum());
+        out.writeLong(errorPartitions.sum());
+        out.writeLong(skippedPartitions.sum());
+        out.writeLong(onlyInSource.sum());
+        out.writeLong(onlyInTarget.sum());
+        out.writeLong(matchedRows.sum());
+        out.writeLong(matchedValues.sum());
+        out.writeLong(mismatchedValues.sum());
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        this.matchedPartitions      = new LongAdder();
+        this.mismatchedPartitions   = new LongAdder();
+        this.errorPartitions        = new LongAdder();
+        this.skippedPartitions      = new LongAdder();
+        this.onlyInSource           = new LongAdder();
+        this.onlyInTarget           = new LongAdder();
+        this.matchedRows            = new LongAdder();
+        this.matchedValues          = new LongAdder();
+        this.mismatchedValues       = new LongAdder();
+
+        this.matchedPartitions.add(in.readLong());
+        this.mismatchedPartitions.add(in.readLong());
+        this.errorPartitions.add(in.readLong());
+        this.skippedPartitions.add(in.readLong());
+        this.onlyInSource.add(in.readLong());
+        this.onlyInTarget.add(in.readLong());
+        this.matchedRows.add(in.readLong());
+        this.matchedValues.add(in.readLong());
+        this.mismatchedValues.add(in.readLong());
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java b/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
new file mode 100644
index 0000000..d2f0963
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import com.datastax.driver.core.*;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+
+import static org.apache.cassandra.diff.DiffContext.cqlizedString;
+
+public class TableSpec {
+
+    private final String table;
+    private ImmutableList<ColumnMetadata> clusteringColumns;
+    private ImmutableList<ColumnMetadata> regularColumns;
+
+
+    public String getTable()
+    {
+        return table;
+    }
+
+
+    public ImmutableList<ColumnMetadata> getClusteringColumns() {
+        return clusteringColumns;
+    }
+
+    public ImmutableList<ColumnMetadata> getRegularColumns() {
+        return regularColumns;
+    }
+
+
+    /**
+     * @param table the table to diff
+     * @param clusteringColumns the clustering columns, retrieved from cluster using the client
+     * @param regularColumns the non-primary key columns, retrieved from cluster using the client
+     */
+    TableSpec(final String table,
+              final List<ColumnMetadata> clusteringColumns,
+              final List<ColumnMetadata> regularColumns) {
+        this.table = table;
+        this.clusteringColumns = ImmutableList.copyOf(clusteringColumns);
+        this.regularColumns = ImmutableList.copyOf(regularColumns);
+    }
+
+    public static TableSpec make(String table, DiffCluster diffCluster) {
+        final Cluster cluster = diffCluster.cluster;
+
+        final String cqlizedKeyspace = cqlizedString(diffCluster.keyspace);
+        final String cqlizedTable = cqlizedString(table);
+
+        KeyspaceMetadata ksMetadata = cluster.getMetadata().getKeyspace(cqlizedKeyspace);
+        if (ksMetadata == null) {
+            throw new IllegalArgumentException(String.format("Keyspace %s not found in %s cluster", diffCluster.keyspace, diffCluster.clusterId));
+        }
+
+        TableMetadata tableMetadata = ksMetadata.getTable(cqlizedTable);
+        List<ColumnMetadata> clusteringColumns = tableMetadata.getClusteringColumns();
+        List<ColumnMetadata> regularColumns = tableMetadata.getColumns()
+                                                           .stream()
+                                                           .filter(c -> !(clusteringColumns.contains(c)))
+                                                           .collect(Collectors.toList());
+        return new TableSpec(tableMetadata.getName(), clusteringColumns, regularColumns);
+    }
+
+    public boolean equalsNamesOnly(TableSpec other) {
+        return this.table.equals(other.table)
+            && columnNames(this.clusteringColumns).equals(columnNames(other.clusteringColumns))
+            && columnNames(this.regularColumns).equals(columnNames(other.regularColumns));
+    }
+
+    private static List<String> columnNames(List<ColumnMetadata> columns) {
+        return columns.stream().map(ColumnMetadata::getName).collect(Collectors.toList());
+    }
+
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof TableSpec))
+            return false;
+
+        TableSpec other = (TableSpec)o;
+        return this.table.equals(other.table)
+               && this.clusteringColumns.equals(other.clusteringColumns)
+               && this.regularColumns.equals(other.regularColumns);
+
+    }
+
+    public int hashCode() {
+        return Objects.hash(table, clusteringColumns, regularColumns);
+    }
+
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                          .add("table", table)
+                          .add("clusteringColumns", clusteringColumns)
+                          .add("regularColumns", regularColumns)
+                          .toString();
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/TokenHelper.java b/spark-job/src/main/java/org/apache/cassandra/diff/TokenHelper.java
new file mode 100644
index 0000000..3521a4b
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/TokenHelper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+
+public enum TokenHelper {
+
+    MURMUR3 {
+        @Override
+        public BigInteger min() {
+            return BigInteger.valueOf(Long.MIN_VALUE);
+        }
+
+        @Override
+        public BigInteger max() {
+            return BigInteger.valueOf(Long.MAX_VALUE);
+        }
+
+        @Override
+        public Object forBindParam(BigInteger token) {
+            return token.longValue();
+        }
+    },
+
+    RANDOM {
+        @Override
+        public BigInteger min() {
+            return BigInteger.ONE.negate();
+        }
+
+        @Override
+        public BigInteger max() {
+            return BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE);
+        }
+
+        @Override
+        public Object forBindParam(BigInteger token) {
+            return token;
+        }
+    };
+
+    public abstract BigInteger min();
+    public abstract BigInteger max();
+    public abstract Object forBindParam(BigInteger token);
+
+    public static TokenHelper forPartitioner(String partitionerName) {
+        if (partitionerName.endsWith("Murmur3Partitioner")) return MURMUR3;
+        else if (partitionerName.endsWith("RandomPartitioner")) return RANDOM;
+        else throw new IllegalArgumentException("Unsupported Partitioner :" + partitionerName);
+    }
+}
diff --git a/spark-job/src/main/resources/log4j2.xml b/spark-job/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..64aec61
--- /dev/null
+++ b/spark-job/src/main/resources/log4j2.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="INFO">
+            <AppenderRef ref="Console"/>
+        </Root>
+        <!--Logger name="org.apache.cassandra.diff" level="info">
+            <AppenderRef ref="Console"/>
+        </Logger>
+        <Logger name="org.apache.cassandra.diff" level="info">
+            <AppenderRef ref="Console"/>
+        </Logger-->
+    </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/spark-job/src/test/java/com/datastax/driver/core/ColumnMetadataHelper.java b/spark-job/src/test/java/com/datastax/driver/core/ColumnMetadataHelper.java
new file mode 100644
index 0000000..cc7fb95
--- /dev/null
+++ b/spark-job/src/test/java/com/datastax/driver/core/ColumnMetadataHelper.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datastax.driver.core;
+
+public class ColumnMetadataHelper {
+
+    public static ColumnMetadata column(String name) {
+        return ColumnMetadata.forAlias(null, name, null);
+    }
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/ComparisonExecutorTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/ComparisonExecutorTest.java
new file mode 100644
index 0000000..002769f
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/ComparisonExecutorTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.*;
+import org.junit.Test;
+
+import com.codahale.metrics.*;
+
+import static org.apache.cassandra.diff.TestUtils.assertThreadWaits;
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
+
+public class ComparisonExecutorTest {
+
+    @Test
+    public void submitBlocksWhenMaxTasksExceeded() throws Exception {
+        // submit maxTasks, then assert that further submission blocks until tasks are processed
+        int maxTasks = 3;
+        MetricRegistry metrics = metrics();
+        final ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
+        Gauge waitingToSubmit = metrics.getGauges().get("BlockedTasks");
+        assertEquals(0, waitingToSubmit.getValue());
+
+        final AtomicInteger successful = new AtomicInteger(0);
+        final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
+        final AtomicInteger failed = new AtomicInteger(0);
+        final Consumer<Throwable> onError = (t) -> failed.incrementAndGet();
+        final Phaser phaser = new Phaser(1);
+
+        BlockingTask[] tasks = new BlockingTask[5];
+        for (int i=0; i<5; i++)
+            tasks[i] = new BlockingTask(i);
+
+        // Ensure that the submission itself does not block before the max number of tasks are submitted
+        executor.submit(tasks[0], onSuccess, onError, phaser);
+        executor.submit(tasks[1], onSuccess, onError, phaser);
+        executor.submit(tasks[2], onSuccess, onError, phaser);
+        assertEquals(0, waitingToSubmit.getValue());
+
+        // Now submit another pair of tasks which should block as the executor is fully occupied
+        final CountDownLatch latch = new CountDownLatch(2);
+        Thread t1 = new Thread(() -> { latch.countDown(); executor.submit(tasks[3], onSuccess, onError, phaser);});
+        Thread t2 = new Thread(() -> { latch.countDown(); executor.submit(tasks[4], onSuccess, onError, phaser);});
+        t1.start();
+        t2.start();
+        // wait for both to attempt submission
+        latch.await();
+        assertThreadWaits(t1);
+        assertThreadWaits(t2);
+        assertEquals(2, waitingToSubmit.getValue());
+
+        // Let the first waiting task complete, which should allow t1 to complete its submission
+        tasks[0].latch.countDown();
+        t1.join();
+        // the second submission should still be waiting on a slot
+        assertThreadWaits(t2);
+        assertEquals(1, waitingToSubmit.getValue());
+
+        // Let another task complete, allowing t2 to complete its submission
+        tasks[1].latch.countDown();
+        t2.join();
+        assertEquals(0, waitingToSubmit.getValue());
+
+        // Let all tasks complete, wait for them to do so then verify counters
+        for (int i=2; i<=4; i++)
+            tasks[i].latch.countDown();
+
+        phaser.arriveAndAwaitAdvance();
+        assertEquals(5, successful.get());
+        assertEquals(0, failed.get());
+    }
+
+    @Test
+    public void handleTaskFailure() {
+        // Ensure that the failure callback is fired, a permit for task submission
+        // returned and the phaser notified when a task throws
+        int maxTasks = 5;
+        MetricRegistry metrics = metrics();
+        ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
+        Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
+        final AtomicInteger successful = new AtomicInteger(0);
+        final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
+        AtomicReference<Throwable> thrown = new AtomicReference<>();
+        final Consumer<Throwable> onError = thrown::set;
+        final Phaser phaser = new Phaser(1);
+
+        assertEquals(maxTasks, availableSlots.getValue());
+
+        RuntimeException toThrow = new RuntimeException("FAIL");
+        BlockingTask task = new BlockingTask(0, toThrow);
+        executor.submit(task, onSuccess, onError, phaser);
+        assertEquals(maxTasks - 1, availableSlots.getValue());
+
+        assertEquals(2, phaser.getUnarrivedParties());
+        task.latch.countDown();
+
+        phaser.arriveAndAwaitAdvance();
+        assertEquals(maxTasks, availableSlots.getValue());
+        assertEquals(0, successful.get());
+        assertEquals(toThrow, thrown.get());
+    }
+
+    @Test
+    public void handleUncaughtExceptionInFailureCallback() {
+        // Ensure that if the failure callback throws, a permit for submission is
+        // still returned and the phaser notified
+        int maxTasks = 5;
+        MetricRegistry metrics = metrics();
+        ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
+        Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
+        final AtomicInteger successful = new AtomicInteger(0);
+        final AtomicInteger failures = new AtomicInteger(0);
+        final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
+        final Consumer<Throwable> onError =  (t) -> { failures.incrementAndGet(); throw new RuntimeException("UNCAUGHT"); };
+        final Phaser phaser = new Phaser(1);
+
+        assertEquals(maxTasks, availableSlots.getValue());
+
+        RuntimeException toThrow = new RuntimeException("FAIL");
+        try {
+            onError.accept(toThrow);
+            fail("Failure callback should throw RuntimeException");
+        } catch (RuntimeException e) {
+            // expected - reset failure count
+            failures.set(0);
+        }
+
+        BlockingTask task = new BlockingTask(0, toThrow);
+        executor.submit(task, onSuccess, onError, phaser);
+        assertEquals(maxTasks - 1, availableSlots.getValue());
+
+        assertEquals(2, phaser.getUnarrivedParties());
+        task.latch.countDown();
+
+        phaser.arriveAndAwaitAdvance();
+        assertEquals(maxTasks, availableSlots.getValue());
+        assertEquals(0, successful.get());
+        assertEquals(1, failures.get());
+    }
+
+    @Test
+    public void handleUncaughtExceptionInSuccessCallback() {
+        // Ensure that if the success callback throws, a permit for submission is
+        // still returned and the phaser notified
+        int maxTasks = 5;
+        MetricRegistry metrics = metrics();
+        ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
+        Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
+        final AtomicInteger successful = new AtomicInteger(0);
+        final AtomicInteger failures = new AtomicInteger(0);
+        final Consumer<Integer> onSuccess = (i) ->  { successful.incrementAndGet(); throw new RuntimeException("UNCAUGHT"); };
+        final Consumer<Throwable> onError =  (t) -> failures.incrementAndGet();
+        final Phaser phaser = new Phaser(1);
+
+        assertEquals(maxTasks, availableSlots.getValue());
+        try {
+            onSuccess.accept(0);
+            fail("Success callback should throw RuntimeException");
+        } catch (RuntimeException e) {
+            // expected - reset failure count
+            successful.set(0);
+        }
+
+        BlockingTask task = new BlockingTask(0);
+        executor.submit(task, onSuccess, onError, phaser);
+        assertEquals(maxTasks - 1, availableSlots.getValue());
+
+        assertEquals(2, phaser.getUnarrivedParties());
+        task.latch.countDown();
+
+        phaser.arriveAndAwaitAdvance();
+        assertEquals(maxTasks, availableSlots.getValue());
+        assertEquals(1, successful.get());
+        assertEquals(0, failures.get());
+    }
+
+    @Test
+    public void handleRejectedExecutionException() {
+        // In the case that the underlying ExecutorService rejects a task submission, a permit
+        // should be returned and the phaser notified
+        int maxTasks = 5;
+        final AtomicInteger successful = new AtomicInteger(0);
+        final AtomicInteger failures = new AtomicInteger(0);
+        final AtomicInteger rejections = new AtomicInteger(0);
+        final Consumer<Integer> onSuccess = (i) ->  successful.incrementAndGet();
+        final Consumer<Throwable> onError =  (t) -> failures.incrementAndGet();
+
+        MetricRegistry metrics = metrics();
+        ExecutorService rejectingExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+                                                                   new LinkedBlockingQueue<>(1),
+                                                                   (r, executor) -> { rejections.incrementAndGet();
+                                                                                      throw new RejectedExecutionException("REJECTED");});
+
+        ComparisonExecutor executor = new ComparisonExecutor(MoreExecutors.listeningDecorator(rejectingExecutor), maxTasks, metrics);
+        Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
+        final Phaser phaser = new Phaser(1);
+
+        // Submit an initial pair of tasks to ensure that the underlying work queue is full
+        BlockingTask t0 = new BlockingTask(0);
+        BlockingTask t1 = new BlockingTask(1);
+        executor.submit(t0, onSuccess, onError, phaser);
+        executor.submit(t1, onSuccess, onError, phaser);
+        assertEquals(3, phaser.getUnarrivedParties());
+        assertEquals(maxTasks - 2, availableSlots.getValue());
+
+        // Submit a third task which will be rejected by the executor service
+        executor.submit(new BlockingTask(2), onSuccess, onError, phaser);
+        t0.latch.countDown();
+        t1.latch.countDown();
+
+        phaser.arriveAndAwaitAdvance();
+        assertEquals(maxTasks, availableSlots.getValue());
+        assertEquals(2, successful.get());
+        assertEquals(1, failures.get());
+        assertEquals(1, rejections.get());
+    }
+
+    class BlockingTask implements Callable<Integer> {
+
+        final int id;
+        final Exception e;
+        final CountDownLatch latch;
+
+        BlockingTask(int id) {
+            this(id, null);
+        }
+
+        BlockingTask(int id, Exception toThrow) {
+            this.id = id;
+            this.e = toThrow;
+            this.latch = new CountDownLatch(1);
+        }
+
+        public Integer call() throws Exception {
+            latch.await();
+            if (e != null)
+                throw e;
+            return id;
+        }
+    }
+
+    private static ListeningExecutorService executor(int threads) {
+        return MoreExecutors.listeningDecorator(
+            Executors.newFixedThreadPool(threads,
+                                         new ThreadFactoryBuilder().setNameFormat("partition-comparison-%d")
+                                                                   .setDaemon(true)
+                                                                   .build()));
+    }
+
+    private static MetricRegistry metrics() {
+        return new MetricRegistry();
+    }
+}
+
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
new file mode 100644
index 0000000..d8d92c7
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.diff.DiffJob;
+import org.apache.cassandra.diff.TokenHelper;
+
+import static org.junit.Assert.assertEquals;
+
+public class DiffJobTest
+{
+    @Test
+    public void testSplitsM3P()
+    {
+        splitTestHelper(TokenHelper.forPartitioner("Murmur3Partitioner"));
+    }
+    @Test
+    public void testSplitsRandom()
+    {
+        splitTestHelper(TokenHelper.forPartitioner("RandomPartitioner"));
+    }
+
+    private void splitTestHelper(TokenHelper tokens)
+    {
+        List<DiffJob.Split> splits = DiffJob.calculateSplits(50, 1, tokens);
+        assertEquals(splits.get(0).start, tokens.min());
+        DiffJob.Split prevSplit = null;
+        for (DiffJob.Split split : splits)
+        {
+            if (prevSplit != null)
+                assertEquals(prevSplit.end, split.start.subtract(BigInteger.ONE));
+            prevSplit = split;
+        }
+        assertEquals(splits.get(splits.size() - 1).end, tokens.max());
+        for (int i = 0; i < splits.size(); i++)
+            assertEquals(i, splits.get(i).splitNumber);
+    }
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
new file mode 100644
index 0000000..73677d9
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.function.Function;
+
+import com.google.common.base.VerifyException;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.diff.DiffJob;
+import org.apache.cassandra.diff.Differ;
+import org.apache.cassandra.diff.RangeStats;
+import org.apache.cassandra.diff.TokenHelper;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class DifferTest {
+
+    @Test(expected = VerifyException.class)
+    public void rejectNullStartOfRange() {
+        Differ.validateRange(null, BigInteger.TEN, TokenHelper.MURMUR3);
+    }
+
+    @Test(expected = VerifyException.class)
+    public void rejectNullEndOfRange() {
+        Differ.validateRange(BigInteger.TEN, null, TokenHelper.MURMUR3);
+    }
+
+    @Test(expected = VerifyException.class)
+    public void rejectWrappingRange() {
+        Differ.validateRange(BigInteger.TEN, BigInteger.ONE, TokenHelper.MURMUR3);
+    }
+
+    @Test(expected = VerifyException.class)
+    public void rejectRangeWithStartLessThanMinMurmurToken() {
+        Differ.validateRange(TokenHelper.MURMUR3.min().subtract(BigInteger.ONE),
+                             BigInteger.TEN,
+                             TokenHelper.MURMUR3);
+    }
+
+    @Test(expected = VerifyException.class)
+    public void rejectRangeWithEndGreaterThanMaxMurmurToken() {
+        Differ.validateRange(BigInteger.ONE,
+                             TokenHelper.MURMUR3.max().add(BigInteger.ONE),
+                             TokenHelper.MURMUR3);
+    }
+
+    @Test
+    public void filterTaskStatusForTables() {
+        // according to the journal:
+        // * t1 is already completed
+        // * t2 is started and has reported some progress, but has not completed
+        // * t3 has not reported any progress
+        DiffJob.Split split = new DiffJob.Split(1, 1, BigInteger.ONE, BigInteger.TEN);
+        Iterable<String> tables = Lists.newArrayList("t1", "t2", "t3");
+        Function<String, DiffJob.TaskStatus> journal = (table) -> {
+            switch (table) {
+                case "t1":
+                    return new DiffJob.TaskStatus(split.end, RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6));
+                case "t2":
+                    return new DiffJob.TaskStatus(BigInteger.valueOf(5L), RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5));
+                case "t3":
+                    return DiffJob.TaskStatus.EMPTY;
+                default:
+                    throw new AssertionError();
+            }
+        };
+
+        Map<String, DiffJob.TaskStatus> filtered = Differ.filterTables(tables, split, journal, false);
+        assertEquals(2, filtered.keySet().size());
+        assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get("t2").stats);
+        assertEquals(BigInteger.valueOf(5L), filtered.get("t2").lastToken);
+        assertEquals(RangeStats.newStats(), filtered.get("t3").stats);
+        assertNull(filtered.get("t3").lastToken);
+
+        // if re-running (part of) a job because of failures or problematic partitions, we want to
+        // ignore the status of completed tasks and re-run them anyway as only specified tokens will
+        // be processed - so t1 should be included now
+        filtered = Differ.filterTables(tables, split, journal, true);
+        assertEquals(3, filtered.keySet().size());
+        assertEquals(RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6), filtered.get("t1").stats);
+        assertEquals(split.end, filtered.get("t1").lastToken);
+        assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get("t2").stats);
+        assertEquals(BigInteger.valueOf(5L), filtered.get("t2").lastToken);
+        assertEquals(RangeStats.newStats(), filtered.get("t3").stats);
+        assertNull(filtered.get("t3").lastToken);
+    }
+
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
new file mode 100644
index 0000000..79b3638
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
+import com.google.common.reflect.TypeToken;
+import org.junit.Test;
+
+import com.datastax.driver.core.*;
+import org.apache.cassandra.diff.PartitionComparator;
+import org.apache.cassandra.diff.PartitionStats;
+import org.apache.cassandra.diff.TableSpec;
+
+import static org.junit.Assert.assertEquals;
+
+public class PartitionComparatorTest {
+
+    @Test
+    public void sourceIsNull() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t, null, rows(row(t, 0, 1, 2, 3)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, true, true, 0, 0, 0);
+    }
+
+    @Test
+    public void targetIsNull() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t, rows(row(t, 0, 1, 2, 3)), null);
+        PartitionStats stats = comparator.call();
+        assertStats(stats, true, true, 0, 0, 0);
+    }
+
+    @Test
+    public void sourceIsEmpty() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t, rows(), rows(row(t, 0, 1, 2, 3)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, false, 0, 0, 0);
+    }
+
+    @Test
+    public void targetIsEmpty() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t, rows(row(t, 0, 1, 2, 3)), rows());
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, false, 0, 0, 0);
+    }
+
+    @Test
+    public void sourceAndTargetAreEmpty() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t, rows(), rows());
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, true, 0, 0, 0);
+    }
+
+    @Test
+    public void sourceContainsExtraRowsAtStart() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1, 2, 3),
+                                                         row(t, 10, 11, 12, 13)),
+                                                    rows(row(t, 10, 11, 12, 13)));
+        PartitionStats stats = comparator.call();
+        // Comparison fails fast, so bails on the initial mismatch
+        assertStats(stats, false, false, 0, 0, 0);
+    }
+
+    @Test
+    public void targetContainsExtraRowsAtStart() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 10, 11, 12, 13)),
+                                                    rows(row(t, 0, 1, 2, 3),
+                                                         row(t, 10, 11, 12, 13)));
+        PartitionStats stats = comparator.call();
+        // Comparison fails fast, so bails on the initial mismatch
+        assertStats(stats, false, false, 0, 0, 0);
+    }
+
+    @Test
+    public void sourceContainsExtraRowsAtEnd() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1, 2, 3),
+                                                         row(t, 10, 11, 12, 13)),
+                                                    rows(row(t, 0, 1, 2, 3)));
+        PartitionStats stats = comparator.call();
+        // The fact that the first row & all its v1 & v2 values match should be reflected in the stats
+        assertStats(stats, false, false, 1, 2, 0);
+    }
+
+    @Test
+    public void targetContainsExtraRowsAtEnd() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1, 2, 3)),
+                                                    rows(row(t, 0, 1, 2, 3),
+                                                         row(t, 10, 11, 12, 13)));
+        PartitionStats stats = comparator.call();
+        // The fact that the first row & all its v1 & v2 values match should be reflected in the stats
+        assertStats(stats, false, false, 1, 2, 0);
+    }
+
+    @Test
+    public void withoutClusteringsAllRowsMatching() {
+        TableSpec t = spec("table1", names(), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1),
+                                                         row(t, 1, 11),
+                                                         row(t, 2, 12)),
+                                                    rows(row(t, 0, 1),
+                                                         row(t, 1, 11),
+                                                         row(t, 2, 12)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, true, 3, 6, 0);
+    }
+
+    @Test
+    public void singleClusteringAllRowsMatching() {
+        TableSpec t = spec("table1", names("c1"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1, 2),
+                                                         row(t, 1, 11, 12),
+                                                         row(t, 2, 21, 22)),
+                                                    rows(row(t, 0, 1, 2),
+                                                         row(t, 1, 11, 12),
+                                                         row(t, 2, 21, 22)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, true, 3, 6, 0);
+    }
+
+    @Test
+    public void multipleClusteringAllRowsMatching() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1, 2, 3),
+                                                         row(t, 1, 11, 12, 13),
+                                                         row(t, 2, 21, 22, 23)),
+                                                    rows(row(t, 0, 1, 2, 3),
+                                                         row(t, 1, 11, 12, 13),
+                                                         row(t, 2, 21, 22, 23)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, true, 3, 6, 0);
+    }
+
+    @Test
+    public void withoutClusteringsWithMismatches() {
+        TableSpec t = spec("table1", names(), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1),
+                                                         row(t, 1, 11),
+                                                         row(t, 2, 12)),
+                                                    rows(row(t, 0, 1),
+                                                         row(t, 1, 1100),
+                                                         row(t, 2, 1200)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, true, 3, 4, 2);
+    }
+
+    @Test
+    public void singleClusteringWithMismatches() {
+        TableSpec t = spec("table1", names("c1"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1, 2),
+                                                         row(t, 1, 11, 21),
+                                                         row(t, 2, 12, 22)),
+                                                    rows(row(t, 0, 1, 20),
+                                                         row(t, 1, 1100, 21),
+                                                         row(t, 2, 12, 1200)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, true, 3, 3, 3);
+    }
+
+    private void assertStats(PartitionStats stats,
+                             boolean skipped,
+                             boolean clusteringsMatch,
+                             int matchedRows,
+                             int matchedValues,
+                             int mismatchedValues) {
+        assertEquals(skipped, stats.skipped);
+        assertEquals(clusteringsMatch, stats.allClusteringsMatch);
+        assertEquals(matchedRows, stats.matchedRows);
+        assertEquals(matchedValues, stats.matchedValues);
+        assertEquals(mismatchedValues, stats.mismatchedValues);
+    }
+
+    PartitionComparator comparator(TableSpec table, Iterator<Row> source, Iterator<Row> target) {
+        return new PartitionComparator(table, source, target);
+    }
+
+    List<String> names(String...names) {
+        return Lists.newArrayList(names);
+    }
+
+    TableSpec spec(String table, List<String> clusteringColumns, List<String> regularColumns) {
+        return new TableSpec(table, columns(clusteringColumns), columns(regularColumns));
+    }
+
+    List<ColumnMetadata> columns(List<String> names) {
+        return names.stream().map(ColumnMetadataHelper::column).collect(Collectors.toList());
+    }
+
+    Iterator<Row> rows(Row...rows) {
+        return new AbstractIterator<Row>() {
+            int i = 0;
+            protected Row computeNext() {
+                if (i < rows.length)
+                    return rows[i++];
+                return endOfData();
+            }
+        };
+    }
+
+    Row row(TableSpec table, Object...values) {
+        return new TestRow(Stream.concat(table.getClusteringColumns().stream(),
+                                         table.getRegularColumns().stream())
+                                 .map(ColumnMetadata::getName).toArray(String[]::new),
+                           values);
+    }
+
+    static class TestRow implements Row {
+        private final String[] names;
+        private final Object[] values;
+
+        TestRow(String[] names, Object[] values) {
+            if (names.length != values.length)
+                throw new IllegalArgumentException(String.format("Number of column names (%d) doesn't " +
+                                                                 "match number of values(%d)",
+                                                                 names.length, values.length));
+            this.names = names;
+            this.values = values;
+        }
+
+        // Only getObject(String) is used by PartitionComparator
+        public Object getObject(String s) {
+            for (int i=0; i < names.length; i++)
+                if (names[i].equals(s))
+                    return values[i];
+
+            throw new IllegalArgumentException(s + " is not a column defined in this metadata");
+        }
+
+        public boolean isNull(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean getBool(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public byte getByte(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public short getShort(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public int getInt(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public long getLong(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Date getTimestamp(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public LocalDate getDate(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public long getTime(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public float getFloat(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public double getDouble(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public ByteBuffer getBytesUnsafe(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public ByteBuffer getBytes(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public String getString(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public BigInteger getVarint(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public BigDecimal getDecimal(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public UUID getUUID(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public InetAddress getInet(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> List<T> getList(String s, Class<T> aClass) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> List<T> getList(String s, TypeToken<T> typeToken) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> Set<T> getSet(String s, Class<T> aClass) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> Set<T> getSet(String s, TypeToken<T> typeToken) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <K, V> Map<K, V> getMap(String s, Class<K> aClass, Class<V> aClass1) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <K, V> Map<K, V> getMap(String s, TypeToken<K> typeToken, TypeToken<V> typeToken1) {
+            throw new UnsupportedOperationException();
+        }
+
+        public UDTValue getUDTValue(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public TupleValue getTupleValue(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> T get(String s, Class<T> aClass) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> T get(String s, TypeToken<T> typeToken) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> T get(String s, TypeCodec<T> typeCodec) {
+            throw new UnsupportedOperationException();
+        }
+
+        public ColumnDefinitions getColumnDefinitions() {
+            throw new UnsupportedOperationException();
+        }
+
+        public Token getToken(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Token getToken(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Token getPartitionKeyToken() {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean isNull(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean getBool(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public byte getByte(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public short getShort(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public int getInt(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public long getLong(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Date getTimestamp(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public LocalDate getDate(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public long getTime(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public float getFloat(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public double getDouble(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public ByteBuffer getBytesUnsafe(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public ByteBuffer getBytes(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public String getString(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public BigInteger getVarint(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public BigDecimal getDecimal(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public UUID getUUID(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public InetAddress getInet(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> List<T> getList(int i, Class<T> aClass) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> List<T> getList(int i, TypeToken<T> typeToken) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> Set<T> getSet(int i, Class<T> aClass) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> Set<T> getSet(int i, TypeToken<T> typeToken) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <K, V> Map<K, V> getMap(int i, Class<K> aClass, Class<V> aClass1) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <K, V> Map<K, V> getMap(int i, TypeToken<K> typeToken, TypeToken<V> typeToken1) {
+            throw new UnsupportedOperationException();
+        }
+
+        public UDTValue getUDTValue(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public TupleValue getTupleValue(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Object getObject(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> T get(int i, Class<T> aClass) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> T get(int i, TypeToken<T> typeToken) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> T get(int i, TypeCodec<T> typeCodec) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/RangeComparatorTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/RangeComparatorTest.java
new file mode 100644
index 0000000..0484212
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/RangeComparatorTest.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import com.google.common.collect.*;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.Token;
+import org.jetbrains.annotations.NotNull;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.apache.cassandra.diff.TestUtils.assertThreadWaits;
+
+public class RangeComparatorTest {
+
+    private Multimap<BigInteger, Throwable> errors = HashMultimap.create();
+    private BiConsumer<Throwable, BigInteger> errorReporter = (e, t) -> errors.put(t, e);
+    private Multimap<BigInteger, MismatchType> mismatches = HashMultimap.create();
+    private BiConsumer<MismatchType, BigInteger> mismatchReporter = (m, t) -> mismatches.put(t, m);
+    private Multimap<BigInteger, RangeStats> journal= HashMultimap.create();
+    private BiConsumer<RangeStats, BigInteger> progressReporter = (r, t) -> journal.put(t, copyOf(r));
+    private Set<BigInteger> comparedPartitions = new HashSet<>();
+    private ComparisonExecutor executor = ComparisonExecutor.newExecutor(1, new MetricRegistry());
+
+    @Test
+    public void emptyRange() {
+        RangeComparator comparator = comparator(context(100L, 100L));
+        RangeStats stats = comparator.compare(keys(), keys(), this::alwaysMatch);
+        assertTrue(stats.isEmpty());
+        assertNothingReported(errors, mismatches, journal);
+        assertCompared();
+    }
+
+    @Test
+    public void sourceAndTargetKeysAreEmpty() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(), keys(), this::alwaysMatch);
+        assertTrue(stats.isEmpty());
+        assertNothingReported(errors, mismatches, journal);
+        assertCompared();
+    }
+
+    @Test
+    public void partitionPresentOnlyInSource() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(0, 1, 2), keys(0, 1), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(1, stats.getOnlyInSource());
+        assertEquals(0, stats.getOnlyInTarget());
+        assertEquals(2, stats.getMatchedPartitions());
+        assertReported(2, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared(0, 1);
+    }
+
+    @Test
+    public void partitionPresentOnlyInTarget() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(7, 8), keys(7, 8, 9), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(0, stats.getOnlyInSource());
+        assertEquals(1, stats.getOnlyInTarget());
+        assertEquals(2, stats.getMatchedPartitions());
+        assertReported(9, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared(7, 8);
+    }
+
+    @Test
+    public void multipleOnlyInSource() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(4, 5, 6, 7, 8), keys(4, 5), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(3, stats.getOnlyInSource());
+        assertEquals(0, stats.getOnlyInTarget());
+        assertEquals(2, stats.getMatchedPartitions());
+        assertReported(6, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(7, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(8, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared(4, 5);
+    }
+
+    @Test
+    public void multipleOnlyInTarget() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(4, 5), keys(4, 5, 6, 7, 8), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(0, stats.getOnlyInSource());
+        assertEquals(3, stats.getOnlyInTarget());
+        assertEquals(2, stats.getMatchedPartitions());
+        assertReported(6, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertReported(7, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertReported(8, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared(4, 5);
+    }
+
+    @Test
+    public void multipleOnlyInBoth() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(0, 1, 3, 5, 7, 9), keys(0, 2, 4, 6, 8, 9), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(4, stats.getOnlyInSource());
+        assertEquals(4, stats.getOnlyInTarget());
+        assertEquals(2, stats.getMatchedPartitions());
+        assertReported(1, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(3, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(5, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(7, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(2, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertReported(4, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertReported(6, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertReported(8, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared(0, 9);
+    }
+
+    @Test
+    public void sourceKeysIsEmpty() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(), keys(4, 5), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(0, stats.getOnlyInSource());
+        assertEquals(2, stats.getOnlyInTarget());
+        assertEquals(0, stats.getMatchedPartitions());
+        assertReported(4, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertReported(5, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared();
+    }
+
+    @Test
+    public void targetKeysIsEmpty() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(4, 5), keys(), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(2, stats.getOnlyInSource());
+        assertEquals(0, stats.getOnlyInTarget());
+        assertEquals(0, stats.getMatchedPartitions());
+        assertReported(4, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(5, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared();
+    }
+
+    @Test
+    public void skipComparisonOfDisallowedTokens() {
+        RangeComparator comparator = comparator(context(0L, 100L, 2, 3, 4));
+        RangeStats stats = comparator.compare(keys(0, 1, 2, 3, 4, 5, 6),
+                                              keys(0, 1, 2, 3, 4, 5, 6),
+                                              this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(0, stats.getOnlyInSource());
+        assertEquals(0, stats.getOnlyInTarget());
+        assertEquals(4, stats.getMatchedPartitions());
+        assertEquals(3, stats.getSkippedPartitions());
+        assertNothingReported(errors,  mismatches, journal);
+        assertCompared(0, 1, 5 , 6);
+    }
+
+    @Test
+    public void handleErrorReadingFirstSourceKey() {
+        RuntimeException toThrow = new RuntimeException("Test");
+        testErrorReadingKey(keys(0, toThrow, 0, 1, 2), keys(0, 1, 2), toThrow);
+        assertCompared();
+    }
+
+    @Test
+    public void handleErrorReadingFirstTargetKey() {
+        RuntimeException toThrow = new RuntimeException("Test");
+        testErrorReadingKey(keys(0, 1, 2), keys(0, toThrow, 0, 1, 2), toThrow);
+        assertCompared();
+    }
+
+    @Test
+    public void handleErrorReadingSourceKey() {
+        RuntimeException toThrow = new RuntimeException("Test");
+        testErrorReadingKey(keys(1, toThrow, 0, 1, 2), keys(0, 1, 2), toThrow);
+        assertCompared(0);
+    }
+
+    @Test
+    public void handleErrorReadingTargetKey() {
+        RuntimeException toThrow = new RuntimeException("Test");
+        testErrorReadingKey(keys(0, 1, 2), keys(1, toThrow, 0, 1, 2), toThrow);
+        assertCompared(0);
+    }
+
+    @Test
+    public void handleReadingLastSourceKey() {
+        RuntimeException toThrow = new RuntimeException("Test");
+        testErrorReadingKey(keys(2, toThrow, 0, 1, 2), keys(0, 1, 2), toThrow);
+        assertCompared(0, 1);
+    }
+
+    @Test
+    public void handleReadingLastTargetKey() {
+        RuntimeException toThrow = new RuntimeException("Test");
+        testErrorReadingKey(keys(0, 1, 2), keys(2, toThrow, 0, 1, 2), toThrow);
+        assertCompared(0, 1);
+    }
+
+    @Test
+    public void handleErrorConstructingFirstTask() {
+        RuntimeException expected = new RuntimeException("Test");
+        RangeStats stats = testTaskError(throwDuringConstruction(0, expected));
+        assertEquals(1, stats.getErrorPartitions());
+        assertCompared(1, 2);
+        assertReported(0, expected, errors);
+    }
+
+    @Test
+    public void handleErrorConstructingTask() {
+        RuntimeException expected = new RuntimeException("Test");
+        RangeStats stats = testTaskError(throwDuringConstruction(1, expected));
+        assertEquals(1, stats.getErrorPartitions());
+        assertCompared(0, 2);
+        assertReported(1, expected, errors);
+    }
+
+    @Test
+    public void handleErrorConstructingLastTask() {
+        RuntimeException expected = new RuntimeException("Test");
+        RangeStats stats = testTaskError(throwDuringConstruction(2, expected));
+        assertEquals(1, stats.getErrorPartitions());
+        assertCompared(0, 1);
+        assertReported(2, expected, errors);
+    }
+
+    @Test
+    public void handleTaskErrorOnFirstPartition() {
+        RuntimeException expected = new RuntimeException("Test");
+        RangeStats stats = testTaskError(throwDuringExecution(expected, 0));
+        assertEquals(1, stats.getErrorPartitions());
+        assertCompared(1, 2);
+        assertReported(0, expected, errors);
+    }
+
+    @Test
+    public void handleTaskErrorOnPartition() {
+        RuntimeException expected = new RuntimeException("Test");
+        RangeStats stats = testTaskError(throwDuringExecution(expected, 1));
+        assertEquals(1, stats.getErrorPartitions());
+        assertCompared(0, 2);
+        assertReported(1, expected, errors);
+    }
+
+    @Test
+    public void handleTaskErrorOnLastPartition() {
+        RuntimeException expected = new RuntimeException("Test");
+        RangeStats stats = testTaskError(throwDuringExecution(expected, 2));
+        assertEquals(1, stats.getErrorPartitions());
+        assertCompared(0, 1);
+        assertReported(2, expected, errors);
+    }
+
+    @Test
+    public void checkpointEveryTenPartitions() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        comparator.compare(keys(LongStream.range(0, 25).toArray()),
+                           keys(LongStream.range(0, 25).toArray()),
+                           this::alwaysMatch);
+        assertReported(9, RangeStats.withValues(10, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), journal);
+        assertReported(19, RangeStats.withValues(20, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), journal);
+        assertEquals(2, journal.keySet().size());
+    }
+
+    @Test
+    public void recordHighestSeenPartitionWhenTasksCompleteOutOfOrder() {
+        // every 10 partitions, the highest seen token is reported to the journal. Here,
+        // randomise the iteration of the keys to simulate tasks completing out of order
+        RangeComparator comparator = comparator(context(0L, 100L));
+        long[] tokens = new long[] {2, 8, 1, 4, 100, 3, 5, 7, 6, 9};
+        comparator.compare(keys(tokens),
+                           keys(tokens),
+                           this::alwaysMatch);
+        assertReported(100, RangeStats.withValues(10, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), journal);
+        assertEquals(1, journal.keySet().size());
+        assertNothingReported(errors, mismatches);
+        assertCompared(tokens);
+    }
+
+    @Test
+    public void recordHighestSeenPartitionWhenTasksCompleteOutOfOrderWithErrors() {
+        // 2 partitions will error during comparison, but after 10 successful comparisons
+        // we'll still report progress to the journal
+        RuntimeException toThrow = new RuntimeException("Test");
+        RangeComparator comparator = comparator(context(0L, 100L));
+        long[] tokens = new long[] {2, 8, 1, 11, 4, 100, 3, 5, 7, 6, 9, 0};
+        comparator.compare(keys(tokens), keys(tokens), throwDuringExecution(toThrow, 3, 2));
+        assertReported(100, RangeStats.withValues(10, 0L, 2L, 0L, 0L, 0L, 0L, 0L, 0L), journal);
+        assertEquals(1, journal.keySet().size());
+        assertReported(2, toThrow, errors);
+        assertReported(3, toThrow, errors);
+        assertEquals(2, errors.keySet().size());
+        assertNothingReported(mismatches);
+        // the erroring tasks don't get counted in the test as compared
+        assertCompared(8, 1, 11, 4, 100, 5, 7, 6, 9, 0);
+    }
+
+    @Test
+    public void rowLevelMismatchIncrementsPartitionMismatches() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(0, 1, 2), keys(0, 1 ,2), this::rowMismatch);
+        assertEquals(0, stats.getMatchedPartitions());
+        assertEquals(3, stats.getMismatchedPartitions());
+        assertEquals(0, stats.getMatchedValues());
+        assertEquals(0, stats.getMismatchedValues());
+        assertNothingReported(errors, journal);
+        assertReported(0, MismatchType.PARTITION_MISMATCH, mismatches);
+        assertReported(1, MismatchType.PARTITION_MISMATCH, mismatches);
+        assertReported(2, MismatchType.PARTITION_MISMATCH, mismatches);
+        assertCompared(0, 1, 2);
+    }
+
+    @Test
+    public void valueMismatchIncrementsPartitionMismatches() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(0, 1, 2), keys(0, 1 ,2), this::valuesMismatch);
+        assertEquals(0, stats.getMatchedPartitions());
+        assertEquals(3, stats.getMismatchedPartitions());
+        assertEquals(0, stats.getMatchedValues());
+        assertEquals(30, stats.getMismatchedValues());
+        assertNothingReported(errors, journal);
+        assertReported(0, MismatchType.PARTITION_MISMATCH, mismatches);
+        assertReported(1, MismatchType.PARTITION_MISMATCH, mismatches);
+        assertReported(2, MismatchType.PARTITION_MISMATCH, mismatches);
+        assertCompared(0, 1, 2);
+    }
+
+    @Test
+    public void matchingPartitionIncrementsCount() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(0, 1, 2), keys(0, 1 ,2), this::alwaysMatch);
+        assertEquals(3, stats.getMatchedPartitions());
+        assertEquals(0, stats.getMismatchedPartitions());
+        assertEquals(0, stats.getMatchedValues());
+        assertEquals(0, stats.getMismatchedValues());
+        assertNothingReported(errors, mismatches, journal);
+        assertCompared(0, 1, 2);
+    }
+
+    @Test
+    public void waitForAllInFlightTasksToComplete() throws InterruptedException {
+        CountDownLatch taskSubmissions = new CountDownLatch(2);
+        List<CountDownLatch> taskGates = Lists.newArrayList(new CountDownLatch(1), new CountDownLatch(1));
+        RangeComparator comparator = comparator(context(0L, 100L));
+
+        Thread t = new Thread(() -> comparator.compare(keys(0, 1),
+                                                       keys(0, 1),
+                                                       waitUntilNotified(taskSubmissions, taskGates)),
+                              "CallingThread");
+        t.setDaemon(true);
+        t.start();
+
+        // wait for both tasks to be submitted then check that the calling thread enters a waiting state
+        taskSubmissions.await();
+        assertThreadWaits(t);
+
+        // let the first task complete and check that the calling thread is still waiting
+        taskGates.get(0).countDown();
+        assertThreadWaits(t);
+
+        // let the second task run and wait for the caller to terminate
+        taskGates.get(1).countDown();
+        t.join();
+
+        assertNothingReported(errors, mismatches, journal);
+        assertCompared(0, 1);
+    }
+
+    private RangeStats testTaskError(Function<PartitionKey, PartitionComparator> taskSupplier) {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        Iterator<PartitionKey> sourceKeys = keys(0, 1, 2);
+        Iterator<PartitionKey> targetKeys = keys(0, 1, 2);
+        RangeStats stats = comparator.compare(sourceKeys, targetKeys, taskSupplier);
+        assertNothingReported(mismatches, journal);
+        return stats;
+    }
+
+    private void testErrorReadingKey(Iterator<PartitionKey> sourceKeys,
+                                     Iterator<PartitionKey> targetKeys,
+                                     Exception expected) {
+
+        RangeComparator comparator = comparator(context(0L, 100L));
+        try {
+            comparator.compare(sourceKeys, targetKeys, this::alwaysMatch);
+            fail("Expected exception " + expected.getLocalizedMessage());
+        } catch (Exception e) {
+            assertEquals(expected, e.getCause());
+        }
+        assertNothingReported(errors,  mismatches, journal);
+    }
+
+    private void assertCompared(long...tokens) {
+        assertEquals(comparedPartitions.size(), tokens.length);
+        for(long t : tokens)
+            assertTrue(comparedPartitions.contains(BigInteger.valueOf(t)));
+    }
+
+    private void assertNothingReported(Multimap...reported) {
+        for (Multimap m : reported)
+            assertTrue(m.isEmpty());
+    }
+
+    private <T> void assertReported(long token, T expected, Multimap<BigInteger, T> reported) {
+        Collection<T> values = reported.get(BigInteger.valueOf(token));
+        assertEquals(1, values.size());
+        assertEquals(expected, values.iterator().next());
+    }
+
+    Iterator<PartitionKey> keys(long throwAtToken, RuntimeException e, long...tokens) {
+        return new AbstractIterator<PartitionKey>() {
+            int i = 0;
+            protected PartitionKey computeNext() {
+                if (i < tokens.length) {
+                    long t = tokens[i++];
+                    if (t == throwAtToken)
+                        throw e;
+
+                    return key(t);
+                }
+                return endOfData();
+            }
+        };
+    }
+
+    Iterator<PartitionKey> keys(long...tokens) {
+        return new AbstractIterator<PartitionKey>() {
+            int i = 0;
+            protected PartitionKey computeNext() {
+                if (i < tokens.length)
+                    return key(tokens[i++]);
+                return endOfData();
+            }
+        };
+    }
+
+    // yield a PartitionComparator which always concludes that partitions being compared are identical
+    PartitionComparator alwaysMatch(PartitionKey key) {
+        return new PartitionComparator(null, null, null) {
+            public PartitionStats call() {
+                comparedPartitions.add(key.getTokenAsBigInteger());
+                return new PartitionStats();
+            }
+        };
+    }
+
+    // yield a PartitionComparator which always determines that the partitions have a row-level mismatch
+    PartitionComparator rowMismatch(PartitionKey key) {
+        return new PartitionComparator(null, null,  null) {
+            public PartitionStats call() {
+                comparedPartitions.add(key.getTokenAsBigInteger());
+                PartitionStats stats = new PartitionStats();
+                stats.allClusteringsMatch = false;
+                return stats;
+            }
+        };
+    }
+
+    // yield a PartitionComparator which always determines that the partitions have a 10 mismatching values
+    PartitionComparator valuesMismatch(PartitionKey key) {
+        return new PartitionComparator(null, null,  null) {
+            public PartitionStats call() {
+                comparedPartitions.add(key.getTokenAsBigInteger());
+                PartitionStats stats = new PartitionStats();
+                stats.mismatchedValues = 10;
+                return stats;
+            }
+        };
+    }
+
+    // yield a function which throws when creating a PartitionComparator for a give token
+    // simulates an error reading the source or target partition from the source or target
+    // cluster when constructing the task
+    Function<PartitionKey, PartitionComparator> throwDuringConstruction(long throwAt, RuntimeException toThrow) {
+        return (key) -> {
+            BigInteger t = key.getTokenAsBigInteger();
+            if (t.longValue() == throwAt)
+                throw toThrow;
+
+            return alwaysMatch(key);
+        };
+    }
+
+    // yields a PartitionComparator which throws the supplied exception when the token matches
+    // the one specified to simulate an error when processing the comparison
+    Function<PartitionKey, PartitionComparator> throwDuringExecution(RuntimeException toThrow, long...throwAt) {
+        return (key) -> {
+            BigInteger t = key.getTokenAsBigInteger();
+            return new PartitionComparator(null, null, null) {
+                public PartitionStats call() {
+                    for (long shouldThrow : throwAt)
+                        if (t.longValue() == shouldThrow)
+                           throw toThrow;
+
+                    comparedPartitions.add(t);
+                    return new PartitionStats();
+                }
+            };
+        };
+    }
+
+    // yields a PartitionComparator which waits on a CountDownLatch before returning from call(). The latches
+    // are supplied in an iterator and each successive task yielded uses the next supplied latch so that callers
+    // can control the rate of task progress
+    // The other supplied latch, firstTaskStarted, is used to signal to the caller that execution of the first
+    // task has started, so the test doesn't complete before this happens
+    Function<PartitionKey, PartitionComparator> waitUntilNotified(final CountDownLatch taskSubmissions,
+                                                                  final List<CountDownLatch> taskGates) {
+        final Iterator<CountDownLatch> gateIter = taskGates.iterator();
+        return (key) -> {
+
+            BigInteger t = key.getTokenAsBigInteger();
+            taskSubmissions.countDown();
+
+            return new PartitionComparator(null, null, null) {
+                public PartitionStats call() {
+                    if (!gateIter.hasNext())
+                        fail("Expected a latch to control task progress");
+
+                    try {
+                        gateIter.next().await();
+                    }
+                    catch (InterruptedException e) {
+                        e.printStackTrace();
+                        fail("Interrupted");
+                    }
+                    comparedPartitions.add(t);
+                    return new PartitionStats();
+                }
+            };
+        };
+    }
+
+    PartitionKey key(long token) {
+        return new TestPartitionKey(token);
+    }
+
+    RangeComparator comparator(DiffContext context) {
+        return new RangeComparator(context, errorReporter, mismatchReporter, progressReporter, executor);
+    }
+
+    DiffContext context(long startToken, long endToken, long...disallowedTokens) {
+        return new TestContext(BigInteger.valueOf(startToken),
+                               BigInteger.valueOf(endToken),
+                               new SpecificTokens(Arrays.stream(disallowedTokens)
+                                                        .mapToObj(BigInteger::valueOf)
+                                                        .collect(Collectors.toSet()),
+                                                  SpecificTokens.Modifier.REJECT));
+    }
+
+    DiffContext context(long startToken, long endToken) {
+        return new TestContext(BigInteger.valueOf(startToken), BigInteger.valueOf(endToken), SpecificTokens.NONE);
+    }
+
+    RangeStats copyOf(RangeStats stats) {
+        return RangeStats.withValues(stats.getMatchedPartitions(),
+                                     stats.getMismatchedPartitions(),
+                                     stats.getErrorPartitions(),
+                                     stats.getSkippedPartitions(),
+                                     stats.getOnlyInSource(),
+                                     stats.getOnlyInTarget(),
+                                     stats.getMatchedRows(),
+                                     stats.getMatchedValues(),
+                                     stats.getMismatchedValues());
+    }
+
+    static class TestPartitionKey extends PartitionKey {
+        final Token token;
+
+        TestPartitionKey(final long tokenValue) {
+            super(null);
+            token = new Token() {
+
+                public DataType getType() {
+                    return DataType.bigint();
+                }
+
+                public Object getValue() {
+                    return tokenValue;
+                }
+
+                public ByteBuffer serialize(ProtocolVersion protocolVersion) {
+                    return null;
+                }
+
+                public int compareTo(@NotNull Token o) {
+                    assert o.getValue() instanceof Long;
+                    return Long.compare(tokenValue, (long)o.getValue());
+                }
+            };
+        }
+
+        protected Token getToken() {
+            return token;
+        }
+    }
+
+    static class TestContext extends DiffContext {
+
+        public TestContext(BigInteger startToken,
+                           BigInteger endToken,
+                           SpecificTokens specificTokens) {
+            super(null, null, null, null, startToken, endToken, specificTokens, 0.0);
+        }
+    }
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/TestUtils.java b/spark-job/src/test/java/org/apache/cassandra/diff/TestUtils.java
new file mode 100644
index 0000000..ad9e1c4
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/TestUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import static org.junit.Assert.fail;
+
+public class TestUtils {
+
+    public static void assertThreadWaits(Thread t) {
+        for (int i=0; i < 1000; i++) {
+            try {
+                Thread.sleep(1);
+            }
+            catch (InterruptedException e) {
+                fail("Unexpected InterruptedException");
+            }
+            if (t.getState() == Thread.State.WAITING)
+                return;
+        }
+        fail(String.format("Thread %s expected to enter WAITING state, but failed to do so", t.getName()));
+    }
+
+}
diff --git a/spark-job/src/test/resources/cql-stress-narrow1.yaml b/spark-job/src/test/resources/cql-stress-narrow1.yaml
new file mode 100644
index 0000000..dae375b
--- /dev/null
+++ b/spark-job/src/test/resources/cql-stress-narrow1.yaml
@@ -0,0 +1,62 @@
+#
+# Keyspace info
+#
+keyspace: difftest
+
+#
+# The CQL for creating a keyspace (optional if it already exists)
+#
+keyspace_definition: |
+  CREATE KEYSPACE difftest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+
+#
+# Table info
+#
+table: narrow1
+
+#
+# The CQL for creating a table you wish to stress (optional if it already exists)
+#
+table_definition: |
+  CREATE TABLE narrow1 (
+        pk int,
+        v1 text,
+        v2 int,
+        PRIMARY KEY(pk)
+  ) 
+    WITH compaction = { 'class':'LeveledCompactionStrategy' }
+
+#
+# Optional meta information on the generated columns in the above table
+# The min and max only apply to text and blob types
+# The distribution field represents the total unique population
+# distribution of that column across rows.  Supported types are
+# 
+#      EXP(min..max)                        An exponential distribution over the range [min..max]
+#      EXTREME(min..max,shape)              An extreme value (Weibull) distribution over the range [min..max]
+#      GAUSSIAN(min..max,stdvrng)           A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
+#      GAUSSIAN(min..max,mean,stdev)        A gaussian/normal distribution, with explicitly defined mean and stdev
+#      UNIFORM(min..max)                    A uniform distribution over the range [min, max]
+#      FIXED(val)                           A fixed distribution, always returning the same value
+#      Aliases: extr, gauss, normal, norm, weibull
+#
+#      If preceded by ~, the distribution is inverted
+#
+# Defaults for all columns are size: uniform(4..8), population: uniform(1..100B), cluster: fixed(1)
+#
+columnspec:
+  - name: pk
+    population: uniform(1..1B)
+  - name: v1
+    size: fixed(20)
+  - name: v2
+    population: uniform(1..100)
+
+insert:
+  partitions: fixed(50)       # number of unique partitions to update in a single operation
+  batchtype: UNLOGGED         # type of batch to use
+
+queries:
+   simple1:
+      cql: select * from narrow1 where pk = ? LIMIT 100
+      fields: samerow         # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
diff --git a/spark-job/src/test/resources/cql-stress-wide1.yaml b/spark-job/src/test/resources/cql-stress-wide1.yaml
new file mode 100644
index 0000000..145ecce
--- /dev/null
+++ b/spark-job/src/test/resources/cql-stress-wide1.yaml
@@ -0,0 +1,69 @@
+#
+# Keyspace info
+#
+keyspace: difftest
+
+#
+# The CQL for creating a keyspace (optional if it already exists)
+#
+keyspace_definition: |
+  CREATE KEYSPACE difftest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+
+#
+# Table info
+#
+table: wide1
+
+#
+# The CQL for creating a table you wish to stress (optional if it already exists)
+#
+table_definition: |
+  CREATE TABLE wide1 (
+        pk int,
+        c1 int,
+        c2 int,
+        v1 text,
+        v2 int,
+        PRIMARY KEY(pk, c1, c2)
+  ) 
+    WITH compaction = { 'class':'LeveledCompactionStrategy' }
+
+#
+# Optional meta information on the generated columns in the above table
+# The min and max only apply to text and blob types
+# The distribution field represents the total unique population
+# distribution of that column across rows.  Supported types are
+# 
+#      EXP(min..max)                        An exponential distribution over the range [min..max]
+#      EXTREME(min..max,shape)              An extreme value (Weibull) distribution over the range [min..max]
+#      GAUSSIAN(min..max,stdvrng)           A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
+#      GAUSSIAN(min..max,mean,stdev)        A gaussian/normal distribution, with explicitly defined mean and stdev
+#      UNIFORM(min..max)                    A uniform distribution over the range [min, max]
+#      FIXED(val)                           A fixed distribution, always returning the same value
+#      Aliases: extr, gauss, normal, norm, weibull
+#
+#      If preceded by ~, the distribution is inverted
+#
+# Defaults for all columns are size: uniform(4..8), population: uniform(1..100B), cluster: fixed(1)
+#
+columnspec:
+  - name: pk
+    population: uniform(1..1B)
+  - name: c1
+    cluster: uniform(1..100)
+  - name: c2
+    cluster: uniform(1..100)
+  - name: v1
+    size: fixed(20)
+  - name: v2
+    population: uniform(1..100)
+
+insert:
+  partitions: fixed(1)       # number of unique partitions to update in a single operation
+  batchtype: UNLOGGED        # type of batch to use
+
+
+queries:
+   simple1:
+      cql: select * from wide where pk = ? AND c1 = ? AND c2 = ? LIMIT 1
+      fields: samerow


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org