You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/29 01:42:48 UTC
[hudi] 07/17: [HUDI-4907] Prevent single commit multi instant issue (#6766)
This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 00d80215dfac458050d242844be1605cfe47ea15
Author: voonhous <vo...@gmail.com>
AuthorDate: Tue Sep 27 15:52:23 2022 +0800
[HUDI-4907] Prevent single commit multi instant issue (#6766)
Co-authored-by: TengHuo <te...@outlook.com>
Co-authored-by: yuzhao.cyz <yu...@gmail.com>
---
.../java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java | 2 +-
.../src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java | 6 +++---
.../src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java | 3 ++-
3 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index e3b0d82704..c87d5b2443 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -341,7 +341,7 @@ public class StreamWriteOperatorCoordinator
private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) throws IOException {
CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath());
- ckpMetadata.bootstrap(metaClient);
+ ckpMetadata.bootstrap();
return ckpMetadata;
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 4cdebf986f..6895b2a0c6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -94,7 +94,7 @@ public class CkpMetadata implements Serializable {
*
* <p>This expects to be called by the driver.
*/
- public void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
+ public void bootstrap() throws IOException {
fs.delete(path, true);
fs.mkdirs(path);
}
@@ -173,8 +173,8 @@ public class CkpMetadata implements Serializable {
@Nullable
public String lastPendingInstant() {
load();
- for (int i = this.messages.size() - 1; i >= 0; i--) {
- CkpMessage ckpMsg = this.messages.get(i);
+ if (this.messages.size() > 0) {
+ CkpMessage ckpMsg = this.messages.get(this.messages.size() - 1);
// consider 'aborted' as pending too to reuse the instant
if (!ckpMsg.isComplete()) {
return ckpMsg.getInstant();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
index a6fb493b9b..fe7ce3f947 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.util.stream.IntStream;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -63,7 +64,7 @@ public class TestCkpMetadata {
assertThat(metadata.lastPendingInstant(), is("2"));
metadata.commitInstant("2");
- assertThat(metadata.lastPendingInstant(), is("1"));
+ assertThat(metadata.lastPendingInstant(), equalTo(null));
// test cleaning
IntStream.range(3, 6).forEach(i -> metadata.startInstant(i + ""));