Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/10/13 01:42:05 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #3765: [HUDI-2533] New option for hoodieClusteringJob to check, rollback and re-execute the last failed clustering job

nsivabalan commented on a change in pull request #3765:
URL: https://github.com/apache/hudi/pull/3765#discussion_r727633410



##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -1096,12 +1100,21 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws
                                                                      String clusteringInstantTime,
                                                                      boolean runSchedule,
                                                                      String runningMode) {
+    return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule, runningMode, false);

Review comment:
       Is it possible to fetch the default value from where the retryLastFailedClusteringJob config is declared, rather than hardcoding false here?
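
       A minimal sketch of how that could look, assuming the default is lifted into a constant on the job's Config class (the constant and flag names below are illustrative, not necessarily what this PR declares):
   ```java
   import com.beust.jcommander.Parameter;

   // Hypothetical sketch: expose the default once on the Config class so tests can
   // reference it instead of hardcoding false. Names here are illustrative only.
   public class HoodieClusteringJobConfigSketch {

     public static final boolean DEFAULT_RETRY_LAST_FAILED_CLUSTERING_JOB = false;

     @Parameter(names = {"--retry-last-failed-clustering-job"},
         description = "Rollback and re-execute the last failed clustering job if present")
     public Boolean retryLastFailedClusteringJob = DEFAULT_RETRY_LAST_FAILED_CLUSTERING_JOB;
   }

   // The test helper overload could then forward the declared default instead of a literal:
   //   return buildHoodieClusteringUtilConfig(basePath, clusteringInstantTime, runSchedule,
   //       runningMode, HoodieClusteringJobConfigSketch.DEFAULT_RETRY_LAST_FAILED_CLUSTERING_JOB);
   ```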

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
##########
@@ -215,12 +221,26 @@ private int doCluster(JavaSparkContext jsc) throws Exception {
     return client.scheduleClustering(Option.empty());
   }
 
-  public int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
+  private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception {
     LOG.info("Step 1: Do schedule");
     String schemaStr = getSchemaFromLatestInstant();
     try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism, Option.empty(), props)) {
+      Option<String> instantTime;
+
+      if (cfg.retryLastFailedClusteringJob) {
+        HoodieSparkTable<HoodieRecordPayload> table = HoodieSparkTable.create(client.getConfig(), client.getEngineContext());
+        HoodieTimeline inflightHoodieTimeline = table.getActiveTimeline().filterPendingReplaceTimeline().filterInflights();
+        if (inflightHoodieTimeline.empty()) {
+          instantTime = doSchedule(client);
+        } else {
+          // If a failed clustering exists, use its instant time to trigger the next clustering action, which will roll back and re-execute it.
+          LOG.info("Find failed clustering plan : " + inflightHoodieTimeline.lastInstant().get() + "; Will rollback and re-trigger this failed clustering plan.");

Review comment:
       Minor: let's fix some of the phrasing in the log statement.
   ```
   "Found failed clustering instant at  : " + inflightHoodieTimeline.lastInstant().get() + "; Will rollback the failed clustering and re-trigger again.
   ```
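
   For context, the branch with the suggested wording applied might read roughly like this (a sketch based on the truncated diff above, not the final patch; the rest of the branch is elided here because it is elided in the hunk):
   ```java
   } else {
     // A failed (inflight) clustering plan already exists; reuse its instant time so the
     // next clustering run rolls it back and re-executes it.
     LOG.info("Found failed clustering instant at: " + inflightHoodieTimeline.lastInstant().get()
         + "; Will rollback the failed clustering and re-trigger again.");
     // ... reuse that instant time for the retry (remainder of the branch not shown in this hunk)
   }
   ```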
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org