You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/05/20 09:57:37 UTC

[cassandra-diff] branch master updated: Support running diff on multiple keyspaces

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-diff.git


The following commit(s) were added to refs/heads/master by this push:
     new e9782c6  Support running diff on multiple keyspaces
e9782c6 is described below

commit e9782c6b59a0888e4c63248ef95468e6b176406d
Author: Yifan Cai <yi...@apple.com>
AuthorDate: Fri May 15 13:57:48 2020 -0700

    Support running diff on multiple keyspaces
    
    Patch by Yifan Cai; reviewed by marcuse for CASSANDRA-15807
    
    closes #8
---
 README.md                                          |  17 ++-
 .../cassandra/diff/api/services/DBService.java     |  61 ++++----
 common/pom.xml                                     |   7 +
 .../apache/cassandra/diff/JobConfiguration.java    |   4 +-
 .../apache/cassandra/diff/KeyspaceTablePair.java   |  61 ++++++++
 .../cassandra/diff/YamlJobConfiguration.java       |  14 +-
 .../cassandra/diff/YamlJobConfigurationTest.java   |  16 ++
 .../src/test/resources/testconfig.yaml             |  10 +-
 ...onfig.yaml => localconfig-multi-keyspaces.yaml} |   9 +-
 spark-job/localconfig.yaml                         |   8 +-
 .../apache/cassandra/diff/ComparisonExecutor.java  |  12 +-
 .../org/apache/cassandra/diff/DiffCluster.java     |  95 +++++++-----
 .../java/org/apache/cassandra/diff/DiffJob.java    |  57 ++++----
 .../java/org/apache/cassandra/diff/Differ.java     |  98 ++++++-------
 .../org/apache/cassandra/diff/JobMetadataDb.java   | 161 +++++++++++----------
 .../apache/cassandra/diff/PartitionComparator.java |   5 +-
 .../org/apache/cassandra/diff/PartitionKey.java    |   5 +-
 .../org/apache/cassandra/diff/RangeComparator.java |   8 +-
 .../java/org/apache/cassandra/diff/RangeStats.java |   5 +-
 .../java/org/apache/cassandra/diff/TableSpec.java  |  37 +++--
 .../org/apache/cassandra/diff/DiffJobTest.java     |   3 -
 .../java/org/apache/cassandra/diff/DifferTest.java |  39 +++--
 .../cassandra/diff/PartitionComparatorTest.java    |   5 +-
 23 files changed, 427 insertions(+), 310 deletions(-)

diff --git a/README.md b/README.md
index 5fab877..c17ef59 100644
--- a/README.md
+++ b/README.md
@@ -41,9 +41,15 @@ $ cd cassandra-diff
 $ mvn package
 $ docker run --name cas-src -d  -p 9042:9042 cassandra:3.0.18
 $ docker run --name cas-tgt -d  -p 9043:9042 cassandra:latest
-$ docker exec cas-src cassandra-stress write n=1k
-$ docker exec cas-tgt cassandra-stress write n=1k
+# Create 1000 rows in table "standard1" under keyspace "keyspace1"
+$ docker exec cas-src cassandra-stress write n=1k -schema keyspace="keyspace1"
+$ docker exec cas-tgt cassandra-stress write n=1k -schema keyspace="keyspace1"
+# Optionally, create another 1000 rows in table "standard1" under keyspace "keyspace2"
+$ docker exec cas-src cassandra-stress write n=1k -schema keyspace="keyspace2"
+$ docker exec cas-tgt cassandra-stress write n=1k -schema keyspace="keyspace2"
 $ spark-submit --verbose --files ./spark-job/localconfig.yaml --class org.apache.cassandra.diff.DiffJob spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig.yaml
+# If rows were also created in "keyspace2", you can use localconfig-multi-keyspaces.yaml to compare data across multiple keyspaces. See the command below.
+# $ spark-submit --verbose --files ./spark-job/localconfig-multi-keyspaces.yaml --class org.apache.cassandra.diff.DiffJob spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig-multi-keyspaces.yaml
 # ... logs
 INFO  DiffJob:124 - FINISHED: {standard1=Matched Partitions - 1000, Mismatched Partitions - 0, Partition Errors - 0, Partitions Only In Source - 0, Partitions Only In Target - 0, Skipped Partitions - 0, Matched Rows - 1000, Matched Values - 6000, Mismatched Values - 0 }
 ## start api-server:
@@ -55,9 +61,8 @@ $ curl -s localhost:8089/jobs/recent | python -mjson.tool
       {
           "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
           "buckets": 100,
-          "keyspace": "keyspace1",
-          "tables": [
-              "standard1"
+          "keyspace_tables": [
+              "keyspace1.standard1"
           ],
           "sourceClusterName": "local_test_1",
           "sourceClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
@@ -71,7 +76,7 @@ $ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/results | pyt
   [
       {
           "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
-          "table": "standard1",
+          "table": "keyspace1.standard1",
           "matchedPartitions": 1000,
           "mismatchedPartitions": 0,
           "matchedRows": 1000,
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java b/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
index 1bf1f88..0a9707a 100644
--- a/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
@@ -80,8 +80,7 @@ public class DBService implements Closeable {
             "   job_id," +
             "   job_start_time," +
             "   buckets," +
-            "   keyspace_name, " +
-            "   table_names, " +
+            "   qualified_table_names, " +
             "   source_cluster_name," +
             "   source_cluster_desc," +
             "   target_cluster_name," +
@@ -92,7 +91,7 @@ public class DBService implements Closeable {
         jobResultStatement = session.prepare(String.format(
             " SELECT " +
             "   job_id," +
-            "   table_name, " +
+            "   qualified_table_name, " +
             "   matched_partitions," +
             "   mismatched_partitions,"   +
             "   matched_rows,"   +
@@ -102,12 +101,12 @@ public class DBService implements Closeable {
             "   partitions_only_in_target,"   +
             "   skipped_partitions"   +
             " FROM %s.job_results" +
-            " WHERE job_id = ? AND table_name = ?", diffKeyspace));
+            " WHERE job_id = ? AND qualified_table_name = ?", diffKeyspace));
         jobStatusStatement = session.prepare(String.format(
             " SELECT " +
             "   job_id,"   +
             "   bucket,"   +
-            "   table_name,"   +
+            "   qualified_table_name,"   +
             "   completed "   +
             " FROM %s.job_status" +
             " WHERE job_id = ? AND bucket = ?", diffKeyspace));
@@ -115,7 +114,7 @@ public class DBService implements Closeable {
             " SELECT " +
             "   job_id," +
             "   bucket," +
-            "   table_name," +
+            "   qualified_table_name," +
             "   mismatching_token," +
             "   mismatch_type" +
             " FROM %s.mismatches" +
@@ -123,14 +122,14 @@ public class DBService implements Closeable {
         jobErrorSummaryStatement = session.prepare(String.format(
             " SELECT " +
             "   count(start_token) AS error_count," +
-            "   table_name" +
+            "   qualified_table_name" +
             " FROM %s.task_errors" +
             " WHERE job_id = ? AND bucket = ?",
             diffKeyspace));
         jobErrorRangesStatement = session.prepare(String.format(
             " SELECT " +
             "   bucket,"   +
-            "   table_name,"   +
+            "   qualified_table_name,"   +
             "   start_token,"   +
             "   end_token"   +
             " FROM %s.task_errors" +
@@ -138,10 +137,10 @@ public class DBService implements Closeable {
             diffKeyspace));
         jobErrorDetailStatement = session.prepare(String.format(
             " SELECT " +
-            "   table_name,"   +
+            "   qualified_table_name,"   +
             "   error_token"   +
             " FROM %s.partition_errors" +
-            " WHERE job_id = ? AND bucket = ? AND table_name = ? AND start_token = ? AND end_token = ?", diffKeyspace));
+            " WHERE job_id = ? AND bucket = ? AND qualified_table_name = ? AND start_token = ? AND end_token = ?", diffKeyspace));
         jobsStartDateStatement = session.prepare(String.format(
             " SELECT " +
             "   job_id" +
@@ -190,8 +189,8 @@ public class DBService implements Closeable {
 
     public Collection<JobResult> fetchJobResults(UUID jobId) {
         JobSummary summary = fetchJobSummary(jobId);
-        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.tables.size());
-        for (String table : summary.tables)
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.keyspaceTables.size());
+        for (String table : summary.keyspaceTables)
             futures.add(session.executeAsync(jobResultStatement.bind(jobId, table)));
 
         SortedSet<JobResult> results = Sets.newTreeSet();
@@ -206,8 +205,8 @@ public class DBService implements Closeable {
         for (int i = 0; i < summary.buckets; i++)
             futures.add(session.executeAsync(jobStatusStatement.bind(jobId, i)));
 
-        Map<String, Long> completedByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
-        processFutures(futures, row -> completedByTable.merge(row.getString("table_name"),
+        Map<String, Long> completedByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
+        processFutures(futures, row -> completedByTable.merge(row.getString("qualified_table_name"),
                                                               row.getLong("completed"),
                                                               Long::sum));
         return new JobStatus(jobId, completedByTable);
@@ -220,8 +219,8 @@ public class DBService implements Closeable {
         for (int i = 0; i < summary.buckets; i++)
             futures.add(session.executeAsync(jobMismatchesStatement.bind(jobId, i)));
 
-        Map<String, List<Mismatch>> mismatchesByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
-        processFutures(futures, row -> mismatchesByTable.merge(row.getString("table_name"),
+        Map<String, List<Mismatch>> mismatchesByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
+        processFutures(futures, row -> mismatchesByTable.merge(row.getString("qualified_table_name"),
                                                                Lists.newArrayList(new Mismatch(row.getString("mismatching_token"),
                                                                                                row.getString("mismatch_type"))),
                                                                (l1, l2) -> { l1.addAll(l2); return l1;}));
@@ -235,11 +234,11 @@ public class DBService implements Closeable {
         for (int i = 0; i < summary.buckets; i++)
             futures.add(session.executeAsync(jobErrorSummaryStatement.bind(jobId, i)));
 
-        Map<String, Long> errorCountByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+        Map<String, Long> errorCountByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
         processFutures(futures, row -> {
-            String table = row.getString("table_name");
+            String table = row.getString("qualified_table_name");
             if (null != table) {
-                errorCountByTable.merge(row.getString("table_name"),
+                errorCountByTable.merge(row.getString("qualified_table_name"),
                                         row.getLong("error_count"),
                                         Long::sum);
             }
@@ -254,8 +253,8 @@ public class DBService implements Closeable {
         for (int i = 0; i < summary.buckets; i++)
             futures.add(session.executeAsync(jobErrorRangesStatement.bind(jobId, i)));
 
-        Map<String, List<Range>> errorRangesByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
-        processFutures(futures, row -> errorRangesByTable.merge(row.getString("table_name"),
+        Map<String, List<Range>> errorRangesByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
+        processFutures(futures, row -> errorRangesByTable.merge(row.getString("qualified_table_name"),
                                                                 Lists.newArrayList(new Range(row.getString("start_token"),
                                                                                              row.getString("end_token"))),
                                                                 (l1, l2) -> { l1.addAll(l2); return l1;}));
@@ -273,13 +272,13 @@ public class DBService implements Closeable {
         processFutures(rangeFutures,
                        row -> session.executeAsync(jobErrorDetailStatement.bind(jobId,
                                                                                 row.getInt("bucket"),
-                                                                                row.getString("table_name"),
+                                                                                row.getString("qualified_table_name"),
                                                                                 row.getString("start_token"),
                                                                                 row.getString("end_token"))),
                        errorFutures::add);
-        Map<String, List<String>> errorsByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+        Map<String, List<String>> errorsByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
         processFutures(errorFutures,
-                       row -> errorsByTable.merge(row.getString("table_name"),
+                       row -> errorsByTable.merge(row.getString("qualified_table_name"),
                                                   Lists.newArrayList(row.getString("error_token")),
                                                   (l1, l2) -> { l1.addAll(l2); return l1;}));
         return new JobErrorDetail(jobId, errorsByTable);
@@ -472,7 +471,7 @@ public class DBService implements Closeable {
 
         static JobResult fromRow(Row row) {
             return new JobResult(row.getUUID("job_id"),
-                                 row.getString("table_name"),
+                                 row.getString("qualified_table_name"),
                                  row.getLong("matched_partitions"),
                                  row.getLong("mismatched_partitions"),
                                  row.getLong("matched_rows"),
@@ -494,8 +493,7 @@ public class DBService implements Closeable {
 
         final UUID jobId;
         final int buckets;
-        final String keyspace;
-        final List<String> tables;
+        final List<String> keyspaceTables;
         final String sourceClusterName;
         final String sourceClusterDesc;
         final String targetClusterName;
@@ -509,8 +507,7 @@ public class DBService implements Closeable {
         private JobSummary(UUID jobId,
                            DateTime startTime,
                            int buckets,
-                           String keyspace,
-                           List<String> tables,
+                           List<String> keyspaceTables,
                            String sourceClusterName,
                            String sourceClusterDesc,
                            String targetClusterName,
@@ -521,8 +518,7 @@ public class DBService implements Closeable {
             this.startTime = startTime;
             this.start = startTime.toString();
             this.buckets = buckets;
-            this.keyspace = keyspace;
-            this.tables = tables;
+            this.keyspaceTables = keyspaceTables;
             this.sourceClusterName = sourceClusterName;
             this.sourceClusterDesc = sourceClusterDesc;
             this.targetClusterName = targetClusterName;
@@ -534,8 +530,7 @@ public class DBService implements Closeable {
             return new JobSummary(row.getUUID("job_id"),
                                   new DateTime(UUIDs.unixTimestamp(row.getUUID("job_start_time")), DateTimeZone.UTC),
                                   row.getInt("buckets"),
-                                  row.getString("keyspace_name"),
-                                  row.getList("table_names", String.class),
+                                  row.getList("qualified_table_names", String.class),
                                   row.getString("source_cluster_name"),
                                   row.getString("source_cluster_desc"),
                                   row.getString("target_cluster_name"),
diff --git a/common/pom.xml b/common/pom.xml
index a25cd10..d877ff7 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -49,6 +49,13 @@
             <artifactId>cassandra-driver-core</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
index f0cbb36..2d6cb51 100644
--- a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
+++ b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
@@ -26,9 +26,7 @@ import java.util.Optional;
 import java.util.UUID;
 
 public interface JobConfiguration extends Serializable {
-    String keyspace();
-
-    List<String> tables();
+    List<KeyspaceTablePair> keyspaceTables();
 
     int splits();
 
diff --git a/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java b/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java
new file mode 100644
index 0000000..e705d3c
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java
@@ -0,0 +1,61 @@
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+import com.datastax.driver.core.TableMetadata;
+
+public final class KeyspaceTablePair implements Serializable {
+    public final String keyspace;
+    public final String table;
+
+    public static KeyspaceTablePair from(TableMetadata tableMetadata) {
+        return new KeyspaceTablePair(tableMetadata.getKeyspace().getName(), tableMetadata.getName());
+    }
+
+    // Used by Yaml loader
+    public KeyspaceTablePair(String input) {
+        String[] parts = input.trim().split("\\.");
+        assert parts.length == 2 : "Invalid keyspace table pair format";
+        assert parts[0].length() > 0;
+        assert parts[1].length() > 0;
+
+        this.keyspace = parts[0];
+        this.table = parts[1];
+    }
+
+    public KeyspaceTablePair(String keyspace, String table) {
+        this.keyspace = keyspace;
+        this.table = table;
+    }
+
+    public String toCqlValueString() {
+        return String.format("%s.%s", keyspace, table);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                          .add("keyspace", keyspace)
+                          .add("table", table)
+                          .toString();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(keyspace, table);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null || getClass() != obj.getClass())
+            return false;
+        KeyspaceTablePair that = (KeyspaceTablePair) obj;
+        return Objects.equals(keyspace, that.keyspace)
+               && Objects.equals(table, that.table);
+    }
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
index 69fc28c..3666e48 100644
--- a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
+++ b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
@@ -34,8 +34,7 @@ import org.yaml.snakeyaml.constructor.CustomClassLoaderConstructor;
 
 public class YamlJobConfiguration implements JobConfiguration {
     public int splits = 10000;
-    public String keyspace;
-    public List<String> tables;
+    public List<KeyspaceTablePair> keyspace_tables;
     public int buckets = 100;
     public int rate_limit = 10000;
     public String job_id = null;
@@ -59,12 +58,8 @@ public class YamlJobConfiguration implements JobConfiguration {
         }
     }
 
-    public String keyspace() {
-        return keyspace;
-    }
-
-    public List<String> tables() {
-        return tables;
+    public List<KeyspaceTablePair> keyspaceTables() {
+        return keyspace_tables;
     }
 
     public int splits() {
@@ -127,8 +122,7 @@ public class YamlJobConfiguration implements JobConfiguration {
     public String toString() {
         return "YamlJobConfiguration{" +
                "splits=" + splits +
-               ", keyspace='" + keyspace + '\'' +
-               ", tables=" + tables +
+               ", keyspace_tables=" + keyspace_tables +
                ", buckets=" + buckets +
                ", rate_limit=" + rate_limit +
                ", job_id='" + job_id + '\'' +
diff --git a/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java b/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java
new file mode 100644
index 0000000..813f38a
--- /dev/null
+++ b/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.diff;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class YamlJobConfigurationTest {
+    @Test
+    public void testLoadYaml() {
+        JobConfiguration jobConfiguration = YamlJobConfiguration.load("src/test/resources/testconfig.yaml");
+        Assert.assertEquals(3, jobConfiguration.keyspaceTables().size());
+        jobConfiguration.keyspaceTables().forEach(kt -> {
+            Assert.assertTrue("Keyspace segment is not loaded correctly", kt.keyspace.contains("ks"));
+            Assert.assertTrue("Table segment is not loaded correctly", kt.table.contains("tb"));
+        });
+    }
+}
diff --git a/spark-job/localconfig.yaml b/common/src/test/resources/testconfig.yaml
similarity index 94%
copy from spark-job/localconfig.yaml
copy to common/src/test/resources/testconfig.yaml
index d4f5630..a860e48 100644
--- a/spark-job/localconfig.yaml
+++ b/common/src/test/resources/testconfig.yaml
@@ -1,8 +1,8 @@
-# Keyspace to diff
-keyspace: keyspace1
-# List of tables to diff
-tables:
-  - standard1
+# List of keyspace.tables to diff
+keyspace_tables:
+  - ks1.tb1
+  - ks1.tb2
+  - ks2.tb3
 
 # This is how many parts we split the full token range in.
 # Each of these splits is then compared between the clusters
diff --git a/spark-job/localconfig.yaml b/spark-job/localconfig-multi-keyspaces.yaml
similarity index 93%
copy from spark-job/localconfig.yaml
copy to spark-job/localconfig-multi-keyspaces.yaml
index d4f5630..5b18627 100644
--- a/spark-job/localconfig.yaml
+++ b/spark-job/localconfig-multi-keyspaces.yaml
@@ -1,8 +1,7 @@
-# Keyspace to diff
-keyspace: keyspace1
-# List of tables to diff
-tables:
-  - standard1
+# List of keyspace.tables to diff
+keyspace_tables:
+  - keyspace1.standard1
+  - keyspace2.standard1
 
 # This is how many parts we split the full token range in.
 # Each of these splits is then compared between the clusters
diff --git a/spark-job/localconfig.yaml b/spark-job/localconfig.yaml
index d4f5630..8271ebd 100644
--- a/spark-job/localconfig.yaml
+++ b/spark-job/localconfig.yaml
@@ -1,8 +1,6 @@
-# Keyspace to diff
-keyspace: keyspace1
-# List of tables to diff
-tables:
-  - standard1
+# List of keyspace.tables to diff
+keyspace_tables:
+  - keyspace1.standard1
 
 # This is how many parts we split the full token range in.
 # Each of these splits is then compared between the clusters
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java b/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
index bdd6488..b86cf69 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
@@ -19,11 +19,19 @@
 
 package org.apache.cassandra.diff;
 
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
 import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.*;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
index 4bbd81c..a60b8f7 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
@@ -20,22 +20,48 @@
 package org.apache.cassandra.diff;
 
 import java.math.BigInteger;
-import java.util.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.AbstractIterator;
-import com.google.common.util.concurrent.*;
-
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datastax.driver.core.*;
-import com.datastax.driver.core.querybuilder.*;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ClusteringOrder;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.querybuilder.BuiltStatement;
+import com.datastax.driver.core.querybuilder.Ordering;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
 import org.jetbrains.annotations.NotNull;
 
+import static com.datastax.driver.core.querybuilder.QueryBuilder.asc;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.desc;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.gt;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.lte;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.token;
 import static org.apache.cassandra.diff.DiffContext.cqlizedString;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
 
 public class DiffCluster implements AutoCloseable
 {
@@ -43,12 +69,11 @@ public class DiffCluster implements AutoCloseable
 
     public enum Type {SOURCE, TARGET}
 
-    private final Map<String, PreparedStatement[]> preparedStatements = new HashMap<>();
+    private final Map<KeyspaceTablePair, PreparedStatement[]> preparedStatements = new HashMap<>();
     private final ConsistencyLevel consistencyLevel;
     public final Cluster cluster;
     private final Session session;
     private final TokenHelper tokenHelper;
-    public final String keyspace;
     public final List<BigInteger> tokenList;
 
     public final RateLimiter getPartitionRateLimiter;
@@ -61,7 +86,6 @@ public class DiffCluster implements AutoCloseable
 
     public DiffCluster(Type clusterId,
                        Cluster cluster,
-                       String keyspace,
                        ConsistencyLevel consistencyLevel,
                        RateLimiter getPartitionRateLimiter,
                        int tokenScanFetchSize,
@@ -69,7 +93,6 @@ public class DiffCluster implements AutoCloseable
                        int readTimeoutMillis)
 
     {
-        this.keyspace = keyspace;
         this.consistencyLevel = consistencyLevel;
         this.cluster = cluster;
         this.tokenHelper = TokenHelper.forPartitioner(cluster.getMetadata().getPartitioner());
@@ -82,7 +105,7 @@ public class DiffCluster implements AutoCloseable
         this.readTimeoutMillis = readTimeoutMillis;
     }
 
-    public Iterator<PartitionKey> getPartitionKeys(String table, final BigInteger prevToken, final BigInteger token) {
+    public Iterator<PartitionKey> getPartitionKeys(KeyspaceTablePair table, final BigInteger prevToken, final BigInteger token) {
         try {
             return Uninterruptibles.getUninterruptibly(fetchPartitionKeys(table, prevToken, token));
         }
@@ -93,7 +116,7 @@ public class DiffCluster implements AutoCloseable
         }
     }
 
-    private ListenableFuture<Iterator<PartitionKey>> fetchPartitionKeys(String table, final BigInteger prevToken, final BigInteger token) {
+    private ListenableFuture<Iterator<PartitionKey>> fetchPartitionKeys(KeyspaceTablePair table, final BigInteger prevToken, final BigInteger token) {
         BoundStatement statement = keyReader(table).bind(tokenHelper.forBindParam(prevToken),
                                                          tokenHelper.forBindParam(token));
         statement.setFetchSize(tokenScanFetchSize);
@@ -132,10 +155,10 @@ public class DiffCluster implements AutoCloseable
         }
     }
 
-    private ResultSetFuture readPartition(String table, final PartitionKey key, boolean shouldReverse) {
+    private ResultSetFuture readPartition(KeyspaceTablePair keyspaceTablePair, final PartitionKey key, boolean shouldReverse) {
         BoundStatement statement = shouldReverse
-                                   ? reverseReader(table).bind(key.getComponents().toArray())
-                                   : forwardReader(table).bind(key.getComponents().toArray());
+                                   ? reverseReader(keyspaceTablePair).bind(key.getComponents().toArray())
+                                   : forwardReader(keyspaceTablePair).bind(key.getComponents().toArray());
         statement.setFetchSize(partitionReadFetchSize);
         statement.setReadTimeoutMillis(readTimeoutMillis);
         getPartitionRateLimiter.acquire();
@@ -153,38 +176,38 @@ public class DiffCluster implements AutoCloseable
         cluster.closeAsync();
     }
 
-    private PreparedStatement keyReader(String table) {
-        return getStatementForTable(table, 0);
+    private PreparedStatement keyReader(KeyspaceTablePair keyspaceTablePair) {
+        return getStatementForTable(keyspaceTablePair, 0);
     }
 
-    private PreparedStatement forwardReader(String table) {
-        return getStatementForTable(table, 1);
+    private PreparedStatement forwardReader(KeyspaceTablePair keyspaceTablePair) {
+        return getStatementForTable(keyspaceTablePair, 1);
     }
 
-    private PreparedStatement reverseReader(String table) {
-        return getStatementForTable(table, 2);
+    private PreparedStatement reverseReader(KeyspaceTablePair keyspaceTablePair) {
+        return getStatementForTable(keyspaceTablePair, 2);
     }
 
-    private PreparedStatement getStatementForTable(String table, int index) {
-        if (!preparedStatements.containsKey(table)) {
+    private PreparedStatement getStatementForTable(KeyspaceTablePair keyspaceTablePair, int index) {
+        if (!preparedStatements.containsKey(keyspaceTablePair)) {
             synchronized (this) {
-                if (!preparedStatements.containsKey(table)) {
-                    PreparedStatement keyStatement = getKeyStatement(table);
-                    PreparedStatement[] partitionReadStmts = getFullStatement(table);
-                    preparedStatements.put(table, new PreparedStatement[]{ keyStatement ,
-                                                                           partitionReadStmts[0],
-                                                                           partitionReadStmts[1] });
+                if (!preparedStatements.containsKey(keyspaceTablePair)) {
+                    PreparedStatement keyStatement = getKeyStatement(keyspaceTablePair);
+                    PreparedStatement[] partitionReadStmts = getFullStatement(keyspaceTablePair);
+                    preparedStatements.put(keyspaceTablePair, new PreparedStatement[]{ keyStatement ,
+                                                                                       partitionReadStmts[0],
+                                                                                       partitionReadStmts[1] });
                 }
             }
         }
-        return preparedStatements.get(table)[index];
+        return preparedStatements.get(keyspaceTablePair)[index];
     }
 
-    private PreparedStatement getKeyStatement(@NotNull String table) {
+    private PreparedStatement getKeyStatement(@NotNull KeyspaceTablePair keyspaceTablePair) {
         final TableMetadata tableMetadata = session.getCluster()
                                                    .getMetadata()
-                                                   .getKeyspace(cqlizedString(keyspace))
-                                                   .getTable(cqlizedString(table));
+                                                   .getKeyspace(cqlizedString(keyspaceTablePair.keyspace))
+                                                   .getTable(cqlizedString(keyspaceTablePair.table));
         String[] partitionKeyColumns = columnNames(tableMetadata.getPartitionKey());
 
         Select.Selection selection = QueryBuilder.select().distinct().column(token(partitionKeyColumns));
@@ -199,11 +222,11 @@ public class DiffCluster implements AutoCloseable
         return session.prepare(select).setConsistencyLevel(consistencyLevel);
     }
 
-    private PreparedStatement[] getFullStatement(@NotNull String table) {
+    private PreparedStatement[] getFullStatement(@NotNull KeyspaceTablePair keyspaceTablePair) {
         final TableMetadata tableMetadata = session.getCluster()
                                                    .getMetadata()
-                                                   .getKeyspace(cqlizedString(keyspace))
-                                                   .getTable(cqlizedString(table));
+                                                   .getKeyspace(cqlizedString(keyspaceTablePair.keyspace))
+                                                   .getTable(cqlizedString(keyspaceTablePair.table));
         String[] partitionKeyColumns = columnNames(tableMetadata.getPartitionKey());
         String[] allColumns = columnNames(tableMetadata.getColumns());
 
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
index 3047c97..4a1faa1 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
@@ -21,14 +21,17 @@ package org.apache.cassandra.diff;
 
 import java.io.Serializable;
 import java.math.BigInteger;
-import java.util.*;
-import java.util.function.BiConsumer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,13 +70,13 @@ public class DiffJob {
     // optional code block to run before a job starts
     private Runnable preJobHook;
     // optional code block to run after a job completes successfully; otherwise, it is not executed.
-    private Consumer<Map<String, RangeStats>> postJobHook;
+    private Consumer<Map<KeyspaceTablePair, RangeStats>> postJobHook;
 
     public void addPreJobHook(Runnable preJobHook) {
         this.preJobHook = preJobHook;
     }
 
-    public void addPostJobHook(Consumer<Map<String, RangeStats>> postJobHook) {
+    public void addPostJobHook(Consumer<Map<KeyspaceTablePair, RangeStats>> postJobHook) {
         this.postJobHook = postJobHook;
     }
 
@@ -132,10 +135,9 @@ public class DiffJob {
                               targetProvider.getClusterName(),
                               targetProvider.toString());
 
-            logger.info("DiffJob {} comparing [{}] in keyspace {} on {} and {}",
+            logger.info("DiffJob {} comparing [{}] on {} and {}",
                         jobId,
-                        String.join(",", params.tables),
-                        params.keyspace,
+                        params.keyspaceTables.stream().map(KeyspaceTablePair::toString).collect(Collectors.joining(",")),
                         sourceProvider,
                         targetProvider);
 
@@ -143,18 +145,18 @@ public class DiffJob {
                 preJobHook.run();
 
             // Run the distributed diff and collate results
-            Map<String, RangeStats> diffStats = sc.parallelize(splits, slices)
-                                                  .map((split) -> new Differ(configuration,
-                                                                             params,
-                                                                             perExecutorRateLimit,
-                                                                             split,
-                                                                             tokenHelper,
-                                                                             sourceProvider,
-                                                                             targetProvider,
-                                                                             metadataProvider,
-                                                                             new TrackerProvider(configuration.metadataOptions().keyspace))
-                                                                  .run())
-                                                  .reduce(Differ::accumulate);
+            Map<KeyspaceTablePair, RangeStats> diffStats = sc.parallelize(splits, slices)
+                                                             .map((split) -> new Differ(configuration,
+                                                                                        params,
+                                                                                        perExecutorRateLimit,
+                                                                                        split,
+                                                                                        tokenHelper,
+                                                                                        sourceProvider,
+                                                                                        targetProvider,
+                                                                                        metadataProvider,
+                                                                                        new TrackerProvider(configuration.metadataOptions().keyspace))
+                                                                                 .run())
+                                                             .reduce(Differ::accumulate);
             // Publish results. This also removes the job from the currently running list
             job.finalizeJob(params.jobId, diffStats);
             logger.info("FINISHED: {}", diffStats);
@@ -181,8 +183,7 @@ public class DiffJob {
             return job.getJobParams(conf.jobId().get());
         } else {
             return new Params(UUID.randomUUID(),
-                              conf.keyspace(),
-                              conf.tables(),
+                              conf.keyspaceTables(),
                               conf.buckets(),
                               conf.splits());
         }
@@ -264,22 +265,20 @@ public class DiffJob {
 
     static class Params implements Serializable {
         public final UUID jobId;
-        public final String keyspace;
-        public final ImmutableList<String> tables;
+        public final ImmutableList<KeyspaceTablePair> keyspaceTables;
         public final int buckets;
         public final int tasks;
 
-        Params(UUID jobId, String keyspace, List<String> tables, int buckets, int tasks) {
+        Params(UUID jobId, List<KeyspaceTablePair> keyspaceTables, int buckets, int tasks) {
             this.jobId = jobId;
-            this.keyspace = keyspace;
-            this.tables = ImmutableList.copyOf(tables);
+            this.keyspaceTables = ImmutableList.copyOf(keyspaceTables);
             this.buckets = buckets;
             this.tasks = tasks;
         }
 
         public String toString() {
-            return String.format("Params: [jobId: %s, keyspace: %s, tables: %s, buckets: %s, tasks: %s]",
-                                 jobId, keyspace, tables.stream().collect(Collectors.joining(",")), buckets, tasks);
+            return String.format("Params: [jobId: %s, keyspaceTables: %s, buckets: %s, tasks: %s]",
+                                 jobId, keyspaceTables.stream().map(KeyspaceTablePair::toString).collect(Collectors.joining(",")), buckets, tasks);
         }
     }
 
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
index 2272b44..f7545e3 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
@@ -23,23 +23,25 @@ import java.io.PrintWriter;
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.math.BigInteger;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import java.util.function.BiConsumer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Verify;
-import com.google.common.util.concurrent.*;
-
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.Session;
-
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
 
 public class Differ implements Serializable
 {
@@ -53,8 +55,7 @@ public class Differ implements Serializable
     private final UUID jobId;
     private final DiffJob.Split split;
     private final TokenHelper tokenHelper;
-    private final String keyspace;
-    private final List<String> tables;
+    private final List<KeyspaceTablePair> keyspaceTables;
     private final RateLimiter rateLimiter;
     private final DiffJob.TrackerProvider trackerProvider;
     private final double reverseReadProbability;
@@ -92,8 +93,7 @@ public class Differ implements Serializable
         this.jobId = params.jobId;
         this.split = split;
         this.tokenHelper = tokenHelper;
-        this.keyspace = params.keyspace;
-        this.tables = params.tables;
+        this.keyspaceTables = params.keyspaceTables;
         this.trackerProvider = trackerProvider;
         rateLimiter = RateLimiter.create(perExecutorRateLimit);
         this.reverseReadProbability = config.reverseReadProbability();
@@ -110,7 +110,6 @@ public class Differ implements Serializable
             {
                 srcDiffCluster = new DiffCluster(DiffCluster.Type.SOURCE,
                                                  sourceProvider.getCluster(),
-                                                 params.keyspace,
                                                  cl,
                                                  rateLimiter,
                                                  config.tokenScanFetchSize(),
@@ -122,7 +121,6 @@ public class Differ implements Serializable
             {
                 targetDiffCluster = new DiffCluster(DiffCluster.Type.TARGET,
                                                     targetProvider.getCluster(),
-                                                    params.keyspace,
                                                     cl,
                                                     rateLimiter,
                                                     config.tokenScanFetchSize(),
@@ -138,18 +136,19 @@ public class Differ implements Serializable
         }
     }
 
-    public Map<String, RangeStats> run() {
+    public Map<KeyspaceTablePair, RangeStats> run() {
         JobMetadataDb.ProgressTracker journal = trackerProvider.getTracker(journalSession, jobId, split);
-        Map<String, DiffJob.TaskStatus> tablesToDiff = filterTables(tables,
-                                                                    split,
-                                                                    journal::getLastStatus,
-                                                                    !specificTokens.isEmpty());
 
-        String metricsPrefix = String.format("%s.%s", srcDiffCluster.clusterId.name(), srcDiffCluster.keyspace);
+        Map<KeyspaceTablePair, DiffJob.TaskStatus> tablesToDiff = filterTables(keyspaceTables,
+                                                                               split,
+                                                                               journal::getLastStatus,
+                                                                               !specificTokens.isEmpty());
+
+        String metricsPrefix = srcDiffCluster.clusterId.name();
         logger.info("Diffing {} for tables {}", split, tablesToDiff);
 
-        for (Map.Entry<String, DiffJob.TaskStatus> tableStatus : tablesToDiff.entrySet()) {
-            final String table = tableStatus.getKey();
+        for (Map.Entry<KeyspaceTablePair, DiffJob.TaskStatus> tableStatus : tablesToDiff.entrySet()) {
+            final KeyspaceTablePair keyspaceTablePair = tableStatus.getKey();
             DiffJob.TaskStatus status = tableStatus.getValue();
             RangeStats diffStats = status.stats;
 
@@ -160,37 +159,37 @@ public class Differ implements Serializable
             BigInteger startToken = status.lastToken == null || isRerun ? split.start : status.lastToken;
             validateRange(startToken, split.end, tokenHelper);
 
-            TableSpec sourceTable = TableSpec.make(table, srcDiffCluster);
-            TableSpec targetTable = TableSpec.make(table, targetDiffCluster);
+            TableSpec sourceTable = TableSpec.make(keyspaceTablePair, srcDiffCluster);
+            TableSpec targetTable = TableSpec.make(keyspaceTablePair, targetDiffCluster);
             validateTableSpecs(sourceTable, targetTable);
 
             DiffContext ctx = new DiffContext(srcDiffCluster,
                                               targetDiffCluster,
-                                              keyspace,
+                                              keyspaceTablePair.keyspace,
                                               sourceTable,
                                               startToken,
                                               split.end,
                                               specificTokens,
                                               reverseReadProbability);
 
-            String timerName = String.format("%s.%s.split_times", metricsPrefix, table);
+            String timerName = String.format("%s.%s.split_times", metricsPrefix, keyspaceTablePair.table);
             try (@SuppressWarnings("unused") Timer.Context timer = metrics.timer(timerName).time()) {
                 diffStats.accumulate(diffTable(ctx,
-                                               (error, token) -> journal.recordError(table, token, error),
-                                               (type, token) -> journal.recordMismatch(table, type, token),
-                                               (stats, token) -> journal.updateStatus(table, stats, token)));
+                                               (error, token) -> journal.recordError(keyspaceTablePair, token, error),
+                                               (type, token) -> journal.recordMismatch(keyspaceTablePair, type, token),
+                                               (stats, token) -> journal.updateStatus(keyspaceTablePair, stats, token)));
 
                 // update the journal with the final state for the table. Use the split's ending token
                 // as the last seen token (even though we may not have actually read any partition for
                 // that token) as this effectively marks the split as done.
-                journal.finishTable(table, diffStats, !isRerun);
+                journal.finishTable(keyspaceTablePair, diffStats, !isRerun);
             }
         }
 
-        Map<String, RangeStats> statsByTable = tablesToDiff.entrySet()
-                                                           .stream()
-                                                           .collect(Collectors.toMap(Map.Entry::getKey,
-                                                                                     e -> e.getValue().stats));
+        Map<KeyspaceTablePair, RangeStats> statsByTable = tablesToDiff.entrySet()
+                                                                      .stream()
+                                                                      .collect(Collectors.toMap(Map.Entry::getKey,
+                                                                                                e -> e.getValue().stats));
         updateMetrics(metricsPrefix, statsByTable);
         return statsByTable;
     }
@@ -226,25 +225,25 @@ public class Differ implements Serializable
     }
 
     @VisibleForTesting
-    static Map<String, DiffJob.TaskStatus> filterTables(Iterable<String> tables,
-                                                        DiffJob.Split split,
-                                                        Function<String, DiffJob.TaskStatus> journal,
-                                                        boolean includeCompleted) {
-        Map<String, DiffJob.TaskStatus> tablesToProcess = new HashMap<>();
-        for (String table : tables) {
-            DiffJob.TaskStatus taskStatus = journal.apply(table);
+    static Map<KeyspaceTablePair, DiffJob.TaskStatus> filterTables(Iterable<KeyspaceTablePair> keyspaceTables,
+                                                                   DiffJob.Split split,
+                                                                   Function<KeyspaceTablePair, DiffJob.TaskStatus> journal,
+                                                                   boolean includeCompleted) {
+        Map<KeyspaceTablePair, DiffJob.TaskStatus> tablesToProcess = new HashMap<>();
+        for (KeyspaceTablePair pair : keyspaceTables) {
+            DiffJob.TaskStatus taskStatus = journal.apply(pair);
             RangeStats diffStats = taskStatus.stats;
             BigInteger lastToken = taskStatus.lastToken;
 
             // When we finish processing a split for a given table, we update the task status in journal
             // to set the last seen token to the split's end token, to indicate that the split is complete.
             if (!includeCompleted && lastToken != null && lastToken.equals(split.end)) {
-                logger.info("Found finished table {} for split {}", table, split);
+                logger.info("Found finished table {} for split {}", pair, split);
             }
             else {
-                tablesToProcess.put(table, diffStats != null
-                                            ? taskStatus
-                                            : new DiffJob.TaskStatus(taskStatus.lastToken, RangeStats.newStats()));
+                tablesToProcess.put(pair, diffStats != null
+                                          ? taskStatus
+                                          : new DiffJob.TaskStatus(taskStatus.lastToken, RangeStats.newStats()));
             }
         }
         return tablesToProcess;
@@ -267,9 +266,9 @@ public class Differ implements Serializable
     }
 
     @VisibleForTesting
-    static Map<String, RangeStats> accumulate(Map<String, RangeStats> stats, Map<String, RangeStats> otherStats)
+    static Map<KeyspaceTablePair, RangeStats> accumulate(Map<KeyspaceTablePair, RangeStats> stats, Map<KeyspaceTablePair, RangeStats> otherStats)
     {
-        for (Map.Entry<String, RangeStats> otherEntry : otherStats.entrySet())
+        for (Map.Entry<KeyspaceTablePair, RangeStats> otherEntry : otherStats.entrySet())
         {
             if (stats.containsKey(otherEntry.getKey()))
                 stats.get(otherEntry.getKey()).accumulate(otherEntry.getValue());
@@ -279,11 +278,12 @@ public class Differ implements Serializable
         return stats;
     }
 
-    private static void updateMetrics(String prefix, Map<String, RangeStats> statsMap)
+    private static void updateMetrics(String prefix, Map<KeyspaceTablePair, RangeStats> statsMap)
     {
-        for (Map.Entry<String, RangeStats> entry : statsMap.entrySet())
+        for (Map.Entry<KeyspaceTablePair, RangeStats> entry : statsMap.entrySet())
         {
-            String qualifier = String.format("%s.%s", prefix, entry.getKey());
+            KeyspaceTablePair keyspaceTablePair = entry.getKey();
+            String qualifier = String.format("%s.%s.%s", prefix, keyspaceTablePair.keyspace, keyspaceTablePair.table);
             RangeStats stats = entry.getValue();
 
             metrics.meter(qualifier + ".partitions_read").mark(stats.getMatchedPartitions() + stats.getOnlyInSource() + stats.getOnlyInTarget() + stats.getMismatchedPartitions());
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
index 1eb121c..d9d4035 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
@@ -20,13 +20,22 @@
 package org.apache.cassandra.diff;
 
 import java.math.BigInteger;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.utils.UUIDs;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -40,7 +49,7 @@ public class JobMetadataDb {
         private final int bucket;
         private final String startToken;
         private final String endToken;
-        private final String keyspace;
+        private final String metadataKeyspace;
         private Session session;
 
         private static PreparedStatement updateStmt;
@@ -53,25 +62,25 @@ public class JobMetadataDb {
                                int bucket,
                                BigInteger startToken,
                                BigInteger endToken,
-                               String keyspace,
+                               String metadataKeyspace,
                                Session session) {
             this.jobId = jobId;
             this.bucket = bucket;
             this.startToken = startToken.toString();
             this.endToken = endToken.toString();
-            this.keyspace = keyspace;
+            this.metadataKeyspace = metadataKeyspace;
             this.session = session;
         }
 
         /**
          * Runs on each executor to prepare statements shared across all instances
          */
-        public static void initializeStatements(Session session, String keyspace) {
+        public static void initializeStatements(Session session, String metadataKeyspace) {
             if (updateStmt == null) {
                 updateStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
                                                            " job_id," +
                                                            " bucket," +
-                                                           " table_name," +
+                                                           " qualified_table_name," +
                                                            " start_token," +
                                                            " end_token," +
                                                            " matched_partitions," +
@@ -84,47 +93,47 @@ public class JobMetadataDb {
                                                            " skipped_partitions," +
                                                            " last_token )" +
                                                            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
-                                                           keyspace, Schema.TASK_STATUS));
+                                                           metadataKeyspace, Schema.TASK_STATUS));
             }
             if (mismatchStmt == null) {
                 mismatchStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
                                                              " job_id," +
                                                              " bucket," +
-                                                             " table_name," +
+                                                             " qualified_table_name," +
                                                              " mismatching_token," +
                                                              " mismatch_type )" +
                                                              "VALUES (?, ?, ?, ?, ?)",
-                                                             keyspace, Schema.MISMATCHES));
+                                                             metadataKeyspace, Schema.MISMATCHES));
             }
             if (updateCompleteStmt == null) {
                 updateCompleteStmt = session.prepare(String.format("UPDATE %s.%s " +
                                                                    " SET completed = completed + 1" +
                                                                    " WHERE job_id = ? " +
                                                                    " AND bucket = ? " +
-                                                                   " AND table_name = ? ",
-                                                                   keyspace, Schema.JOB_STATUS))
+                                                                   " AND qualified_table_name = ? ",
+                                                                   metadataKeyspace, Schema.JOB_STATUS))
                                             .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
             }
             if (errorSummaryStmt == null) {
                 errorSummaryStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
                                                                  " job_id," +
                                                                  " bucket," +
-                                                                 " table_name," +
+                                                                 " qualified_table_name," +
                                                                  " start_token," +
                                                                  " end_token)" +
                                                                  " VALUES (?, ?, ?, ?, ?)",
-                                                                 keyspace, Schema.ERROR_SUMMARY));
+                                                                 metadataKeyspace, Schema.ERROR_SUMMARY));
             }
             if (errorDetailStmt == null) {
                 errorDetailStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
                                                                 " job_id," +
                                                                 " bucket," +
-                                                                " table_name," +
+                                                                " qualified_table_name," +
                                                                 " start_token," +
                                                                 " end_token," +
                                                                 " error_token)" +
                                                                 " VALUES (?, ?, ?, ?, ?, ?)",
-                                                                keyspace, Schema.ERROR_DETAIL));
+                                                                metadataKeyspace, Schema.ERROR_DETAIL));
             }
 
         }
@@ -140,10 +149,10 @@ public class JobMetadataDb {
 
         /**
          *
-         * @param table
+         * @param keyspaceTablePair
          * @return
          */
-        public DiffJob.TaskStatus getLastStatus(String table) {
+        public DiffJob.TaskStatus getLastStatus(KeyspaceTablePair keyspaceTablePair) {
             ResultSet rs = session.execute(String.format("SELECT last_token, " +
                                                          "       matched_partitions, " +
                                                          "       mismatched_partitions, " +
@@ -156,11 +165,11 @@ public class JobMetadataDb {
                                                          " FROM %s.%s " +
                                                          " WHERE job_id = ? " +
                                                          " AND   bucket = ? " +
-                                                         " AND   table_name = ? " +
+                                                         " AND   qualified_table_name = ? " +
                                                          " AND   start_token = ? " +
                                                          " AND   end_token = ?",
-                                                         keyspace, Schema.TASK_STATUS),
-                                           jobId, bucket, table, startToken, endToken);
+                                                         metadataKeyspace, Schema.TASK_STATUS),
+                                           jobId, bucket, keyspaceTablePair.toCqlValueString(), startToken, endToken);
             Row row = rs.one();
             if (null == row)
                 return DiffJob.TaskStatus.EMPTY;
@@ -185,11 +194,11 @@ public class JobMetadataDb {
          * @param diffStats
          * @param latestToken
          */
-        public void updateStatus(String table, RangeStats diffStats, BigInteger latestToken) {
+        public void updateStatus(KeyspaceTablePair table, RangeStats diffStats, BigInteger latestToken) {
             session.execute(bindUpdateStatement(table, diffStats, latestToken));
         }
 
-        public void recordMismatch(String table, MismatchType type, BigInteger token) {
+        public void recordMismatch(KeyspaceTablePair table, MismatchType type, BigInteger token) {
             logger.info("Detected mismatch in table {}; partition with token {} is {}",
                         table, token, type == MismatchType.PARTITION_MISMATCH
                                       ? " different in source and target clusters"
@@ -204,7 +213,7 @@ public class JobMetadataDb {
          * @param token
          * @param error
          */
-        public void recordError(String table, BigInteger token, Throwable error) {
+        public void recordError(KeyspaceTablePair table, BigInteger token, Throwable error) {
             logger.error(String.format("Encountered error during partition comparison in table %s; " +
                                        "error for partition with token %s", table, token), error);
             BatchStatement batch = new BatchStatement();
@@ -219,41 +228,41 @@ public class JobMetadataDb {
          * @param table
          * @param stats
          */
-        public void finishTable(String table, RangeStats stats, boolean updateCompletedCount) {
+        public void finishTable(KeyspaceTablePair table, RangeStats stats, boolean updateCompletedCount) {
             logger.info("Finishing range [{}, {}] for table {}", startToken, endToken, table);
             // first flush out the last status.
             session.execute(bindUpdateStatement(table, stats, endToken));
             // then update the count of completed tasks
             if (updateCompletedCount)
-                session.execute(updateCompleteStmt.bind(jobId, bucket, table));
+                session.execute(updateCompleteStmt.bind(jobId, bucket, table.toCqlValueString()));
         }
 
-        private Statement bindMismatchesStatement(String table, BigInteger token, String type) {
-            return mismatchStmt.bind(jobId, bucket, table, token.toString(), type)
+        private Statement bindMismatchesStatement(KeyspaceTablePair table, BigInteger token, String type) {
+            return mismatchStmt.bind(jobId, bucket, table.toCqlValueString(), token.toString(), type)
                                .setIdempotent(true);
         }
 
-        private Statement bindErrorSummaryStatement(String table) {
-            return errorSummaryStmt.bind(jobId, bucket, table, startToken, endToken)
+        private Statement bindErrorSummaryStatement(KeyspaceTablePair table) {
+            return errorSummaryStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken)
                                    .setIdempotent(true);
         }
 
-        private Statement bindErrorDetailStatement(String table, BigInteger errorToken) {
-            return errorDetailStmt.bind(jobId, bucket, table, startToken, endToken, errorToken.toString())
+        private Statement bindErrorDetailStatement(KeyspaceTablePair table, BigInteger errorToken) {
+            return errorDetailStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken, errorToken.toString())
                                   .setIdempotent(true);
         }
 
-        private Statement bindUpdateStatement(String table, RangeStats stats, BigInteger token) {
+        private Statement bindUpdateStatement(KeyspaceTablePair table, RangeStats stats, BigInteger token) {
            return bindUpdateStatement(table, stats, token.toString());
         }
 
-        private Statement bindUpdateStatement(String table, RangeStats stats, String token) {
+        private Statement bindUpdateStatement(KeyspaceTablePair table, RangeStats stats, String token) {
             // We don't persist the partition error count from RangeStats as errors
             // are likely to be transient and not data related, so we don't want to
             // accumulate them across runs.
             return updateStmt.bind(jobId,
                                    bucket,
-                                   table,
+                                   table.toCqlValueString(),
                                    startToken,
                                    endToken,
                                    stats.getMatchedPartitions(),
@@ -275,29 +284,32 @@ public class JobMetadataDb {
 
     static class JobLifeCycle {
         final Session session;
-        final String keyspace;
+        final String metadataKeyspace;
 
         public JobLifeCycle(Session session, String metadataKeyspace) {
             this.session = session;
-            this.keyspace = metadataKeyspace;
+            this.metadataKeyspace = metadataKeyspace;
         }
 
         public DiffJob.Params getJobParams(UUID jobId) {
-            ResultSet rs = session.execute(String.format("SELECT keyspace_name, " +
-                                                         "       table_names," +
+            ResultSet rs = session.execute(String.format("SELECT qualified_table_names," +
                                                          "       buckets," +
                                                          "       total_tasks " +
                                                          "FROM %s.%s " +
                                                          "WHERE job_id = ?",
-                                                         keyspace, Schema.JOB_SUMMARY),
+                                                         metadataKeyspace, Schema.JOB_SUMMARY),
                                            jobId);
             Row row = rs.one();
             if (null == row)
                 return null;
 
+            // qualified_table_names is stored as a list of strings; decode each entry back into a KeyspaceTablePair.
+            List<KeyspaceTablePair> keyspaceTables = row.getList("qualified_table_names", String.class)
+                                                        .stream()
+                                                        .map(KeyspaceTablePair::new)
+                                                        .collect(Collectors.toList());
             return new DiffJob.Params(jobId,
-                                      row.getString("keyspace_name"),
-                                      row.getList("table_names", String.class),
+                                      keyspaceTables,
                                       row.getInt("buckets"),
                                       row.getInt("total_tasks"));
         }
@@ -314,7 +326,7 @@ public class JobMetadataDb {
             // The job was previously run, so this could be a re-run to
             // mop up any failed splits so mark it in progress.
             ResultSet rs = session.execute(String.format("INSERT INTO %s.%s (job_id) VALUES (?) IF NOT EXISTS",
-                                                         keyspace, Schema.RUNNING_JOBS),
+                                                         metadataKeyspace, Schema.RUNNING_JOBS),
                                            params.jobId);
             if (!rs.one().getBool("[applied]")) {
                 logger.info("Aborting due to inability to mark job as running. " +
@@ -330,21 +342,19 @@ public class JobMetadataDb {
                                                " job_id," +
                                                " job_start_time," +
                                                " buckets," +
-                                               " keyspace_name," +
-                                               " table_names," +
+                                               " qualified_table_names," +
                                                " source_cluster_name," +
                                                " source_cluster_desc," +
                                                " target_cluster_name," +
                                                " target_cluster_desc," +
                                                " total_tasks)" +
-                                               " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" +
+                                               " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" +
                                                " IF NOT EXISTS",
-                                               keyspace, Schema.JOB_SUMMARY),
+                                               metadataKeyspace, Schema.JOB_SUMMARY),
                                  params.jobId,
                                  timeUUID,
                                  params.buckets,
-                                 params.keyspace,
-                                 params.tables,
+                                 params.keyspaceTables.stream().map(KeyspaceTablePair::toCqlValueString).collect(Collectors.toList()),
                                  sourceClusterName,
                                  sourceClusterDesc,
                                  targetClusterName,
@@ -355,34 +365,34 @@ public class JobMetadataDb {
             if (rs.one().getBool("[applied]")) {
                 BatchStatement batch = new BatchStatement();
                 batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (source_cluster_name, job_id) VALUES (?, ?)",
-                                                            keyspace, Schema.SOURCE_CLUSTER_INDEX),
+                                                            metadataKeyspace, Schema.SOURCE_CLUSTER_INDEX),
                                               sourceClusterName, params.jobId));
                 batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (target_cluster_name, job_id) VALUES (?, ?)",
-                                                            keyspace, Schema.TARGET_CLUSTER_INDEX),
+                                                            metadataKeyspace, Schema.TARGET_CLUSTER_INDEX),
                                               targetClusterName, params.jobId));
                 batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (keyspace_name, job_id) VALUES (?, ?)",
-                                                            keyspace, Schema.KEYSPACE_INDEX),
-                                              keyspace, params.jobId));
+                                                            metadataKeyspace, Schema.KEYSPACE_INDEX),
+                                              metadataKeyspace, params.jobId));
                 batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (job_start_date, job_start_hour, job_start_time, job_id) " +
                                                             "VALUES ('%s', ?, ?, ?)",
-                                                            keyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")),
+                                                            metadataKeyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")),
                                               startDateTime.getHourOfDay(), timeUUID, params.jobId));
                 session.execute(batch);
             }
         }
 
-        public void finalizeJob(UUID jobId, Map<String, RangeStats> results) {
+        public void finalizeJob(UUID jobId, Map<KeyspaceTablePair, RangeStats> results) {
             logger.info("Finalizing job status");
 
             markNotRunning(jobId);
 
             BatchStatement batch = new BatchStatement();
-            for (Map.Entry<String, RangeStats> result : results.entrySet()) {
-                String table = result.getKey();
+            for (Map.Entry<KeyspaceTablePair, RangeStats> result : results.entrySet()) {
+                KeyspaceTablePair table = result.getKey();
                 RangeStats stats = result.getValue();
                 session.execute(String.format("INSERT INTO %s.%s (" +
                                               "  job_id," +
-                                              "  table_name," +
+                                              "  qualified_table_name," +
                                               "  matched_partitions," +
                                               "  mismatched_partitions," +
                                               "  partitions_only_in_source," +
@@ -392,9 +402,9 @@ public class JobMetadataDb {
                                               "  mismatched_values," +
                                               "  skipped_partitions) " +
                                               "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
-                                              keyspace, Schema.JOB_RESULTS),
+                                              metadataKeyspace, Schema.JOB_RESULTS),
                                 jobId,
-                                table,
+                                table.toCqlValueString(),
                                 stats.getMatchedPartitions(),
                                 stats.getMismatchedPartitions(),
                                 stats.getOnlyInSource(),
@@ -414,18 +424,18 @@ public class JobMetadataDb {
                 logger.info("Marking job {} as not running", jobId);
 
                 ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS",
-                        keyspace, Schema.RUNNING_JOBS),
+                                                             metadataKeyspace, Schema.RUNNING_JOBS),
                         jobId);
                 if (!rs.one().getBool("[applied]"))
                 {
-                    logger.warn("Non-fatal: Unable to mark job %s as not running, check logs for errors " +
-                                    "during initialization as there may be no entry for this job in the {} table",
+                    logger.warn("Non-fatal: Unable to mark job {} as not running, check logs for errors " +
+                                    "during initialization as there may be no entry for this job in the {} table",
                             jobId, Schema.RUNNING_JOBS);
                 }
             } catch (Exception e) {
                 // Because this is called from another exception handler, we don't want to lose the original exception
                 // just because we may not have been able to mark the job as not running. Just log here
-                logger.error("Could not mark job {} as not running.", e);
+                logger.error("Could not mark job {} as not running.", jobId, e);
             }
         }
     }
@@ -436,7 +446,7 @@ public class JobMetadataDb {
         private static final String TASK_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                          " job_id uuid," +
                                                          " bucket int," +
-                                                         " table_name text," +
+                                                         " qualified_table_name text," +
                                                          " start_token varchar," +
                                                          " end_token varchar," +
                                                          " matched_partitions bigint," +
@@ -448,7 +458,7 @@ public class JobMetadataDb {
                                                          " mismatched_values bigint," +
                                                          " skipped_partitions bigint," +
                                                          " last_token varchar," +
-                                                         " PRIMARY KEY((job_id, bucket), table_name, start_token, end_token))" +
+                                                         " PRIMARY KEY((job_id, bucket), qualified_table_name, start_token, end_token))" +
                                                          " WITH default_time_to_live = %s";
 
         public static final String JOB_SUMMARY = "job_summary";
@@ -456,8 +466,7 @@ public class JobMetadataDb {
                                                          " job_id uuid," +
                                                          " job_start_time timeuuid," +
                                                          " buckets int," +
-                                                         " keyspace_name text," +
-                                                         " table_names frozen<list<text>>," +
+                                                         " qualified_table_names frozen<list<text>>," +
                                                          " source_cluster_name text," +
                                                          " source_cluster_desc text," +
                                                          " target_cluster_name text," +
@@ -469,7 +478,7 @@ public class JobMetadataDb {
         public static final String JOB_RESULTS = "job_results";
         private static final String JOB_RESULTS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                          " job_id uuid," +
-                                                         " table_name text," +
+                                                         " qualified_table_name text," +
                                                          " matched_partitions bigint," +
                                                          " mismatched_partitions bigint," +
                                                          " partitions_only_in_source bigint," +
@@ -478,46 +487,46 @@ public class JobMetadataDb {
                                                          " matched_values bigint," +
                                                          " mismatched_values bigint," +
                                                          " skipped_partitions bigint," +
-                                                         " PRIMARY KEY(job_id, table_name))" +
+                                                         " PRIMARY KEY(job_id, qualified_table_name))" +
                                                          " WITH default_time_to_live = %s";
 
         public static final String JOB_STATUS = "job_status";
         private static final String JOB_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                         " job_id uuid," +
                                                         " bucket int," +
-                                                        " table_name text," +
+                                                        " qualified_table_name text," +
                                                         " completed counter," +
-                                                        " PRIMARY KEY ((job_id, bucket), table_name))";
+                                                        " PRIMARY KEY ((job_id, bucket), qualified_table_name))";
 
         public static final String MISMATCHES = "mismatches";
         private static final String MISMATCHES_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                         " job_id uuid," +
                                                         " bucket int," +
-                                                        " table_name text, " +
+                                                        " qualified_table_name text, " +
                                                         " mismatching_token varchar, " +
                                                         " mismatch_type text, " +
-                                                        " PRIMARY KEY ((job_id, bucket), table_name, mismatching_token))" +
+                                                        " PRIMARY KEY ((job_id, bucket), qualified_table_name, mismatching_token))" +
                                                         " WITH default_time_to_live = %s";
 
         public static final String ERROR_SUMMARY = "task_errors";
         private static final String ERROR_SUMMARY_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                            " job_id uuid," +
                                                            " bucket int," +
-                                                           " table_name text," +
+                                                           " qualified_table_name text," +
                                                            " start_token varchar," +
                                                            " end_token varchar," +
-                                                           " PRIMARY KEY ((job_id, bucket), table_name, start_token, end_token))" +
+                                                           " PRIMARY KEY ((job_id, bucket), qualified_table_name, start_token, end_token))" +
                                                            " WITH default_time_to_live = %s";
 
         public static final String ERROR_DETAIL = "partition_errors";
         private static final String ERROR_DETAIL_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                           " job_id uuid," +
                                                           " bucket int," +
-                                                          " table_name text," +
+                                                          " qualified_table_name text," +
                                                           " start_token varchar," +
                                                           " end_token varchar," +
                                                           " error_token varchar," +
-                                                          " PRIMARY KEY ((job_id, bucket, table_name, start_token, end_token), error_token))" +
+                                                          " PRIMARY KEY ((job_id, bucket, qualified_table_name, start_token, end_token), error_token))" +
                                                           " WITH default_time_to_live = %s";
 
         public static final String SOURCE_CLUSTER_INDEX = "source_cluster_index";
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
index 6434dc8..4214f2b 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
@@ -20,12 +20,13 @@
 package org.apache.cassandra.diff;
 
 import java.util.Iterator;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Row;
 
 public class PartitionComparator implements Callable<PartitionStats> {
 
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
index d31da4f..5ed225a 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
@@ -28,7 +28,10 @@ import java.util.stream.StreamSupport;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Token;
 import org.jetbrains.annotations.NotNull;
 
 public class PartitionKey implements Comparable<PartitionKey> {
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
index 36ce2b5..bb5e937 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
@@ -20,11 +20,13 @@
 package org.apache.cassandra.diff;
 
 import java.math.BigInteger;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.Iterator;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.*;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import com.google.common.base.Verify;
 import org.slf4j.Logger;
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
index a0f8043..3aad1eb 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
@@ -19,7 +19,10 @@
 
 package org.apache.cassandra.diff;
 
-import java.io.*;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.util.Objects;
 import java.util.concurrent.atomic.LongAdder;
 
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java b/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
index d2f0963..38c131d 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
@@ -19,25 +19,30 @@
 
 package org.apache.cassandra.diff;
 
-import com.datastax.driver.core.*;
-import java.util.*;
+import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.TableMetadata;
+
 import static org.apache.cassandra.diff.DiffContext.cqlizedString;
 
 public class TableSpec {
 
-    private final String table;
+    private final KeyspaceTablePair keyspaceTablePair;
     private ImmutableList<ColumnMetadata> clusteringColumns;
     private ImmutableList<ColumnMetadata> regularColumns;
 
 
-    public String getTable()
+    public KeyspaceTablePair getTable()
     {
-        return table;
+        return keyspaceTablePair;
     }
 
 
@@ -55,23 +60,23 @@ public class TableSpec {
      * @param clusteringColumns the clustering columns, retrieved from cluster using the client
      * @param regularColumns the non-primary key columns, retrieved from cluster using the client
      */
-    TableSpec(final String table,
+    TableSpec(final KeyspaceTablePair table,
               final List<ColumnMetadata> clusteringColumns,
               final List<ColumnMetadata> regularColumns) {
-        this.table = table;
+        this.keyspaceTablePair = table;
         this.clusteringColumns = ImmutableList.copyOf(clusteringColumns);
         this.regularColumns = ImmutableList.copyOf(regularColumns);
     }
 
-    public static TableSpec make(String table, DiffCluster diffCluster) {
+    public static TableSpec make(KeyspaceTablePair keyspaceTablePair, DiffCluster diffCluster) {
         final Cluster cluster = diffCluster.cluster;
 
-        final String cqlizedKeyspace = cqlizedString(diffCluster.keyspace);
-        final String cqlizedTable = cqlizedString(table);
+        final String cqlizedKeyspace = cqlizedString(keyspaceTablePair.keyspace);
+        final String cqlizedTable = cqlizedString(keyspaceTablePair.table);
 
         KeyspaceMetadata ksMetadata = cluster.getMetadata().getKeyspace(cqlizedKeyspace);
         if (ksMetadata == null) {
-            throw new IllegalArgumentException(String.format("Keyspace %s not found in %s cluster", diffCluster.keyspace, diffCluster.clusterId));
+            throw new IllegalArgumentException(String.format("Keyspace %s not found in %s cluster", keyspaceTablePair.keyspace, diffCluster.clusterId));
         }
 
         TableMetadata tableMetadata = ksMetadata.getTable(cqlizedTable);
@@ -80,11 +85,11 @@ public class TableSpec {
                                                            .stream()
                                                            .filter(c -> !(clusteringColumns.contains(c)))
                                                            .collect(Collectors.toList());
-        return new TableSpec(tableMetadata.getName(), clusteringColumns, regularColumns);
+        return new TableSpec(KeyspaceTablePair.from(tableMetadata), clusteringColumns, regularColumns);
     }
 
     public boolean equalsNamesOnly(TableSpec other) {
-        return this.table.equals(other.table)
+        return this.keyspaceTablePair.equals(other.keyspaceTablePair)
             && columnNames(this.clusteringColumns).equals(columnNames(other.clusteringColumns))
             && columnNames(this.regularColumns).equals(columnNames(other.regularColumns));
     }
@@ -101,19 +106,19 @@ public class TableSpec {
             return false;
 
         TableSpec other = (TableSpec)o;
-        return this.table.equals(other.table)
+        return this.keyspaceTablePair.equals(other.keyspaceTablePair)
                && this.clusteringColumns.equals(other.clusteringColumns)
                && this.regularColumns.equals(other.regularColumns);
 
     }
 
     public int hashCode() {
-        return Objects.hash(table, clusteringColumns, regularColumns);
+        return Objects.hash(keyspaceTablePair, clusteringColumns, regularColumns);
     }
 
     public String toString() {
         return MoreObjects.toStringHelper(this)
-                          .add("table", table)
+                          .add("table", keyspaceTablePair)
                           .add("clusteringColumns", clusteringColumns)
                           .add("regularColumns", regularColumns)
                           .toString();
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
index d8d92c7..9082970 100644
--- a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
@@ -24,9 +24,6 @@ import java.util.List;
 
 import org.junit.Test;
 
-import org.apache.cassandra.diff.DiffJob;
-import org.apache.cassandra.diff.TokenHelper;
-
 import static org.junit.Assert.assertEquals;
 
 public class DiffJobTest
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
index 73677d9..e588575 100644
--- a/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
@@ -24,14 +24,8 @@ import java.util.Map;
 import java.util.function.Function;
 
 import com.google.common.base.VerifyException;
-import org.junit.Test;
-
 import com.google.common.collect.Lists;
-
-import org.apache.cassandra.diff.DiffJob;
-import org.apache.cassandra.diff.Differ;
-import org.apache.cassandra.diff.RangeStats;
-import org.apache.cassandra.diff.TokenHelper;
+import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -74,9 +68,9 @@ public class DifferTest {
         // * t2 is started and has reported some progress, but has not completed
         // * t3 has not reported any progress
         DiffJob.Split split = new DiffJob.Split(1, 1, BigInteger.ONE, BigInteger.TEN);
-        Iterable<String> tables = Lists.newArrayList("t1", "t2", "t3");
-        Function<String, DiffJob.TaskStatus> journal = (table) -> {
-            switch (table) {
+        Iterable<KeyspaceTablePair> tables = Lists.newArrayList(ksTbl("t1"), ksTbl("t2"), ksTbl("t3"));
+        Function<KeyspaceTablePair, DiffJob.TaskStatus> journal = (keyspaceTable) -> {
+            switch (keyspaceTable.table) {
                 case "t1":
                     return new DiffJob.TaskStatus(split.end, RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6));
                 case "t2":
@@ -88,24 +82,27 @@ public class DifferTest {
             }
         };
 
-        Map<String, DiffJob.TaskStatus> filtered = Differ.filterTables(tables, split, journal, false);
+        Map<KeyspaceTablePair, DiffJob.TaskStatus> filtered = Differ.filterTables(tables, split, journal, false);
         assertEquals(2, filtered.keySet().size());
-        assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get("t2").stats);
-        assertEquals(BigInteger.valueOf(5L), filtered.get("t2").lastToken);
-        assertEquals(RangeStats.newStats(), filtered.get("t3").stats);
-        assertNull(filtered.get("t3").lastToken);
+        assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get(ksTbl("t2")).stats);
+        assertEquals(BigInteger.valueOf(5L), filtered.get(ksTbl("t2")).lastToken);
+        assertEquals(RangeStats.newStats(), filtered.get(ksTbl("t3")).stats);
+        assertNull(filtered.get(ksTbl("t3")).lastToken);
 
         // if re-running (part of) a job because of failures or problematic partitions, we want to
         // ignore the status of completed tasks and re-run them anyway as only specified tokens will
         // be processed - so t1 should be included now
         filtered = Differ.filterTables(tables, split, journal, true);
         assertEquals(3, filtered.keySet().size());
-        assertEquals(RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6), filtered.get("t1").stats);
-        assertEquals(split.end, filtered.get("t1").lastToken);
-        assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get("t2").stats);
-        assertEquals(BigInteger.valueOf(5L), filtered.get("t2").lastToken);
-        assertEquals(RangeStats.newStats(), filtered.get("t3").stats);
-        assertNull(filtered.get("t3").lastToken);
+        assertEquals(RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6), filtered.get(ksTbl("t1")).stats);
+        assertEquals(split.end, filtered.get(ksTbl("t1")).lastToken);
+        assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get(ksTbl("t2")).stats);
+        assertEquals(BigInteger.valueOf(5L), filtered.get(ksTbl("t2")).lastToken);
+        assertEquals(RangeStats.newStats(), filtered.get(ksTbl("t3")).stats);
+        assertNull(filtered.get(ksTbl("t3")).lastToken);
     }
 
+    private KeyspaceTablePair ksTbl(String table) {
+        return new KeyspaceTablePair("ks", table);
+    }
 }
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
index 79b3638..9cd1892 100644
--- a/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
@@ -33,9 +33,6 @@ import com.google.common.reflect.TypeToken;
 import org.junit.Test;
 
 import com.datastax.driver.core.*;
-import org.apache.cassandra.diff.PartitionComparator;
-import org.apache.cassandra.diff.PartitionStats;
-import org.apache.cassandra.diff.TableSpec;
 
 import static org.junit.Assert.assertEquals;
 
@@ -221,7 +218,7 @@ public class PartitionComparatorTest {
     }
 
     TableSpec spec(String table, List<String> clusteringColumns, List<String> regularColumns) {
-        return new TableSpec(table, columns(clusteringColumns), columns(regularColumns));
+        return new TableSpec(new KeyspaceTablePair("ks", table), columns(clusteringColumns), columns(regularColumns));
     }
 
     List<ColumnMetadata> columns(List<String> names) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org