You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/03/24 17:12:37 UTC
[cassandra-diff] branch master updated: Support registering preJob and postJob hooks (#6)
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-diff.git
The following commit(s) were added to refs/heads/master by this push:
new 8090d55 Support registering preJob and postJob hooks (#6)
8090d55 is described below
commit 8090d55a4e6740c57cb9d3d97b6370c53b9a71cd
Author: Yifan Cai <52...@users.noreply.github.com>
AuthorDate: Tue Mar 24 10:12:27 2020 -0700
Support registering preJob and postJob hooks (#6)
---
.../main/java/org/apache/cassandra/diff/DiffJob.java | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
index 26a74e6..3047c97 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
@@ -22,6 +22,8 @@ package org.apache.cassandra.diff;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.*;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -62,6 +64,19 @@ public class DiffJob {
spark.stop();
}
+ // optional code block to run before a job starts
+ private Runnable preJobHook;
+ // optional code block to run after a job completes successfully; otherwise, it is not executed.
+ private Consumer<Map<String, RangeStats>> postJobHook;
+
+ public void addPreJobHook(Runnable preJobHook) {
+ this.preJobHook = preJobHook;
+ }
+
+ public void addPostJobHook(Consumer<Map<String, RangeStats>> postJobHook) {
+ this.postJobHook = postJobHook;
+ }
+
public void run(JobConfiguration configuration, JavaSparkContext sc) {
SparkConf conf = sc.getConf();
// get partitioner from both clusters and verify that they match
@@ -124,6 +139,9 @@ public class DiffJob {
sourceProvider,
targetProvider);
+ if (null != preJobHook)
+ preJobHook.run();
+
// Run the distributed diff and collate results
Map<String, RangeStats> diffStats = sc.parallelize(splits, slices)
.map((split) -> new Differ(configuration,
@@ -140,6 +158,8 @@ public class DiffJob {
// Publish results. This also removes the job from the currently running list
job.finalizeJob(params.jobId, diffStats);
logger.info("FINISHED: {}", diffStats);
+ if (null != postJobHook)
+ postJobHook.accept(diffStats);
} catch (Exception e) {
// If the job errors out, try and mark the job as not running, so it can be restarted.
// If the error was thrown from JobMetadataDb.finalizeJob *after* the job had already
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org