You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/05 01:32:54 UTC

[hudi] branch master updated: [HUDI-6030] Cleans the ckp meta while the JM restarts (#8374)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 257e1680c1e [HUDI-6030] Cleans the ckp meta while the JM restarts (#8374)
257e1680c1e is described below

commit 257e1680c1e3a3b2f6b4846ec7d09a034b1f064a
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Apr 5 09:32:46 2023 +0800

    [HUDI-6030] Cleans the ckp meta while the JM restarts (#8374)
    
    We received several bug reports after #7620 (for example, #8060). This patch reverts the changes to CkpMetadata and makes the write tasks always report their write metadata events; the coordinator then decides whether to re-commit these metadata stats.
---
 .../sink/common/AbstractStreamWriteFunction.java   |  7 +++++--
 .../org/apache/hudi/sink/meta/CkpMetadata.java     | 13 +++++++++----
 .../org/apache/hudi/sink/meta/TestCkpMetadata.java | 22 ++++++++++++++++++++++
 3 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 23b580b86bc..cbb8851a940 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.common;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
@@ -46,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 
 /**
  * Base infrastructures for streaming writer function.
@@ -194,8 +194,11 @@ public abstract class AbstractStreamWriteFunction<I>
 
   private void restoreWriteMetadata() throws Exception {
     boolean eventSent = false;
+    HoodieTimeline pendingTimeline = this.metaClient.getActiveTimeline().filterPendingExcludingCompaction();
     for (WriteMetadataEvent event : this.writeMetadataState.get()) {
-      if (Objects.equals(this.currentInstant, event.getInstantTime())) {
+      // Must filter out the completed instants in case it is a partial failover,
+      // the write status should not be accumulated in such case.
+      if (pendingTimeline.containsInstant(event.getInstantTime())) {
         // Reset taskID for event
         event.setTaskID(taskID);
         // The checkpoint succeed but the meta does not commit,
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index a9a099f8494..9b0457845e9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieException;
@@ -94,14 +95,13 @@ public class CkpMetadata implements Serializable, AutoCloseable {
   // -------------------------------------------------------------------------
 
   /**
-   * Initialize the message bus, would keep all the messages.
+   * Initialize the message bus, would clean all the messages
    *
    * <p>This expects to be called by the driver.
    */
   public void bootstrap() throws IOException {
-    if (!fs.exists(path)) {
-      fs.mkdirs(path);
-    }
+    fs.delete(path, true);
+    fs.mkdirs(path);
   }
 
   public void startInstant(String instant) {
@@ -203,6 +203,11 @@ public class CkpMetadata implements Serializable, AutoCloseable {
     return this.messages.stream().anyMatch(ckpMsg -> instant.equals(ckpMsg.getInstant()) && ckpMsg.isAborted());
   }
 
+  @VisibleForTesting
+  public List<String> getInstantCache() {
+    return this.instantCache;
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
index b49441f3b9d..1ef2254ff8e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
@@ -26,16 +26,20 @@ import org.apache.hudi.utils.TestConfigurations;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 /**
  * Test cases for {@link CkpMetadata}.
@@ -72,6 +76,24 @@ public class TestCkpMetadata {
     assertThat(metadata.getMessages().size(), is(5));
   }
 
+  @Test
+  void testBootstrap() throws Exception {
+    CkpMetadata metadata = getCkpMetadata("");
+    // write 4 instants to the ckp_meta
+    IntStream.range(0, 4).forEach(i -> metadata.startInstant(i + ""));
+    assertThat("The first instant should be removed from the instant cache",
+        metadata.getInstantCache(), is(Arrays.asList("1", "2", "3")));
+
+    // simulate the reboot of coordinator
+    CkpMetadata metadata1 = getCkpMetadata("");
+    metadata1.bootstrap();
+    assertNull(metadata1.getInstantCache(), "The instant cache should be recovered from bootstrap");
+
+    metadata1.startInstant("4");
+    assertThat("The first instant should be removed from the instant cache",
+        metadata1.getInstantCache(), is(Collections.singletonList("4")));
+  }
+
   private CkpMetadata getCkpMetadata(String uniqueId) {
     String basePath = tempFile.getAbsolutePath();
     FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(new Configuration()));