Posted to commits@hudi.apache.org by "stream2000 (via GitHub)" <gi...@apache.org> on 2023/02/07 07:19:48 UTC

[GitHub] [hudi] stream2000 commented on a diff in pull request #7826: [HUDI-5675] fix lazy clean schedule rollback on completed instant

stream2000 commented on code in PR #7826:
URL: https://github.com/apache/hudi/pull/7826#discussion_r1098259478


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -707,20 +709,33 @@ protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, H
         }
       }).collect(Collectors.toList());
     } else if (cleaningPolicy.isLazy()) {
-      return inflightInstantsStream.filter(instant -> {
-        try {
-          return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
-        } catch (IOException io) {
-          throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
-        }
-      }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      return getInstantsToRollbackForLazyCleanPolicy(metaClient, inflightInstantsStream);
     } else if (cleaningPolicy.isNever()) {
       return Collections.emptyList();
     } else {
       throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy());
     }
   }
 
+  @VisibleForTesting
+  public List<String> getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClient metaClient,
+                                                              Stream<HoodieInstant> inflightInstantsStream) {
+    // Get expired instants, must store them into list before double-checking
+    List<String> expiredInstants = inflightInstantsStream.filter(instant -> {
+      try {
+        // An instant that moved from inflight to completed has no heartbeat file and will be detected as an expired instant here
+        return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+      } catch (IOException io) {
+        throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
+      }
+    }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+    // Double check whether the heartbeat-expired instant is an inflight instant
+    metaClient.reloadActiveTimeline();

Review Comment:
   Only heartbeat-expired instants will be rolled back. If an instant finishes after `metaClient.reloadActiveTimeline`, its heartbeat had not yet expired when we checked it, so it won't be rolled back. That's why we must check the heartbeat first and then reload the timeline to do the double check.
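
   To make the ordering concrete, here is a minimal, self-contained sketch of the same two-step check. It does not use Hudi classes: `LazyRollbackSketch`, `instantsToRollback`, the `Predicate` standing in for the heartbeat check, and the `Supplier` standing in for the timeline reload are all hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical stand-in for the lazy-clean rollback selection; not Hudi API.
class LazyRollbackSketch {

    static List<String> instantsToRollback(List<String> inflightSnapshot,
                                           Predicate<String> isHeartbeatExpired,
                                           Supplier<Set<String>> reloadInflight) {
        // Step 1: evaluate heartbeats against the (possibly stale) snapshot and
        // materialize the result into a list BEFORE touching the timeline again.
        List<String> expired = inflightSnapshot.stream()
            .filter(isHeartbeatExpired)
            .collect(Collectors.toList());

        // Step 2: reload the timeline and keep only instants that are still inflight.
        // An instant that completed in the meantime has no heartbeat file, so it
        // looked expired in step 1; this second check filters it out.
        Set<String> stillInflight = reloadInflight.get();
        return expired.stream()
            .filter(stillInflight::contains)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed scenario: 001 = writer died, 002 = completed after the snapshot
        // was taken, 003 = writer alive and still sending heartbeats.
        List<String> snapshot = Arrays.asList("001", "002", "003");
        Set<String> looksExpired = new HashSet<>(Arrays.asList("001", "002"));
        Set<String> inflightAfterReload = new HashSet<>(Arrays.asList("001", "003"));

        List<String> result = instantsToRollback(
            snapshot, looksExpired::contains, () -> inflightAfterReload);
        System.out.println(result); // prints [001]
    }
}
```

   In this sketch only `001` is selected: `002` is dropped by the reload in step 2, and `003` never passes the heartbeat check in step 1.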


