You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/29 01:42:48 UTC

[hudi] 07/17: [HUDI-4907] Prevent single commit multi instant issue (#6766)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 00d80215dfac458050d242844be1605cfe47ea15
Author: voonhous <vo...@gmail.com>
AuthorDate: Tue Sep 27 15:52:23 2022 +0800

    [HUDI-4907] Prevent single commit multi instant issue (#6766)
    
    
    Co-authored-by: TengHuo <te...@outlook.com>
    Co-authored-by: yuzhao.cyz <yu...@gmail.com>
---
 .../java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java   | 2 +-
 .../src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java        | 6 +++---
 .../src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java    | 3 ++-
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index e3b0d82704..c87d5b2443 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -341,7 +341,7 @@ public class StreamWriteOperatorCoordinator
 
   private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) throws IOException {
     CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath());
-    ckpMetadata.bootstrap(metaClient);
+    ckpMetadata.bootstrap();
     return ckpMetadata;
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 4cdebf986f..6895b2a0c6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -94,7 +94,7 @@ public class CkpMetadata implements Serializable {
    *
    * <p>This expects to be called by the driver.
    */
-  public void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
+  public void bootstrap() throws IOException {
     fs.delete(path, true);
     fs.mkdirs(path);
   }
@@ -173,8 +173,8 @@ public class CkpMetadata implements Serializable {
   @Nullable
   public String lastPendingInstant() {
     load();
-    for (int i = this.messages.size() - 1; i >= 0; i--) {
-      CkpMessage ckpMsg = this.messages.get(i);
+    if (this.messages.size() > 0) {
+      CkpMessage ckpMsg = this.messages.get(this.messages.size() - 1);
       // consider 'aborted' as pending too to reuse the instant
       if (!ckpMsg.isComplete()) {
         return ckpMsg.getInstant();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
index a6fb493b9b..fe7ce3f947 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.util.stream.IntStream;
 
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -63,7 +64,7 @@ public class TestCkpMetadata {
 
     assertThat(metadata.lastPendingInstant(), is("2"));
     metadata.commitInstant("2");
-    assertThat(metadata.lastPendingInstant(), is("1"));
+    assertThat(metadata.lastPendingInstant(), equalTo(null));
 
     // test cleaning
     IntStream.range(3, 6).forEach(i -> metadata.startInstant(i + ""));