You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yc...@apache.org on 2021/09/22 17:42:10 UTC
[cassandra-diff] branch trunk updated: Cassandra diff should accept a provided job_id for retrying diffs
This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-diff.git
The following commit(s) were added to refs/heads/trunk by this push:
new 75b70c3 Cassandra diff should accept a provided job_id for retrying diffs
75b70c3 is described below
commit 75b70c35eea537d038c9f72632eec3d141b05379
Author: Jyothsna Konisa <jk...@apple.com>
AuthorDate: Wed Jul 14 15:51:54 2021 -0700
Cassandra diff should accept a provided job_id for retrying diffs
Patch by Jyothsna Konisa; reviewed by Dinesh Joshi, Yifan Cai for CASSANDRA-16968
---
spark-job/pom.xml | 7 +++
.../java/org/apache/cassandra/diff/DiffJob.java | 35 ++++++++++----
.../org/apache/cassandra/diff/JobMetadataDb.java | 2 +-
.../org/apache/cassandra/diff/DiffJobTest.java | 55 ++++++++++++++++++++++
4 files changed, 89 insertions(+), 10 deletions(-)
diff --git a/spark-job/pom.xml b/spark-job/pom.xml
index c7df5bd..cd1603b 100644
--- a/spark-job/pom.xml
+++ b/spark-job/pom.xml
@@ -67,5 +67,12 @@
<artifactId>junit</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>3.5.10</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
index d744bff..2632c09 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
@@ -122,9 +122,12 @@ public class DiffJob {
ClusterProvider metadataProvider = ClusterProvider.getProvider(configuration.clusterConfig("metadata"), "metadata");
JobMetadataDb.JobLifeCycle job = null;
UUID jobId = null;
- try (Cluster metadataCluster = metadataProvider.getCluster();
- Session metadataSession = metadataCluster.connect()) {
+ Cluster metadataCluster = null;
+ Session metadataSession = null;
+ try {
+ metadataCluster = metadataProvider.getCluster();
+ metadataSession = metadataCluster.connect();
RetryStrategyProvider retryStrategyProvider = RetryStrategyProvider.create(configuration.retryOptions());
MetadataKeyspaceOptions metadataOptions = configuration.metadataOptions();
JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions, retryStrategyProvider);
@@ -197,18 +200,32 @@ public class DiffJob {
Differ.shutdown();
JobMetadataDb.ProgressTracker.resetStatements();
}
+ if (metadataCluster != null) {
+ metadataCluster.close();
+ }
+ if (metadataSession != null) {
+ metadataSession.close();
+ }
+
}
}
- private static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf, List<KeyspaceTablePair> keyspaceTables) {
+ @VisibleForTesting
+ static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf, List<KeyspaceTablePair> keyspaceTables) {
if (conf.jobId().isPresent()) {
- return job.getJobParams(conf.jobId().get());
- } else {
- return new Params(UUID.randomUUID(),
- keyspaceTables,
- conf.buckets(),
- conf.splits());
+ final Params jobParams = job.getJobParams(conf.jobId().get());
+ if(jobParams != null) {
+ // When job_id is passed as a config property for the first time, we will not have metadata associated
+ // with job_id in metadata table. we should return jobParams from the table only when jobParams is not null
+ // Otherwise return new jobParams with provided job_id
+ return jobParams;
+ }
}
+ final UUID jobId = conf.jobId().isPresent() ? conf.jobId().get() : UUID.randomUUID();
+ return new Params(jobId,
+ keyspaceTables,
+ conf.buckets(),
+ conf.splits());
}
private static List<Split> getSplits(JobConfiguration config, TokenHelper tokenHelper) {
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
index bef173a..71a802f 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
@@ -369,7 +369,7 @@ public class JobMetadataDb {
metadataKeyspace, Schema.RUNNING_JOBS),
params.jobId);
if (!rs.one().getBool("[applied]")) {
- logger.info("Aborting due to inability to mark job as running. " +
+ logger.info("Could not mark job as running. " +
"Did a previous run of job id {} fail non-gracefully?",
params.jobId);
throw new RuntimeException("Unable to mark job running, aborting");
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
index 9082970..1bf656d 100644
--- a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
@@ -20,11 +20,18 @@
package org.apache.cassandra.diff;
import java.math.BigInteger;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class DiffJobTest
{
@@ -39,6 +46,37 @@ public class DiffJobTest
splitTestHelper(TokenHelper.forPartitioner("RandomPartitioner"));
}
+ @Test
+ public void testGetJobParamsWithJobIdProvidedShouldReturnNonNullConFigParams() {
+ final MockConfig mockConfig = new MockConfig();
+ final JobMetadataDb.JobLifeCycle mockJob = mock(JobMetadataDb.JobLifeCycle.class);
+ final List<KeyspaceTablePair> keyspaceTablePairs = new ArrayList<>();
+ final DiffJob.Params params = DiffJob.getJobParams(mockJob, mockConfig, keyspaceTablePairs);
+ assertNotNull(params);
+ }
+
+ @Test
+ public void testGetJobParamsDuringRetryShouldReturnPreviousParams() {
+ final MockConfig mockConfig = new MockConfig();
+ final JobMetadataDb.JobLifeCycle mockJob = mock(JobMetadataDb.JobLifeCycle.class);
+ final DiffJob.Params mockParams = mock(DiffJob.Params.class);
+ when(mockJob.getJobParams(any())).thenAnswer(invocationOnMock -> mockParams);
+ final List<KeyspaceTablePair> keyspaceTablePairs = new ArrayList<>();
+ final DiffJob.Params params = DiffJob.getJobParams(mockJob, mockConfig, keyspaceTablePairs);
+ assertEquals(params, mockParams);
+ }
+
+ @Test
+ public void testGetJobParamsWithNoJobId() {
+ final MockConfig mockConfig = mock(MockConfig.class);
+ when(mockConfig.jobId()).thenReturn(Optional.empty());
+
+ final JobMetadataDb.JobLifeCycle mockJob = mock(JobMetadataDb.JobLifeCycle.class);
+ final List<KeyspaceTablePair> keyspaceTablePairs = new ArrayList<>();
+ final DiffJob.Params params = DiffJob.getJobParams(mockJob, mockConfig, keyspaceTablePairs);
+ assertNotNull(params.jobId);
+ }
+
private void splitTestHelper(TokenHelper tokens)
{
List<DiffJob.Split> splits = DiffJob.calculateSplits(50, 1, tokens);
@@ -54,4 +92,21 @@ public class DiffJobTest
for (int i = 0; i < splits.size(); i++)
assertEquals(i, splits.get(i).splitNumber);
}
+
+ private class MockConfig extends AbstractMockJobConfiguration {
+ @Override
+ public int splits() {
+ return 2;
+ }
+
+ @Override
+ public int buckets() {
+ return 2;
+ }
+
+ @Override
+ public Optional<UUID> jobId() {
+ return Optional.of(UUID.randomUUID());
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org