You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/05/20 09:57:37 UTC
[cassandra-diff] branch master updated: Support running diff on
multiple keyspaces
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-diff.git
The following commit(s) were added to refs/heads/master by this push:
new e9782c6 Support running diff on multiple keyspaces
e9782c6 is described below
commit e9782c6b59a0888e4c63248ef95468e6b176406d
Author: Yifan Cai <yi...@apple.com>
AuthorDate: Fri May 15 13:57:48 2020 -0700
Support running diff on multiple keyspaces
Patch by Yifan Cai; reviewed by marcuse for CASSANDRA-15807
closes #8
---
README.md | 17 ++-
.../cassandra/diff/api/services/DBService.java | 61 ++++----
common/pom.xml | 7 +
.../apache/cassandra/diff/JobConfiguration.java | 4 +-
.../apache/cassandra/diff/KeyspaceTablePair.java | 61 ++++++++
.../cassandra/diff/YamlJobConfiguration.java | 14 +-
.../cassandra/diff/YamlJobConfigurationTest.java | 16 ++
.../src/test/resources/testconfig.yaml | 10 +-
...onfig.yaml => localconfig-multi-keyspaces.yaml} | 9 +-
spark-job/localconfig.yaml | 8 +-
.../apache/cassandra/diff/ComparisonExecutor.java | 12 +-
.../org/apache/cassandra/diff/DiffCluster.java | 95 +++++++-----
.../java/org/apache/cassandra/diff/DiffJob.java | 57 ++++----
.../java/org/apache/cassandra/diff/Differ.java | 98 ++++++-------
.../org/apache/cassandra/diff/JobMetadataDb.java | 161 +++++++++++----------
.../apache/cassandra/diff/PartitionComparator.java | 5 +-
.../org/apache/cassandra/diff/PartitionKey.java | 5 +-
.../org/apache/cassandra/diff/RangeComparator.java | 8 +-
.../java/org/apache/cassandra/diff/RangeStats.java | 5 +-
.../java/org/apache/cassandra/diff/TableSpec.java | 37 +++--
.../org/apache/cassandra/diff/DiffJobTest.java | 3 -
.../java/org/apache/cassandra/diff/DifferTest.java | 39 +++--
.../cassandra/diff/PartitionComparatorTest.java | 5 +-
23 files changed, 427 insertions(+), 310 deletions(-)
diff --git a/README.md b/README.md
index 5fab877..c17ef59 100644
--- a/README.md
+++ b/README.md
@@ -41,9 +41,15 @@ $ cd cassandra-diff
$ mvn package
$ docker run --name cas-src -d -p 9042:9042 cassandra:3.0.18
$ docker run --name cas-tgt -d -p 9043:9042 cassandra:latest
-$ docker exec cas-src cassandra-stress write n=1k
-$ docker exec cas-tgt cassandra-stress write n=1k
+# Create 1000 rows in table "standard1" under keyspace "keyspace1"
+$ docker exec cas-src cassandra-stress write n=1k -schema keyspace="keyspace1"
+$ docker exec cas-tgt cassandra-stress write n=1k -schema keyspace="keyspace1"
+# Optionally, create another 1000 rows in table "standard1" under keyspace "keyspace2"
+$ docker exec cas-src cassandra-stress write n=1k -schema keyspace="keyspace2"
+$ docker exec cas-tgt cassandra-stress write n=1k -schema keyspace="keyspace2"
$ spark-submit --verbose --files ./spark-job/localconfig.yaml --class org.apache.cassandra.diff.DiffJob spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig.yaml
+# If rows are created in "keyspace2", you can pick up localconfig-multi-keyspaces.yaml to compare data across multiple keyspaces! See the command below.
+# $ spark-submit --verbose --files ./spark-job/localconfig-multi-keyspaces.yaml --class org.apache.cassandra.diff.DiffJob spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig-multi-keyspaces.yaml
# ... logs
INFO DiffJob:124 - FINISHED: {standard1=Matched Partitions - 1000, Mismatched Partitions - 0, Partition Errors - 0, Partitions Only In Source - 0, Partitions Only In Target - 0, Skipped Partitions - 0, Matched Rows - 1000, Matched Values - 6000, Mismatched Values - 0 }
## start api-server:
@@ -55,9 +61,8 @@ $ curl -s localhost:8089/jobs/recent | python -mjson.tool
{
"jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
"buckets": 100,
- "keyspace": "keyspace1",
- "tables": [
- "standard1"
+ "keyspace_tables": [
+ "keyspace1.standard1"
],
"sourceClusterName": "local_test_1",
"sourceClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
@@ -71,7 +76,7 @@ $ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/results | pyt
[
{
"jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
- "table": "standard1",
+ "table": "keyspace1.standard1",
"matchedPartitions": 1000,
"mismatchedPartitions": 0,
"matchedRows": 1000,
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java b/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
index 1bf1f88..0a9707a 100644
--- a/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
@@ -80,8 +80,7 @@ public class DBService implements Closeable {
" job_id," +
" job_start_time," +
" buckets," +
- " keyspace_name, " +
- " table_names, " +
+ " qualified_table_names, " +
" source_cluster_name," +
" source_cluster_desc," +
" target_cluster_name," +
@@ -92,7 +91,7 @@ public class DBService implements Closeable {
jobResultStatement = session.prepare(String.format(
" SELECT " +
" job_id," +
- " table_name, " +
+ " qualified_table_name, " +
" matched_partitions," +
" mismatched_partitions," +
" matched_rows," +
@@ -102,12 +101,12 @@ public class DBService implements Closeable {
" partitions_only_in_target," +
" skipped_partitions" +
" FROM %s.job_results" +
- " WHERE job_id = ? AND table_name = ?", diffKeyspace));
+ " WHERE job_id = ? AND qualified_table_name = ?", diffKeyspace));
jobStatusStatement = session.prepare(String.format(
" SELECT " +
" job_id," +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" completed " +
" FROM %s.job_status" +
" WHERE job_id = ? AND bucket = ?", diffKeyspace));
@@ -115,7 +114,7 @@ public class DBService implements Closeable {
" SELECT " +
" job_id," +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" mismatching_token," +
" mismatch_type" +
" FROM %s.mismatches" +
@@ -123,14 +122,14 @@ public class DBService implements Closeable {
jobErrorSummaryStatement = session.prepare(String.format(
" SELECT " +
" count(start_token) AS error_count," +
- " table_name" +
+ " qualified_table_name" +
" FROM %s.task_errors" +
" WHERE job_id = ? AND bucket = ?",
diffKeyspace));
jobErrorRangesStatement = session.prepare(String.format(
" SELECT " +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" start_token," +
" end_token" +
" FROM %s.task_errors" +
@@ -138,10 +137,10 @@ public class DBService implements Closeable {
diffKeyspace));
jobErrorDetailStatement = session.prepare(String.format(
" SELECT " +
- " table_name," +
+ " qualified_table_name," +
" error_token" +
" FROM %s.partition_errors" +
- " WHERE job_id = ? AND bucket = ? AND table_name = ? AND start_token = ? AND end_token = ?", diffKeyspace));
+ " WHERE job_id = ? AND bucket = ? AND qualified_table_name = ? AND start_token = ? AND end_token = ?", diffKeyspace));
jobsStartDateStatement = session.prepare(String.format(
" SELECT " +
" job_id" +
@@ -190,8 +189,8 @@ public class DBService implements Closeable {
public Collection<JobResult> fetchJobResults(UUID jobId) {
JobSummary summary = fetchJobSummary(jobId);
- List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.tables.size());
- for (String table : summary.tables)
+ List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.keyspaceTables.size());
+ for (String table : summary.keyspaceTables)
futures.add(session.executeAsync(jobResultStatement.bind(jobId, table)));
SortedSet<JobResult> results = Sets.newTreeSet();
@@ -206,8 +205,8 @@ public class DBService implements Closeable {
for (int i = 0; i < summary.buckets; i++)
futures.add(session.executeAsync(jobStatusStatement.bind(jobId, i)));
- Map<String, Long> completedByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
- processFutures(futures, row -> completedByTable.merge(row.getString("table_name"),
+ Map<String, Long> completedByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
+ processFutures(futures, row -> completedByTable.merge(row.getString("qualified_table_name"),
row.getLong("completed"),
Long::sum));
return new JobStatus(jobId, completedByTable);
@@ -220,8 +219,8 @@ public class DBService implements Closeable {
for (int i = 0; i < summary.buckets; i++)
futures.add(session.executeAsync(jobMismatchesStatement.bind(jobId, i)));
- Map<String, List<Mismatch>> mismatchesByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
- processFutures(futures, row -> mismatchesByTable.merge(row.getString("table_name"),
+ Map<String, List<Mismatch>> mismatchesByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
+ processFutures(futures, row -> mismatchesByTable.merge(row.getString("qualified_table_name"),
Lists.newArrayList(new Mismatch(row.getString("mismatching_token"),
row.getString("mismatch_type"))),
(l1, l2) -> { l1.addAll(l2); return l1;}));
@@ -235,11 +234,11 @@ public class DBService implements Closeable {
for (int i = 0; i < summary.buckets; i++)
futures.add(session.executeAsync(jobErrorSummaryStatement.bind(jobId, i)));
- Map<String, Long> errorCountByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+ Map<String, Long> errorCountByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
processFutures(futures, row -> {
- String table = row.getString("table_name");
+ String table = row.getString("qualified_table_name");
if (null != table) {
- errorCountByTable.merge(row.getString("table_name"),
+ errorCountByTable.merge(row.getString("qualified_table_name"),
row.getLong("error_count"),
Long::sum);
}
@@ -254,8 +253,8 @@ public class DBService implements Closeable {
for (int i = 0; i < summary.buckets; i++)
futures.add(session.executeAsync(jobErrorRangesStatement.bind(jobId, i)));
- Map<String, List<Range>> errorRangesByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
- processFutures(futures, row -> errorRangesByTable.merge(row.getString("table_name"),
+ Map<String, List<Range>> errorRangesByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
+ processFutures(futures, row -> errorRangesByTable.merge(row.getString("qualified_table_name"),
Lists.newArrayList(new Range(row.getString("start_token"),
row.getString("end_token"))),
(l1, l2) -> { l1.addAll(l2); return l1;}));
@@ -273,13 +272,13 @@ public class DBService implements Closeable {
processFutures(rangeFutures,
row -> session.executeAsync(jobErrorDetailStatement.bind(jobId,
row.getInt("bucket"),
- row.getString("table_name"),
+ row.getString("qualified_table_name"),
row.getString("start_token"),
row.getString("end_token"))),
errorFutures::add);
- Map<String, List<String>> errorsByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+ Map<String, List<String>> errorsByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
processFutures(errorFutures,
- row -> errorsByTable.merge(row.getString("table_name"),
+ row -> errorsByTable.merge(row.getString("qualified_table_name"),
Lists.newArrayList(row.getString("error_token")),
(l1, l2) -> { l1.addAll(l2); return l1;}));
return new JobErrorDetail(jobId, errorsByTable);
@@ -472,7 +471,7 @@ public class DBService implements Closeable {
static JobResult fromRow(Row row) {
return new JobResult(row.getUUID("job_id"),
- row.getString("table_name"),
+ row.getString("qualified_table_name"),
row.getLong("matched_partitions"),
row.getLong("mismatched_partitions"),
row.getLong("matched_rows"),
@@ -494,8 +493,7 @@ public class DBService implements Closeable {
final UUID jobId;
final int buckets;
- final String keyspace;
- final List<String> tables;
+ final List<String> keyspaceTables;
final String sourceClusterName;
final String sourceClusterDesc;
final String targetClusterName;
@@ -509,8 +507,7 @@ public class DBService implements Closeable {
private JobSummary(UUID jobId,
DateTime startTime,
int buckets,
- String keyspace,
- List<String> tables,
+ List<String> keyspaceTables,
String sourceClusterName,
String sourceClusterDesc,
String targetClusterName,
@@ -521,8 +518,7 @@ public class DBService implements Closeable {
this.startTime = startTime;
this.start = startTime.toString();
this.buckets = buckets;
- this.keyspace = keyspace;
- this.tables = tables;
+ this.keyspaceTables = keyspaceTables;
this.sourceClusterName = sourceClusterName;
this.sourceClusterDesc = sourceClusterDesc;
this.targetClusterName = targetClusterName;
@@ -534,8 +530,7 @@ public class DBService implements Closeable {
return new JobSummary(row.getUUID("job_id"),
new DateTime(UUIDs.unixTimestamp(row.getUUID("job_start_time")), DateTimeZone.UTC),
row.getInt("buckets"),
- row.getString("keyspace_name"),
- row.getList("table_names", String.class),
+ row.getList("qualified_table_names", String.class),
row.getString("source_cluster_name"),
row.getString("source_cluster_desc"),
row.getString("target_cluster_name"),
diff --git a/common/pom.xml b/common/pom.xml
index a25cd10..d877ff7 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -49,6 +49,13 @@
<artifactId>cassandra-driver-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
index f0cbb36..2d6cb51 100644
--- a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
+++ b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
@@ -26,9 +26,7 @@ import java.util.Optional;
import java.util.UUID;
public interface JobConfiguration extends Serializable {
- String keyspace();
-
- List<String> tables();
+ List<KeyspaceTablePair> keyspaceTables();
int splits();
diff --git a/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java b/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java
new file mode 100644
index 0000000..e705d3c
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java
@@ -0,0 +1,61 @@
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+import com.datastax.driver.core.TableMetadata;
+
+public final class KeyspaceTablePair implements Serializable {
+ public final String keyspace;
+ public final String table;
+
+ public static KeyspaceTablePair from(TableMetadata tableMetadata) {
+ return new KeyspaceTablePair(tableMetadata.getKeyspace().getName(), tableMetadata.getName());
+ }
+
+ // Used by Yaml loader
+ public KeyspaceTablePair(String input) {
+ String[] parts = input.trim().split("\\.");
+ assert parts.length == 2 : "Invalid keyspace table pair format";
+ assert parts[0].length() > 0;
+ assert parts[1].length() > 0;
+
+ this.keyspace = parts[0];
+ this.table = parts[1];
+ }
+
+ public KeyspaceTablePair(String keyspace, String table) {
+ this.keyspace = keyspace;
+ this.table = table;
+ }
+
+ public String toCqlValueString() {
+ return String.format("%s.%s", keyspace, table);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("keyspace", keyspace)
+ .add("table", table)
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(keyspace, table);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null || getClass() != obj.getClass())
+ return false;
+ KeyspaceTablePair that = (KeyspaceTablePair) obj;
+ return Objects.equals(keyspace, that.keyspace)
+ && Objects.equals(table, that.table);
+ }
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
index 69fc28c..3666e48 100644
--- a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
+++ b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
@@ -34,8 +34,7 @@ import org.yaml.snakeyaml.constructor.CustomClassLoaderConstructor;
public class YamlJobConfiguration implements JobConfiguration {
public int splits = 10000;
- public String keyspace;
- public List<String> tables;
+ public List<KeyspaceTablePair> keyspace_tables;
public int buckets = 100;
public int rate_limit = 10000;
public String job_id = null;
@@ -59,12 +58,8 @@ public class YamlJobConfiguration implements JobConfiguration {
}
}
- public String keyspace() {
- return keyspace;
- }
-
- public List<String> tables() {
- return tables;
+ public List<KeyspaceTablePair> keyspaceTables() {
+ return keyspace_tables;
}
public int splits() {
@@ -127,8 +122,7 @@ public class YamlJobConfiguration implements JobConfiguration {
public String toString() {
return "YamlJobConfiguration{" +
"splits=" + splits +
- ", keyspace='" + keyspace + '\'' +
- ", tables=" + tables +
+ ", keyspace_tables=" + keyspace_tables +
", buckets=" + buckets +
", rate_limit=" + rate_limit +
", job_id='" + job_id + '\'' +
diff --git a/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java b/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java
new file mode 100644
index 0000000..813f38a
--- /dev/null
+++ b/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.diff;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class YamlJobConfigurationTest {
+ @Test
+ public void testLoadYaml() {
+ JobConfiguration jobConfiguration = YamlJobConfiguration.load("src/test/resources/testconfig.yaml");
+ Assert.assertEquals(3, jobConfiguration.keyspaceTables().size());
+ jobConfiguration.keyspaceTables().forEach(kt -> {
+ Assert.assertTrue("Keyspace segment is not loaded correctly", kt.keyspace.contains("ks"));
+ Assert.assertTrue("Table segment is not loaded correctly", kt.table.contains("tb"));
+ });
+ }
+}
diff --git a/spark-job/localconfig.yaml b/common/src/test/resources/testconfig.yaml
similarity index 94%
copy from spark-job/localconfig.yaml
copy to common/src/test/resources/testconfig.yaml
index d4f5630..a860e48 100644
--- a/spark-job/localconfig.yaml
+++ b/common/src/test/resources/testconfig.yaml
@@ -1,8 +1,8 @@
-# Keyspace to diff
-keyspace: keyspace1
-# List of tables to diff
-tables:
- - standard1
+# List of keyspace.tables to diff
+keyspace_tables:
+ - ks1.tb1
+ - ks1.tb2
+ - ks2.tb3
# This is how many parts we split the full token range in.
# Each of these splits is then compared between the clusters
diff --git a/spark-job/localconfig.yaml b/spark-job/localconfig-multi-keyspaces.yaml
similarity index 93%
copy from spark-job/localconfig.yaml
copy to spark-job/localconfig-multi-keyspaces.yaml
index d4f5630..5b18627 100644
--- a/spark-job/localconfig.yaml
+++ b/spark-job/localconfig-multi-keyspaces.yaml
@@ -1,8 +1,7 @@
-# Keyspace to diff
-keyspace: keyspace1
-# List of tables to diff
-tables:
- - standard1
+# List of keyspace.tables to diff
+keyspace_tables:
+ - keyspace1.standard1
+ - keyspace2.standard1
# This is how many parts we split the full token range in.
# Each of these splits is then compared between the clusters
diff --git a/spark-job/localconfig.yaml b/spark-job/localconfig.yaml
index d4f5630..8271ebd 100644
--- a/spark-job/localconfig.yaml
+++ b/spark-job/localconfig.yaml
@@ -1,8 +1,6 @@
-# Keyspace to diff
-keyspace: keyspace1
-# List of tables to diff
-tables:
- - standard1
+# List of keyspace.tables to diff
+keyspace_tables:
+ - keyspace1.standard1
# This is how many parts we split the full token range in.
# Each of these splits is then compared between the clusters
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java b/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
index bdd6488..b86cf69 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
@@ -19,11 +19,19 @@
package org.apache.cassandra.diff;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.*;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
index 4bbd81c..a60b8f7 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
@@ -20,22 +20,48 @@
package org.apache.cassandra.diff;
import java.math.BigInteger;
-import java.util.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.AbstractIterator;
-import com.google.common.util.concurrent.*;
-
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datastax.driver.core.*;
-import com.datastax.driver.core.querybuilder.*;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ClusteringOrder;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.querybuilder.BuiltStatement;
+import com.datastax.driver.core.querybuilder.Ordering;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
import org.jetbrains.annotations.NotNull;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.asc;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.desc;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.gt;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.lte;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.token;
import static org.apache.cassandra.diff.DiffContext.cqlizedString;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
public class DiffCluster implements AutoCloseable
{
@@ -43,12 +69,11 @@ public class DiffCluster implements AutoCloseable
public enum Type {SOURCE, TARGET}
- private final Map<String, PreparedStatement[]> preparedStatements = new HashMap<>();
+ private final Map<KeyspaceTablePair, PreparedStatement[]> preparedStatements = new HashMap<>();
private final ConsistencyLevel consistencyLevel;
public final Cluster cluster;
private final Session session;
private final TokenHelper tokenHelper;
- public final String keyspace;
public final List<BigInteger> tokenList;
public final RateLimiter getPartitionRateLimiter;
@@ -61,7 +86,6 @@ public class DiffCluster implements AutoCloseable
public DiffCluster(Type clusterId,
Cluster cluster,
- String keyspace,
ConsistencyLevel consistencyLevel,
RateLimiter getPartitionRateLimiter,
int tokenScanFetchSize,
@@ -69,7 +93,6 @@ public class DiffCluster implements AutoCloseable
int readTimeoutMillis)
{
- this.keyspace = keyspace;
this.consistencyLevel = consistencyLevel;
this.cluster = cluster;
this.tokenHelper = TokenHelper.forPartitioner(cluster.getMetadata().getPartitioner());
@@ -82,7 +105,7 @@ public class DiffCluster implements AutoCloseable
this.readTimeoutMillis = readTimeoutMillis;
}
- public Iterator<PartitionKey> getPartitionKeys(String table, final BigInteger prevToken, final BigInteger token) {
+ public Iterator<PartitionKey> getPartitionKeys(KeyspaceTablePair table, final BigInteger prevToken, final BigInteger token) {
try {
return Uninterruptibles.getUninterruptibly(fetchPartitionKeys(table, prevToken, token));
}
@@ -93,7 +116,7 @@ public class DiffCluster implements AutoCloseable
}
}
- private ListenableFuture<Iterator<PartitionKey>> fetchPartitionKeys(String table, final BigInteger prevToken, final BigInteger token) {
+ private ListenableFuture<Iterator<PartitionKey>> fetchPartitionKeys(KeyspaceTablePair table, final BigInteger prevToken, final BigInteger token) {
BoundStatement statement = keyReader(table).bind(tokenHelper.forBindParam(prevToken),
tokenHelper.forBindParam(token));
statement.setFetchSize(tokenScanFetchSize);
@@ -132,10 +155,10 @@ public class DiffCluster implements AutoCloseable
}
}
- private ResultSetFuture readPartition(String table, final PartitionKey key, boolean shouldReverse) {
+ private ResultSetFuture readPartition(KeyspaceTablePair keyspaceTablePair, final PartitionKey key, boolean shouldReverse) {
BoundStatement statement = shouldReverse
- ? reverseReader(table).bind(key.getComponents().toArray())
- : forwardReader(table).bind(key.getComponents().toArray());
+ ? reverseReader(keyspaceTablePair).bind(key.getComponents().toArray())
+ : forwardReader(keyspaceTablePair).bind(key.getComponents().toArray());
statement.setFetchSize(partitionReadFetchSize);
statement.setReadTimeoutMillis(readTimeoutMillis);
getPartitionRateLimiter.acquire();
@@ -153,38 +176,38 @@ public class DiffCluster implements AutoCloseable
cluster.closeAsync();
}
- private PreparedStatement keyReader(String table) {
- return getStatementForTable(table, 0);
+ private PreparedStatement keyReader(KeyspaceTablePair keyspaceTablePair) {
+ return getStatementForTable(keyspaceTablePair, 0);
}
- private PreparedStatement forwardReader(String table) {
- return getStatementForTable(table, 1);
+ private PreparedStatement forwardReader(KeyspaceTablePair keyspaceTablePair) {
+ return getStatementForTable(keyspaceTablePair, 1);
}
- private PreparedStatement reverseReader(String table) {
- return getStatementForTable(table, 2);
+ private PreparedStatement reverseReader(KeyspaceTablePair keyspaceTablePair) {
+ return getStatementForTable(keyspaceTablePair, 2);
}
- private PreparedStatement getStatementForTable(String table, int index) {
- if (!preparedStatements.containsKey(table)) {
+ private PreparedStatement getStatementForTable(KeyspaceTablePair keyspaceTablePair, int index) {
+ if (!preparedStatements.containsKey(keyspaceTablePair)) {
synchronized (this) {
- if (!preparedStatements.containsKey(table)) {
- PreparedStatement keyStatement = getKeyStatement(table);
- PreparedStatement[] partitionReadStmts = getFullStatement(table);
- preparedStatements.put(table, new PreparedStatement[]{ keyStatement ,
- partitionReadStmts[0],
- partitionReadStmts[1] });
+ if (!preparedStatements.containsKey(keyspaceTablePair)) {
+ PreparedStatement keyStatement = getKeyStatement(keyspaceTablePair);
+ PreparedStatement[] partitionReadStmts = getFullStatement(keyspaceTablePair);
+ preparedStatements.put(keyspaceTablePair, new PreparedStatement[]{ keyStatement ,
+ partitionReadStmts[0],
+ partitionReadStmts[1] });
}
}
}
- return preparedStatements.get(table)[index];
+ return preparedStatements.get(keyspaceTablePair)[index];
}
- private PreparedStatement getKeyStatement(@NotNull String table) {
+ private PreparedStatement getKeyStatement(@NotNull KeyspaceTablePair keyspaceTablePair) {
final TableMetadata tableMetadata = session.getCluster()
.getMetadata()
- .getKeyspace(cqlizedString(keyspace))
- .getTable(cqlizedString(table));
+ .getKeyspace(cqlizedString(keyspaceTablePair.keyspace))
+ .getTable(cqlizedString(keyspaceTablePair.table));
String[] partitionKeyColumns = columnNames(tableMetadata.getPartitionKey());
Select.Selection selection = QueryBuilder.select().distinct().column(token(partitionKeyColumns));
@@ -199,11 +222,11 @@ public class DiffCluster implements AutoCloseable
return session.prepare(select).setConsistencyLevel(consistencyLevel);
}
- private PreparedStatement[] getFullStatement(@NotNull String table) {
+ private PreparedStatement[] getFullStatement(@NotNull KeyspaceTablePair keyspaceTablePair) {
final TableMetadata tableMetadata = session.getCluster()
.getMetadata()
- .getKeyspace(cqlizedString(keyspace))
- .getTable(cqlizedString(table));
+ .getKeyspace(cqlizedString(keyspaceTablePair.keyspace))
+ .getTable(cqlizedString(keyspaceTablePair.table));
String[] partitionKeyColumns = columnNames(tableMetadata.getPartitionKey());
String[] allColumns = columnNames(tableMetadata.getColumns());
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
index 3047c97..4a1faa1 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
@@ -21,14 +21,17 @@ package org.apache.cassandra.diff;
import java.io.Serializable;
import java.math.BigInteger;
-import java.util.*;
-import java.util.function.BiConsumer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,13 +70,13 @@ public class DiffJob {
// optional code block to run before a job starts
private Runnable preJobHook;
// optional code block to run after a job completes successfully; otherwise, it is not executed.
- private Consumer<Map<String, RangeStats>> postJobHook;
+ private Consumer<Map<KeyspaceTablePair, RangeStats>> postJobHook;
public void addPreJobHook(Runnable preJobHook) {
this.preJobHook = preJobHook;
}
- public void addPostJobHook(Consumer<Map<String, RangeStats>> postJobHook) {
+ public void addPostJobHook(Consumer<Map<KeyspaceTablePair, RangeStats>> postJobHook) {
this.postJobHook = postJobHook;
}
@@ -132,10 +135,9 @@ public class DiffJob {
targetProvider.getClusterName(),
targetProvider.toString());
- logger.info("DiffJob {} comparing [{}] in keyspace {} on {} and {}",
+ logger.info("DiffJob {} comparing [{}] on {} and {}",
jobId,
- String.join(",", params.tables),
- params.keyspace,
+ params.keyspaceTables.stream().map(KeyspaceTablePair::toString).collect(Collectors.joining(",")),
sourceProvider,
targetProvider);
@@ -143,18 +145,18 @@ public class DiffJob {
preJobHook.run();
// Run the distributed diff and collate results
- Map<String, RangeStats> diffStats = sc.parallelize(splits, slices)
- .map((split) -> new Differ(configuration,
- params,
- perExecutorRateLimit,
- split,
- tokenHelper,
- sourceProvider,
- targetProvider,
- metadataProvider,
- new TrackerProvider(configuration.metadataOptions().keyspace))
- .run())
- .reduce(Differ::accumulate);
+ Map<KeyspaceTablePair, RangeStats> diffStats = sc.parallelize(splits, slices)
+ .map((split) -> new Differ(configuration,
+ params,
+ perExecutorRateLimit,
+ split,
+ tokenHelper,
+ sourceProvider,
+ targetProvider,
+ metadataProvider,
+ new TrackerProvider(configuration.metadataOptions().keyspace))
+ .run())
+ .reduce(Differ::accumulate);
// Publish results. This also removes the job from the currently running list
job.finalizeJob(params.jobId, diffStats);
logger.info("FINISHED: {}", diffStats);
@@ -181,8 +183,7 @@ public class DiffJob {
return job.getJobParams(conf.jobId().get());
} else {
return new Params(UUID.randomUUID(),
- conf.keyspace(),
- conf.tables(),
+ conf.keyspaceTables(),
conf.buckets(),
conf.splits());
}
@@ -264,22 +265,20 @@ public class DiffJob {
static class Params implements Serializable {
public final UUID jobId;
- public final String keyspace;
- public final ImmutableList<String> tables;
+ public final ImmutableList<KeyspaceTablePair> keyspaceTables;
public final int buckets;
public final int tasks;
- Params(UUID jobId, String keyspace, List<String> tables, int buckets, int tasks) {
+ Params(UUID jobId, List<KeyspaceTablePair> keyspaceTables, int buckets, int tasks) {
this.jobId = jobId;
- this.keyspace = keyspace;
- this.tables = ImmutableList.copyOf(tables);
+ this.keyspaceTables = ImmutableList.copyOf(keyspaceTables);
this.buckets = buckets;
this.tasks = tasks;
}
public String toString() {
- return String.format("Params: [jobId: %s, keyspace: %s, tables: %s, buckets: %s, tasks: %s]",
- jobId, keyspace, tables.stream().collect(Collectors.joining(",")), buckets, tasks);
+ return String.format("Params: [jobId: %s, keyspaceTables: %s, buckets: %s, tasks: %s]",
+ jobId, keyspaceTables.stream().map(KeyspaceTablePair::toString).collect(Collectors.joining(",")), buckets, tasks);
}
}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
index 2272b44..f7545e3 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
@@ -23,23 +23,25 @@ import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.math.BigInteger;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.function.BiConsumer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Verify;
-import com.google.common.util.concurrent.*;
-
+import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.Session;
-
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
public class Differ implements Serializable
{
@@ -53,8 +55,7 @@ public class Differ implements Serializable
private final UUID jobId;
private final DiffJob.Split split;
private final TokenHelper tokenHelper;
- private final String keyspace;
- private final List<String> tables;
+ private final List<KeyspaceTablePair> keyspaceTables;
private final RateLimiter rateLimiter;
private final DiffJob.TrackerProvider trackerProvider;
private final double reverseReadProbability;
@@ -92,8 +93,7 @@ public class Differ implements Serializable
this.jobId = params.jobId;
this.split = split;
this.tokenHelper = tokenHelper;
- this.keyspace = params.keyspace;
- this.tables = params.tables;
+ this.keyspaceTables = params.keyspaceTables;
this.trackerProvider = trackerProvider;
rateLimiter = RateLimiter.create(perExecutorRateLimit);
this.reverseReadProbability = config.reverseReadProbability();
@@ -110,7 +110,6 @@ public class Differ implements Serializable
{
srcDiffCluster = new DiffCluster(DiffCluster.Type.SOURCE,
sourceProvider.getCluster(),
- params.keyspace,
cl,
rateLimiter,
config.tokenScanFetchSize(),
@@ -122,7 +121,6 @@ public class Differ implements Serializable
{
targetDiffCluster = new DiffCluster(DiffCluster.Type.TARGET,
targetProvider.getCluster(),
- params.keyspace,
cl,
rateLimiter,
config.tokenScanFetchSize(),
@@ -138,18 +136,19 @@ public class Differ implements Serializable
}
}
- public Map<String, RangeStats> run() {
+ public Map<KeyspaceTablePair, RangeStats> run() {
JobMetadataDb.ProgressTracker journal = trackerProvider.getTracker(journalSession, jobId, split);
- Map<String, DiffJob.TaskStatus> tablesToDiff = filterTables(tables,
- split,
- journal::getLastStatus,
- !specificTokens.isEmpty());
- String metricsPrefix = String.format("%s.%s", srcDiffCluster.clusterId.name(), srcDiffCluster.keyspace);
+ Map<KeyspaceTablePair, DiffJob.TaskStatus> tablesToDiff = filterTables(keyspaceTables,
+ split,
+ journal::getLastStatus,
+ !specificTokens.isEmpty());
+
+ String metricsPrefix = srcDiffCluster.clusterId.name();
logger.info("Diffing {} for tables {}", split, tablesToDiff);
- for (Map.Entry<String, DiffJob.TaskStatus> tableStatus : tablesToDiff.entrySet()) {
- final String table = tableStatus.getKey();
+ for (Map.Entry<KeyspaceTablePair, DiffJob.TaskStatus> tableStatus : tablesToDiff.entrySet()) {
+ final KeyspaceTablePair keyspaceTablePair = tableStatus.getKey();
DiffJob.TaskStatus status = tableStatus.getValue();
RangeStats diffStats = status.stats;
@@ -160,37 +159,37 @@ public class Differ implements Serializable
BigInteger startToken = status.lastToken == null || isRerun ? split.start : status.lastToken;
validateRange(startToken, split.end, tokenHelper);
- TableSpec sourceTable = TableSpec.make(table, srcDiffCluster);
- TableSpec targetTable = TableSpec.make(table, targetDiffCluster);
+ TableSpec sourceTable = TableSpec.make(keyspaceTablePair, srcDiffCluster);
+ TableSpec targetTable = TableSpec.make(keyspaceTablePair, targetDiffCluster);
validateTableSpecs(sourceTable, targetTable);
DiffContext ctx = new DiffContext(srcDiffCluster,
targetDiffCluster,
- keyspace,
+ keyspaceTablePair.keyspace,
sourceTable,
startToken,
split.end,
specificTokens,
reverseReadProbability);
- String timerName = String.format("%s.%s.split_times", metricsPrefix, table);
+ String timerName = String.format("%s.%s.split_times", metricsPrefix, keyspaceTablePair.table);
try (@SuppressWarnings("unused") Timer.Context timer = metrics.timer(timerName).time()) {
diffStats.accumulate(diffTable(ctx,
- (error, token) -> journal.recordError(table, token, error),
- (type, token) -> journal.recordMismatch(table, type, token),
- (stats, token) -> journal.updateStatus(table, stats, token)));
+ (error, token) -> journal.recordError(keyspaceTablePair, token, error),
+ (type, token) -> journal.recordMismatch(keyspaceTablePair, type, token),
+ (stats, token) -> journal.updateStatus(keyspaceTablePair, stats, token)));
// update the journal with the final state for the table. Use the split's ending token
// as the last seen token (even though we may not have actually read any partition for
// that token) as this effectively marks the split as done.
- journal.finishTable(table, diffStats, !isRerun);
+ journal.finishTable(keyspaceTablePair, diffStats, !isRerun);
}
}
- Map<String, RangeStats> statsByTable = tablesToDiff.entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
- e -> e.getValue().stats));
+ Map<KeyspaceTablePair, RangeStats> statsByTable = tablesToDiff.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> e.getValue().stats));
updateMetrics(metricsPrefix, statsByTable);
return statsByTable;
}
@@ -226,25 +225,25 @@ public class Differ implements Serializable
}
@VisibleForTesting
- static Map<String, DiffJob.TaskStatus> filterTables(Iterable<String> tables,
- DiffJob.Split split,
- Function<String, DiffJob.TaskStatus> journal,
- boolean includeCompleted) {
- Map<String, DiffJob.TaskStatus> tablesToProcess = new HashMap<>();
- for (String table : tables) {
- DiffJob.TaskStatus taskStatus = journal.apply(table);
+ static Map<KeyspaceTablePair, DiffJob.TaskStatus> filterTables(Iterable<KeyspaceTablePair> keyspaceTables,
+ DiffJob.Split split,
+ Function<KeyspaceTablePair, DiffJob.TaskStatus> journal,
+ boolean includeCompleted) {
+ Map<KeyspaceTablePair, DiffJob.TaskStatus> tablesToProcess = new HashMap<>();
+ for (KeyspaceTablePair pair : keyspaceTables) {
+ DiffJob.TaskStatus taskStatus = journal.apply(pair);
RangeStats diffStats = taskStatus.stats;
BigInteger lastToken = taskStatus.lastToken;
// When we finish processing a split for a given table, we update the task status in journal
// to set the last seen token to the split's end token, to indicate that the split is complete.
if (!includeCompleted && lastToken != null && lastToken.equals(split.end)) {
- logger.info("Found finished table {} for split {}", table, split);
+ logger.info("Found finished table {} for split {}", pair, split);
}
else {
- tablesToProcess.put(table, diffStats != null
- ? taskStatus
- : new DiffJob.TaskStatus(taskStatus.lastToken, RangeStats.newStats()));
+ tablesToProcess.put(pair, diffStats != null
+ ? taskStatus
+ : new DiffJob.TaskStatus(taskStatus.lastToken, RangeStats.newStats()));
}
}
return tablesToProcess;
@@ -267,9 +266,9 @@ public class Differ implements Serializable
}
@VisibleForTesting
- static Map<String, RangeStats> accumulate(Map<String, RangeStats> stats, Map<String, RangeStats> otherStats)
+ static Map<KeyspaceTablePair, RangeStats> accumulate(Map<KeyspaceTablePair, RangeStats> stats, Map<KeyspaceTablePair, RangeStats> otherStats)
{
- for (Map.Entry<String, RangeStats> otherEntry : otherStats.entrySet())
+ for (Map.Entry<KeyspaceTablePair, RangeStats> otherEntry : otherStats.entrySet())
{
if (stats.containsKey(otherEntry.getKey()))
stats.get(otherEntry.getKey()).accumulate(otherEntry.getValue());
@@ -279,11 +278,12 @@ public class Differ implements Serializable
return stats;
}
- private static void updateMetrics(String prefix, Map<String, RangeStats> statsMap)
+ private static void updateMetrics(String prefix, Map<KeyspaceTablePair, RangeStats> statsMap)
{
- for (Map.Entry<String, RangeStats> entry : statsMap.entrySet())
+ for (Map.Entry<KeyspaceTablePair, RangeStats> entry : statsMap.entrySet())
{
- String qualifier = String.format("%s.%s", prefix, entry.getKey());
+ KeyspaceTablePair keyspaceTablePair = entry.getKey();
+ String qualifier = String.format("%s.%s.%s", prefix, keyspaceTablePair.keyspace, keyspaceTablePair.table);
RangeStats stats = entry.getValue();
metrics.meter(qualifier + ".partitions_read").mark(stats.getMatchedPartitions() + stats.getOnlyInSource() + stats.getOnlyInTarget() + stats.getMismatchedPartitions());
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
index 1eb121c..d9d4035 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
@@ -20,13 +20,22 @@
package org.apache.cassandra.diff;
import java.math.BigInteger;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
import com.datastax.driver.core.utils.UUIDs;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -40,7 +49,7 @@ public class JobMetadataDb {
private final int bucket;
private final String startToken;
private final String endToken;
- private final String keyspace;
+ private final String metadataKeyspace;
private Session session;
private static PreparedStatement updateStmt;
@@ -53,25 +62,25 @@ public class JobMetadataDb {
int bucket,
BigInteger startToken,
BigInteger endToken,
- String keyspace,
+ String metadataKeyspace,
Session session) {
this.jobId = jobId;
this.bucket = bucket;
this.startToken = startToken.toString();
this.endToken = endToken.toString();
- this.keyspace = keyspace;
+ this.metadataKeyspace = metadataKeyspace;
this.session = session;
}
/**
* Runs on each executor to prepare statements shared across all instances
*/
- public static void initializeStatements(Session session, String keyspace) {
+ public static void initializeStatements(Session session, String metadataKeyspace) {
if (updateStmt == null) {
updateStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
" job_id," +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" start_token," +
" end_token," +
" matched_partitions," +
@@ -84,47 +93,47 @@ public class JobMetadataDb {
" skipped_partitions," +
" last_token )" +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
- keyspace, Schema.TASK_STATUS));
+ metadataKeyspace, Schema.TASK_STATUS));
}
if (mismatchStmt == null) {
mismatchStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
" job_id," +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" mismatching_token," +
" mismatch_type )" +
"VALUES (?, ?, ?, ?, ?)",
- keyspace, Schema.MISMATCHES));
+ metadataKeyspace, Schema.MISMATCHES));
}
if (updateCompleteStmt == null) {
updateCompleteStmt = session.prepare(String.format("UPDATE %s.%s " +
" SET completed = completed + 1" +
" WHERE job_id = ? " +
" AND bucket = ? " +
- " AND table_name = ? ",
- keyspace, Schema.JOB_STATUS))
+ " AND qualified_table_name = ? ",
+ metadataKeyspace, Schema.JOB_STATUS))
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
}
if (errorSummaryStmt == null) {
errorSummaryStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
" job_id," +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" start_token," +
" end_token)" +
" VALUES (?, ?, ?, ?, ?)",
- keyspace, Schema.ERROR_SUMMARY));
+ metadataKeyspace, Schema.ERROR_SUMMARY));
}
if (errorDetailStmt == null) {
errorDetailStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
" job_id," +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" start_token," +
" end_token," +
" error_token)" +
" VALUES (?, ?, ?, ?, ?, ?)",
- keyspace, Schema.ERROR_DETAIL));
+ metadataKeyspace, Schema.ERROR_DETAIL));
}
}
@@ -140,10 +149,10 @@ public class JobMetadataDb {
/**
*
- * @param table
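+ * @param keyspaceTablePair the keyspace/table pair whose task status row is looked up
* @return the status stored for this job, bucket, table and token range, or {@link DiffJob.TaskStatus#EMPTY} if no such row exists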
*/
- public DiffJob.TaskStatus getLastStatus(String table) {
+ public DiffJob.TaskStatus getLastStatus(KeyspaceTablePair keyspaceTablePair) {
ResultSet rs = session.execute(String.format("SELECT last_token, " +
" matched_partitions, " +
" mismatched_partitions, " +
@@ -156,11 +165,11 @@ public class JobMetadataDb {
" FROM %s.%s " +
" WHERE job_id = ? " +
" AND bucket = ? " +
- " AND table_name = ? " +
+ " AND qualified_table_name = ? " +
" AND start_token = ? " +
" AND end_token = ?",
- keyspace, Schema.TASK_STATUS),
- jobId, bucket, table, startToken, endToken);
+ metadataKeyspace, Schema.TASK_STATUS),
+ jobId, bucket, keyspaceTablePair.toCqlValueString(), startToken, endToken);
Row row = rs.one();
if (null == row)
return DiffJob.TaskStatus.EMPTY;
@@ -185,11 +194,11 @@ public class JobMetadataDb {
* @param diffStats
* @param latestToken
*/
- public void updateStatus(String table, RangeStats diffStats, BigInteger latestToken) {
+ public void updateStatus(KeyspaceTablePair table, RangeStats diffStats, BigInteger latestToken) {
session.execute(bindUpdateStatement(table, diffStats, latestToken));
}
- public void recordMismatch(String table, MismatchType type, BigInteger token) {
+ public void recordMismatch(KeyspaceTablePair table, MismatchType type, BigInteger token) {
logger.info("Detected mismatch in table {}; partition with token {} is {}",
table, token, type == MismatchType.PARTITION_MISMATCH
? " different in source and target clusters"
@@ -204,7 +213,7 @@ public class JobMetadataDb {
* @param token
* @param error
*/
- public void recordError(String table, BigInteger token, Throwable error) {
+ public void recordError(KeyspaceTablePair table, BigInteger token, Throwable error) {
logger.error(String.format("Encountered error during partition comparison in table %s; " +
"error for partition with token %s", table, token), error);
BatchStatement batch = new BatchStatement();
@@ -219,41 +228,41 @@ public class JobMetadataDb {
* @param table
* @param stats
*/
- public void finishTable(String table, RangeStats stats, boolean updateCompletedCount) {
+ public void finishTable(KeyspaceTablePair table, RangeStats stats, boolean updateCompletedCount) {
logger.info("Finishing range [{}, {}] for table {}", startToken, endToken, table);
// first flush out the last status.
session.execute(bindUpdateStatement(table, stats, endToken));
// then update the count of completed tasks
if (updateCompletedCount)
- session.execute(updateCompleteStmt.bind(jobId, bucket, table));
+ session.execute(updateCompleteStmt.bind(jobId, bucket, table.toCqlValueString()));
}
- private Statement bindMismatchesStatement(String table, BigInteger token, String type) {
- return mismatchStmt.bind(jobId, bucket, table, token.toString(), type)
+ private Statement bindMismatchesStatement(KeyspaceTablePair table, BigInteger token, String type) {
+ return mismatchStmt.bind(jobId, bucket, table.toCqlValueString(), token.toString(), type)
.setIdempotent(true);
}
- private Statement bindErrorSummaryStatement(String table) {
- return errorSummaryStmt.bind(jobId, bucket, table, startToken, endToken)
+ private Statement bindErrorSummaryStatement(KeyspaceTablePair table) {
+ return errorSummaryStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken)
.setIdempotent(true);
}
- private Statement bindErrorDetailStatement(String table, BigInteger errorToken) {
- return errorDetailStmt.bind(jobId, bucket, table, startToken, endToken, errorToken.toString())
+ private Statement bindErrorDetailStatement(KeyspaceTablePair table, BigInteger errorToken) {
+ return errorDetailStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken, errorToken.toString())
.setIdempotent(true);
}
- private Statement bindUpdateStatement(String table, RangeStats stats, BigInteger token) {
+ private Statement bindUpdateStatement(KeyspaceTablePair table, RangeStats stats, BigInteger token) {
return bindUpdateStatement(table, stats, token.toString());
}
- private Statement bindUpdateStatement(String table, RangeStats stats, String token) {
+ private Statement bindUpdateStatement(KeyspaceTablePair table, RangeStats stats, String token) {
// We don't persist the partition error count from RangeStats as errors
// are likely to be transient and not data related, so we don't want to
// accumulate them across runs.
return updateStmt.bind(jobId,
bucket,
- table,
+ table.toCqlValueString(),
startToken,
endToken,
stats.getMatchedPartitions(),
@@ -275,29 +284,32 @@ public class JobMetadataDb {
static class JobLifeCycle {
final Session session;
- final String keyspace;
+ final String metadataKeyspace;
public JobLifeCycle(Session session, String metadataKeyspace) {
this.session = session;
- this.keyspace = metadataKeyspace;
+ this.metadataKeyspace = metadataKeyspace;
}
public DiffJob.Params getJobParams(UUID jobId) {
- ResultSet rs = session.execute(String.format("SELECT keyspace_name, " +
- " table_names," +
+ ResultSet rs = session.execute(String.format("SELECT qualified_table_names," +
" buckets," +
" total_tasks " +
"FROM %s.%s " +
"WHERE job_id = ?",
- keyspace, Schema.JOB_SUMMARY),
+ metadataKeyspace, Schema.JOB_SUMMARY),
jobId);
Row row = rs.one();
if (null == row)
return null;
+ // qualified_table_names is encoded as a List<String>. Decode it back to List<KeyspaceTablePair>.
+ List<KeyspaceTablePair> keyspaceTables = row.getList("qualified_table_names", String.class)
+ .stream()
+ .map(KeyspaceTablePair::new)
+ .collect(Collectors.toList());
return new DiffJob.Params(jobId,
- row.getString("keyspace_name"),
- row.getList("table_names", String.class),
+ keyspaceTables,
row.getInt("buckets"),
row.getInt("total_tasks"));
}
@@ -314,7 +326,7 @@ public class JobMetadataDb {
// The job was previously run, so this could be a re-run to
// mop up any failed splits so mark it in progress.
ResultSet rs = session.execute(String.format("INSERT INTO %s.%s (job_id) VALUES (?) IF NOT EXISTS",
- keyspace, Schema.RUNNING_JOBS),
+ metadataKeyspace, Schema.RUNNING_JOBS),
params.jobId);
if (!rs.one().getBool("[applied]")) {
logger.info("Aborting due to inability to mark job as running. " +
@@ -330,21 +342,19 @@ public class JobMetadataDb {
" job_id," +
" job_start_time," +
" buckets," +
- " keyspace_name," +
- " table_names," +
+ " qualified_table_names," +
" source_cluster_name," +
" source_cluster_desc," +
" target_cluster_name," +
" target_cluster_desc," +
" total_tasks)" +
- " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" +
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" +
" IF NOT EXISTS",
- keyspace, Schema.JOB_SUMMARY),
+ metadataKeyspace, Schema.JOB_SUMMARY),
params.jobId,
timeUUID,
params.buckets,
- params.keyspace,
- params.tables,
+ params.keyspaceTables.stream().map(KeyspaceTablePair::toCqlValueString).collect(Collectors.toList()),
sourceClusterName,
sourceClusterDesc,
targetClusterName,
@@ -355,34 +365,34 @@ public class JobMetadataDb {
if (rs.one().getBool("[applied]")) {
BatchStatement batch = new BatchStatement();
batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (source_cluster_name, job_id) VALUES (?, ?)",
- keyspace, Schema.SOURCE_CLUSTER_INDEX),
+ metadataKeyspace, Schema.SOURCE_CLUSTER_INDEX),
sourceClusterName, params.jobId));
batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (target_cluster_name, job_id) VALUES (?, ?)",
- keyspace, Schema.TARGET_CLUSTER_INDEX),
+ metadataKeyspace, Schema.TARGET_CLUSTER_INDEX),
targetClusterName, params.jobId));
batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (keyspace_name, job_id) VALUES (?, ?)",
- keyspace, Schema.KEYSPACE_INDEX),
- keyspace, params.jobId));
+ metadataKeyspace, Schema.KEYSPACE_INDEX),
+ metadataKeyspace, params.jobId));
batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (job_start_date, job_start_hour, job_start_time, job_id) " +
"VALUES ('%s', ?, ?, ?)",
- keyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")),
+ metadataKeyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")),
startDateTime.getHourOfDay(), timeUUID, params.jobId));
session.execute(batch);
}
}
- public void finalizeJob(UUID jobId, Map<String, RangeStats> results) {
+ public void finalizeJob(UUID jobId, Map<KeyspaceTablePair, RangeStats> results) {
logger.info("Finalizing job status");
markNotRunning(jobId);
BatchStatement batch = new BatchStatement();
- for (Map.Entry<String, RangeStats> result : results.entrySet()) {
- String table = result.getKey();
+ for (Map.Entry<KeyspaceTablePair, RangeStats> result : results.entrySet()) {
+ KeyspaceTablePair table = result.getKey();
RangeStats stats = result.getValue();
session.execute(String.format("INSERT INTO %s.%s (" +
" job_id," +
- " table_name," +
+ " qualified_table_name," +
" matched_partitions," +
" mismatched_partitions," +
" partitions_only_in_source," +
@@ -392,9 +402,9 @@ public class JobMetadataDb {
" mismatched_values," +
" skipped_partitions) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
- keyspace, Schema.JOB_RESULTS),
+ metadataKeyspace, Schema.JOB_RESULTS),
jobId,
- table,
+ table.toCqlValueString(),
stats.getMatchedPartitions(),
stats.getMismatchedPartitions(),
stats.getOnlyInSource(),
@@ -414,18 +424,18 @@ public class JobMetadataDb {
logger.info("Marking job {} as not running", jobId);
ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS",
- keyspace, Schema.RUNNING_JOBS),
+ metadataKeyspace, Schema.RUNNING_JOBS),
jobId);
if (!rs.one().getBool("[applied]"))
{
logger.warn("Non-fatal: Unable to mark job %s as not running, check logs for errors " +
- "during initialization as there may be no entry for this job in the {} table",
+ "during initialization as there may be no entry for this job {} in the {} table",
jobId, Schema.RUNNING_JOBS);
}
} catch (Exception e) {
// Because this is called from another exception handler, we don't want to lose the original exception
// just because we may not have been able to mark the job as not running. Just log here
- logger.error("Could not mark job {} as not running.", e);
+ logger.error("Could not mark job {} as not running.", jobId, e);
}
}
}
@@ -436,7 +446,7 @@ public class JobMetadataDb {
private static final String TASK_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
" job_id uuid," +
" bucket int," +
- " table_name text," +
+ " qualified_table_name text," +
" start_token varchar," +
" end_token varchar," +
" matched_partitions bigint," +
@@ -448,7 +458,7 @@ public class JobMetadataDb {
" mismatched_values bigint," +
" skipped_partitions bigint," +
" last_token varchar," +
- " PRIMARY KEY((job_id, bucket), table_name, start_token, end_token))" +
+ " PRIMARY KEY((job_id, bucket), qualified_table_name, start_token, end_token))" +
" WITH default_time_to_live = %s";
public static final String JOB_SUMMARY = "job_summary";
@@ -456,8 +466,7 @@ public class JobMetadataDb {
" job_id uuid," +
" job_start_time timeuuid," +
" buckets int," +
- " keyspace_name text," +
- " table_names frozen<list<text>>," +
+ " qualified_table_names frozen<list<text>>," +
" source_cluster_name text," +
" source_cluster_desc text," +
" target_cluster_name text," +
@@ -469,7 +478,7 @@ public class JobMetadataDb {
public static final String JOB_RESULTS = "job_results";
private static final String JOB_RESULTS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
" job_id uuid," +
- " table_name text," +
+ " qualified_table_name text," +
" matched_partitions bigint," +
" mismatched_partitions bigint," +
" partitions_only_in_source bigint," +
@@ -478,46 +487,46 @@ public class JobMetadataDb {
" matched_values bigint," +
" mismatched_values bigint," +
" skipped_partitions bigint," +
- " PRIMARY KEY(job_id, table_name))" +
+ " PRIMARY KEY(job_id, qualified_table_name))" +
" WITH default_time_to_live = %s";
public static final String JOB_STATUS = "job_status";
private static final String JOB_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
" job_id uuid," +
" bucket int," +
- " table_name text," +
+ " qualified_table_name text," +
" completed counter," +
- " PRIMARY KEY ((job_id, bucket), table_name))";
+ " PRIMARY KEY ((job_id, bucket), qualified_table_name))";
public static final String MISMATCHES = "mismatches";
private static final String MISMATCHES_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
" job_id uuid," +
" bucket int," +
- " table_name text, " +
+ " qualified_table_name text, " +
" mismatching_token varchar, " +
" mismatch_type text, " +
- " PRIMARY KEY ((job_id, bucket), table_name, mismatching_token))" +
+ " PRIMARY KEY ((job_id, bucket), qualified_table_name, mismatching_token))" +
" WITH default_time_to_live = %s";
public static final String ERROR_SUMMARY = "task_errors";
private static final String ERROR_SUMMARY_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
" job_id uuid," +
" bucket int," +
- " table_name text," +
+ " qualified_table_name text," +
" start_token varchar," +
" end_token varchar," +
- " PRIMARY KEY ((job_id, bucket), table_name, start_token, end_token))" +
+ " PRIMARY KEY ((job_id, bucket), qualified_table_name, start_token, end_token))" +
" WITH default_time_to_live = %s";
public static final String ERROR_DETAIL = "partition_errors";
private static final String ERROR_DETAIL_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
" job_id uuid," +
" bucket int," +
- " table_name text," +
+ " qualified_table_name text," +
" start_token varchar," +
" end_token varchar," +
" error_token varchar," +
- " PRIMARY KEY ((job_id, bucket, table_name, start_token, end_token), error_token))" +
+ " PRIMARY KEY ((job_id, bucket, qualified_table_name, start_token, end_token), error_token))" +
" WITH default_time_to_live = %s";
public static final String SOURCE_CLUSTER_INDEX = "source_cluster_index";
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
index 6434dc8..4214f2b 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
@@ -20,12 +20,13 @@
package org.apache.cassandra.diff;
import java.util.Iterator;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Row;
public class PartitionComparator implements Callable<PartitionStats> {
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
index d31da4f..5ed225a 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
@@ -28,7 +28,10 @@ import java.util.stream.StreamSupport;
import com.google.common.annotations.VisibleForTesting;
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Token;
import org.jetbrains.annotations.NotNull;
public class PartitionKey implements Comparable<PartitionKey> {
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
index 36ce2b5..bb5e937 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
@@ -20,11 +20,13 @@
package org.apache.cassandra.diff;
import java.math.BigInteger;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.Iterator;
+import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.*;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
import com.google.common.base.Verify;
import org.slf4j.Logger;
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
index a0f8043..3aad1eb 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
@@ -19,7 +19,10 @@
package org.apache.cassandra.diff;
-import java.io.*;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java b/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
index d2f0963..38c131d 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
@@ -19,25 +19,30 @@
package org.apache.cassandra.diff;
-import com.datastax.driver.core.*;
-import java.util.*;
+import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.TableMetadata;
+
import static org.apache.cassandra.diff.DiffContext.cqlizedString;
public class TableSpec {
- private final String table;
+ private final KeyspaceTablePair keyspaceTablePair;
private ImmutableList<ColumnMetadata> clusteringColumns;
private ImmutableList<ColumnMetadata> regularColumns;
- public String getTable()
+ public KeyspaceTablePair getTable()
{
- return table;
+ return keyspaceTablePair;
}
@@ -55,23 +60,23 @@ public class TableSpec {
* @param clusteringColumns the clustering columns, retrieved from cluster using the client
* @param regularColumns the non-primary key columns, retrieved from cluster using the client
*/
- TableSpec(final String table,
+ TableSpec(final KeyspaceTablePair table,
final List<ColumnMetadata> clusteringColumns,
final List<ColumnMetadata> regularColumns) {
- this.table = table;
+ this.keyspaceTablePair = table;
this.clusteringColumns = ImmutableList.copyOf(clusteringColumns);
this.regularColumns = ImmutableList.copyOf(regularColumns);
}
- public static TableSpec make(String table, DiffCluster diffCluster) {
+ public static TableSpec make(KeyspaceTablePair keyspaceTablePair, DiffCluster diffCluster) {
final Cluster cluster = diffCluster.cluster;
- final String cqlizedKeyspace = cqlizedString(diffCluster.keyspace);
- final String cqlizedTable = cqlizedString(table);
+ final String cqlizedKeyspace = cqlizedString(keyspaceTablePair.keyspace);
+ final String cqlizedTable = cqlizedString(keyspaceTablePair.table);
KeyspaceMetadata ksMetadata = cluster.getMetadata().getKeyspace(cqlizedKeyspace);
if (ksMetadata == null) {
- throw new IllegalArgumentException(String.format("Keyspace %s not found in %s cluster", diffCluster.keyspace, diffCluster.clusterId));
+ throw new IllegalArgumentException(String.format("Keyspace %s not found in %s cluster", keyspaceTablePair.keyspace, diffCluster.clusterId));
}
TableMetadata tableMetadata = ksMetadata.getTable(cqlizedTable);
@@ -80,11 +85,11 @@ public class TableSpec {
.stream()
.filter(c -> !(clusteringColumns.contains(c)))
.collect(Collectors.toList());
- return new TableSpec(tableMetadata.getName(), clusteringColumns, regularColumns);
+ return new TableSpec(KeyspaceTablePair.from(tableMetadata), clusteringColumns, regularColumns);
}
public boolean equalsNamesOnly(TableSpec other) {
- return this.table.equals(other.table)
+ return this.keyspaceTablePair.equals(other.keyspaceTablePair)
&& columnNames(this.clusteringColumns).equals(columnNames(other.clusteringColumns))
&& columnNames(this.regularColumns).equals(columnNames(other.regularColumns));
}
@@ -101,19 +106,19 @@ public class TableSpec {
return false;
TableSpec other = (TableSpec)o;
- return this.table.equals(other.table)
+ return this.keyspaceTablePair.equals(other.keyspaceTablePair)
&& this.clusteringColumns.equals(other.clusteringColumns)
&& this.regularColumns.equals(other.regularColumns);
}
public int hashCode() {
- return Objects.hash(table, clusteringColumns, regularColumns);
+ return Objects.hash(keyspaceTablePair, clusteringColumns, regularColumns);
}
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("table", table)
+ .add("table", keyspaceTablePair)
.add("clusteringColumns", clusteringColumns)
.add("regularColumns", regularColumns)
.toString();
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
index d8d92c7..9082970 100644
--- a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
@@ -24,9 +24,6 @@ import java.util.List;
import org.junit.Test;
-import org.apache.cassandra.diff.DiffJob;
-import org.apache.cassandra.diff.TokenHelper;
-
import static org.junit.Assert.assertEquals;
public class DiffJobTest
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
index 73677d9..e588575 100644
--- a/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
@@ -24,14 +24,8 @@ import java.util.Map;
import java.util.function.Function;
import com.google.common.base.VerifyException;
-import org.junit.Test;
-
import com.google.common.collect.Lists;
-
-import org.apache.cassandra.diff.DiffJob;
-import org.apache.cassandra.diff.Differ;
-import org.apache.cassandra.diff.RangeStats;
-import org.apache.cassandra.diff.TokenHelper;
+import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -74,9 +68,9 @@ public class DifferTest {
// * t2 is started and has reported some progress, but has not completed
// * t3 has not reported any progress
DiffJob.Split split = new DiffJob.Split(1, 1, BigInteger.ONE, BigInteger.TEN);
- Iterable<String> tables = Lists.newArrayList("t1", "t2", "t3");
- Function<String, DiffJob.TaskStatus> journal = (table) -> {
- switch (table) {
+ Iterable<KeyspaceTablePair> tables = Lists.newArrayList(ksTbl("t1"), ksTbl("t2"), ksTbl("t3"));
+ Function<KeyspaceTablePair, DiffJob.TaskStatus> journal = (keyspaceTable) -> {
+ switch (keyspaceTable.table) {
case "t1":
return new DiffJob.TaskStatus(split.end, RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6));
case "t2":
@@ -88,24 +82,27 @@ public class DifferTest {
}
};
- Map<String, DiffJob.TaskStatus> filtered = Differ.filterTables(tables, split, journal, false);
+ Map<KeyspaceTablePair, DiffJob.TaskStatus> filtered = Differ.filterTables(tables, split, journal, false);
assertEquals(2, filtered.keySet().size());
- assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get("t2").stats);
- assertEquals(BigInteger.valueOf(5L), filtered.get("t2").lastToken);
- assertEquals(RangeStats.newStats(), filtered.get("t3").stats);
- assertNull(filtered.get("t3").lastToken);
+ assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get(ksTbl("t2")).stats);
+ assertEquals(BigInteger.valueOf(5L), filtered.get(ksTbl("t2")).lastToken);
+ assertEquals(RangeStats.newStats(), filtered.get(ksTbl("t3")).stats);
+ assertNull(filtered.get(ksTbl("t3")).lastToken);
// if re-running (part of) a job because of failures or problematic partitions, we want to
// ignore the status of completed tasks and re-run them anyway as only specified tokens will
// be processed - so t1 should be included now
filtered = Differ.filterTables(tables, split, journal, true);
assertEquals(3, filtered.keySet().size());
- assertEquals(RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6), filtered.get("t1").stats);
- assertEquals(split.end, filtered.get("t1").lastToken);
- assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get("t2").stats);
- assertEquals(BigInteger.valueOf(5L), filtered.get("t2").lastToken);
- assertEquals(RangeStats.newStats(), filtered.get("t3").stats);
- assertNull(filtered.get("t3").lastToken);
+ assertEquals(RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6), filtered.get(ksTbl("t1")).stats);
+ assertEquals(split.end, filtered.get(ksTbl("t1")).lastToken);
+ assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get(ksTbl("t2")).stats);
+ assertEquals(BigInteger.valueOf(5L), filtered.get(ksTbl("t2")).lastToken);
+ assertEquals(RangeStats.newStats(), filtered.get(ksTbl("t3")).stats);
+ assertNull(filtered.get(ksTbl("t3")).lastToken);
}
+ private KeyspaceTablePair ksTbl(String table) {
+ return new KeyspaceTablePair("ks", table);
+ }
}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
index 79b3638..9cd1892 100644
--- a/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
@@ -33,9 +33,6 @@ import com.google.common.reflect.TypeToken;
import org.junit.Test;
import com.datastax.driver.core.*;
-import org.apache.cassandra.diff.PartitionComparator;
-import org.apache.cassandra.diff.PartitionStats;
-import org.apache.cassandra.diff.TableSpec;
import static org.junit.Assert.assertEquals;
@@ -221,7 +218,7 @@ public class PartitionComparatorTest {
}
TableSpec spec(String table, List<String> clusteringColumns, List<String> regularColumns) {
- return new TableSpec(table, columns(clusteringColumns), columns(regularColumns));
+ return new TableSpec(new KeyspaceTablePair("ks", table), columns(clusteringColumns), columns(regularColumns));
}
List<ColumnMetadata> columns(List<String> names) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org