Posted to commits@hudi.apache.org by "nsivabalan (via GitHub)" <gi...@apache.org> on 2023/04/30 19:01:24 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #7826: [HUDI-5675] fix lazy clean schedule rollback on completed instant

nsivabalan commented on code in PR #7826:
URL: https://github.com/apache/hudi/pull/7826#discussion_r1181278240


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java:
##########
@@ -912,6 +914,62 @@ public void testLazyRollbackOfFailedCommit(boolean rollbackUsingMarkers) throws
     assertEquals(numLogFilesAfterRollback, numLogFilesAfterCompaction);
   }
 
+  @Test
+  public void testGetInstantToRollbackForLazyCleanPolicy() throws Exception {
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+
+    HoodieWriteConfig cfg = getWriteConfig(true, false);
+    HoodieWriteConfig autoCommitFalseCfg = getWriteConfig(false, false);
+    autoCommitFalseCfg.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS.key(), "2000");
+    autoCommitFalseCfg.setValue(CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES.key(), "2");
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    // Commit 1
+    insertRecords(client, dataGen, "001");
+
+    // Trigger an inflight commit 2
+    SparkRDDWriteClient autoCommitFalseClient = getHoodieWriteClient(autoCommitFalseCfg);
+    String commitTime2 = "002";
+    autoCommitFalseClient.startCommitWithTime(commitTime2);
+    List<HoodieRecord> records = dataGen.generateInserts(commitTime2, 20);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+    JavaRDD<WriteStatus> statuses = autoCommitFalseClient.upsert(writeRecords, commitTime2);
+
+    // Stop updating heartbeat
+    autoCommitFalseClient.getHeartbeatClient().getHeartbeat(commitTime2).getTimer().cancel();
+    // Sleep to make the heartbeat expired. In production env, heartbeat is expired because of commit
+    Thread.sleep(4000);

Review Comment:
   Can we reduce this to 2 seconds? The heartbeat expiry is set to 2 seconds, right? I'm just trying to reduce the total test run time.
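A side note on the arithmetic, offered as a sketch under an assumption not re-verified here. Suppose `HoodieHeartbeatClient` derives its expiry window as heartbeat interval × number of tolerable misses, and treats a heartbeat as expired only once strictly more than that window has passed. Then the settings in this test (`2000` ms, `2` misses) give a 4-second window rather than 2, and a 2-second sleep might not be enough. The class and method names below are hypothetical, not Hudi's API.

```java
/**
 * Hypothetical sketch of heartbeat-expiry arithmetic.
 * Assumption: window = heartbeat interval x number of tolerable misses.
 */
class HeartbeatExpirySketch {

    // Assumption: mirrors how the expiry window is believed to be derived.
    static long maxAllowableHeartbeatIntervalMs(long heartbeatIntervalMs, int numTolerableMisses) {
        return heartbeatIntervalMs * numTolerableMisses;
    }

    // Assumption: expired only once strictly more than the window has elapsed.
    static boolean isExpired(long nowMs, long lastHeartbeatMs, long windowMs) {
        return nowMs - lastHeartbeatMs > windowMs;
    }

    public static void main(String[] args) {
        // Same values as CLIENT_HEARTBEAT_INTERVAL_IN_MS / CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES in the test.
        long window = maxAllowableHeartbeatIntervalMs(2000L, 2);
        System.out.println("expiry window ms = " + window);                       // prints 4000
        long lastBeat = 0L;
        System.out.println("after 2000 ms: " + isExpired(lastBeat + 2000, lastBeat, window)); // false
        System.out.println("after 4001 ms: " + isExpired(lastBeat + 4001, lastBeat, window)); // true
    }
}
```

If that formula holds, lowering `CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES` to `1` would presumably be one way to shorten the sleep the test needs.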



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -707,20 +709,34 @@ protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, H
         }
       }).collect(Collectors.toList());
     } else if (cleaningPolicy.isLazy()) {
-      return inflightInstantsStream.filter(instant -> {
-        try {
-          return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
-        } catch (IOException io) {
-          throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
-        }
-      }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      return getInstantsToRollbackForLazyCleanPolicy(metaClient, inflightInstantsStream);
     } else if (cleaningPolicy.isNever()) {
       return Collections.emptyList();
     } else {
       throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy());
     }
   }
 
+  @VisibleForTesting

Review Comment:
   Is it not possible to test this at the write client layer? I would like to avoid using `@VisibleForTesting` as much as possible.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
+  @VisibleForTesting
+  public List<String> getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClient metaClient,
+                                                              Stream<HoodieInstant> inflightInstantsStream) {
+    // Get expired instants, must store them into list before double-checking
+    List<HoodieInstant> expiredInstants = inflightInstantsStream.filter(instant -> {
+      try {
+        // An instant transformed from inflight to completed have no heartbeat file and will be detected as expired instant here
+        return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+      } catch (IOException io) {
+        throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
+      }
+    }).collect(Collectors.toList());
+
+    // Only return instants that haven't been completed by other writers
+    return expiredInstants.stream()
+        .filter(instant -> !metaClient.getActiveTimeline().isCompletedCommitFileExists(instant))

Review Comment:
   Maybe a better option is to reload the entire timeline and check whether the commit is completed.
   We don't want to make such ad hoc calls; every call should go through the proper timeline APIs as much as possible.
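One way to read the suggestion, as a minimal self-contained sketch rather than Hudi code: collect the expired candidates first, then take a single fresh snapshot of completed instants and drop any candidate that shows up as completed. In Hudi, that snapshot would presumably come from reloading the active timeline (for example via `metaClient.reloadActiveTimeline()`). `LazyRollbackSketch`, `instantsToRollback`, and the map-based heartbeat store are hypothetical stand-ins. As the comment in the diff notes, a missing heartbeat reads as time 0 here, so a just-completed instant looks expired.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/** Hypothetical model of lazy-clean rollback selection; not Hudi's API. */
class LazyRollbackSketch {

    // Hypothetical heartbeat lookup: a missing entry stands in for a deleted heartbeat
    // file and reads as 0, so completed instants also look expired.
    static long lastHeartbeatMs(Map<String, Long> heartbeats, String instant) {
        return heartbeats.getOrDefault(instant, 0L);
    }

    static List<String> instantsToRollback(List<String> inflightInstants,
                                           Map<String, Long> heartbeats,
                                           long nowMs,
                                           long expiryWindowMs,
                                           Supplier<Set<String>> reloadCompletedInstants) {
        // Step 1: materialize the expired candidates before re-checking the timeline.
        List<String> expired = inflightInstants.stream()
            .filter(ts -> nowMs - lastHeartbeatMs(heartbeats, ts) > expiryWindowMs)
            .collect(Collectors.toList());
        // Step 2: take one fresh snapshot of completed instants *after* the heartbeat check
        // and drop any candidate that another writer has completed in the meantime.
        Set<String> completed = reloadCompletedInstants.get();
        return expired.stream()
            .filter(ts -> !completed.contains(ts))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "002" stopped heartbeating long ago; "003" completed, so its heartbeat is gone.
        List<String> inflight = List.of("002", "003");
        Map<String, Long> heartbeats = Map.of("002", 1_000L);
        List<String> result =
            instantsToRollback(inflight, heartbeats, 10_000L, 4_000L, () -> Set.of("003"));
        System.out.println(result); // prints [002]
    }
}
```

The ordering matters in this model: because the snapshot is taken after the heartbeat check, an instant that completes (and loses its heartbeat) before the check is still seen as completed. Taking the snapshot first would leave a window in which such an instant looks expired but not yet completed.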
   



-- 
This is an automated message from the Apache Git Service.