You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/03/24 17:12:37 UTC

[cassandra-diff] branch master updated: Support registering preJob and postJob hooks (#6)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-diff.git


The following commit(s) were added to refs/heads/master by this push:
     new 8090d55  Support registering preJob and postJob hooks (#6)
8090d55 is described below

commit 8090d55a4e6740c57cb9d3d97b6370c53b9a71cd
Author: Yifan Cai <52...@users.noreply.github.com>
AuthorDate: Tue Mar 24 10:12:27 2020 -0700

    Support registering preJob and postJob hooks (#6)
---
 .../main/java/org/apache/cassandra/diff/DiffJob.java | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
index 26a74e6..3047c97 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
@@ -22,6 +22,8 @@ package org.apache.cassandra.diff;
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.*;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -62,6 +64,19 @@ public class DiffJob {
         spark.stop();
     }
 
+    // optional code block to run before a job starts
+    private Runnable preJobHook;
+    // optional code block to run after a job completes successfully; otherwise, it is not executed.
+    private Consumer<Map<String, RangeStats>> postJobHook;
+
+    public void addPreJobHook(Runnable preJobHook) {
+        this.preJobHook = preJobHook;
+    }
+
+    public void addPostJobHook(Consumer<Map<String, RangeStats>> postJobHook) {
+        this.postJobHook = postJobHook;
+    }
+
     public void run(JobConfiguration configuration, JavaSparkContext sc) {
         SparkConf conf = sc.getConf();
         // get partitioner from both clusters and verify that they match
@@ -124,6 +139,9 @@ public class DiffJob {
                         sourceProvider,
                         targetProvider);
 
+            if (null != preJobHook)
+                preJobHook.run();
+
             // Run the distributed diff and collate results
             Map<String, RangeStats> diffStats = sc.parallelize(splits, slices)
                                                   .map((split) -> new Differ(configuration,
@@ -140,6 +158,8 @@ public class DiffJob {
             // Publish results. This also removes the job from the currently running list
             job.finalizeJob(params.jobId, diffStats);
             logger.info("FINISHED: {}", diffStats);
+            if (null != postJobHook)
+                postJobHook.accept(diffStats);
         } catch (Exception e) {
             // If the job errors out, try and mark the job as not running, so it can be restarted.
             // If the error was thrown from JobMetadataDb.finalizeJob *after* the job had already


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org