You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/05 01:32:54 UTC
[hudi] branch master updated: [HUDI-6030] Cleans the ckp meta while the JM restarts (#8374)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 257e1680c1e [HUDI-6030] Cleans the ckp meta while the JM restarts (#8374)
257e1680c1e is described below
commit 257e1680c1e3a3b2f6b4846ec7d09a034b1f064a
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Wed Apr 5 09:32:46 2023 +0800
[HUDI-6030] Cleans the ckp meta while the JM restarts (#8374)
We have received several bug reports since #7620 (for example, #8060). This patch reverts the changes to CkpMetadata and always reports the write metadata events for the write task; the coordinator then decides whether to re-commit these metadata stats.
---
.../sink/common/AbstractStreamWriteFunction.java | 7 +++++--
.../org/apache/hudi/sink/meta/CkpMetadata.java | 13 +++++++++----
.../org/apache/hudi/sink/meta/TestCkpMetadata.java | 22 ++++++++++++++++++++++
3 files changed, 36 insertions(+), 6 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 23b580b86bc..cbb8851a940 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -21,6 +21,7 @@ package org.apache.hudi.sink.common;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
@@ -46,7 +47,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
/**
* Base infrastructures for streaming writer function.
@@ -194,8 +194,11 @@ public abstract class AbstractStreamWriteFunction<I>
private void restoreWriteMetadata() throws Exception {
boolean eventSent = false;
+ HoodieTimeline pendingTimeline = this.metaClient.getActiveTimeline().filterPendingExcludingCompaction();
for (WriteMetadataEvent event : this.writeMetadataState.get()) {
- if (Objects.equals(this.currentInstant, event.getInstantTime())) {
+ // Must filter out the completed instants in case it is a partial failover,
+ // the write status should not be accumulated in such case.
+ if (pendingTimeline.containsInstant(event.getInstantTime())) {
// Reset taskID for event
event.setTaskID(taskID);
// The checkpoint succeed but the meta does not commit,
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index a9a099f8494..9b0457845e9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
@@ -94,14 +95,13 @@ public class CkpMetadata implements Serializable, AutoCloseable {
// -------------------------------------------------------------------------
/**
- * Initialize the message bus, would keep all the messages.
+ * Initialize the message bus, would clean all the messages
*
* <p>This expects to be called by the driver.
*/
public void bootstrap() throws IOException {
- if (!fs.exists(path)) {
- fs.mkdirs(path);
- }
+ fs.delete(path, true);
+ fs.mkdirs(path);
}
public void startInstant(String instant) {
@@ -203,6 +203,11 @@ public class CkpMetadata implements Serializable, AutoCloseable {
return this.messages.stream().anyMatch(ckpMsg -> instant.equals(ckpMsg.getInstant()) && ckpMsg.isAborted());
}
+ @VisibleForTesting
+ public List<String> getInstantCache() {
+ return this.instantCache;
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
index b49441f3b9d..1ef2254ff8e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
@@ -26,16 +26,20 @@ import org.apache.hudi.utils.TestConfigurations;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
/**
* Test cases for {@link CkpMetadata}.
@@ -72,6 +76,24 @@ public class TestCkpMetadata {
assertThat(metadata.getMessages().size(), is(5));
}
+ @Test
+ void testBootstrap() throws Exception {
+ CkpMetadata metadata = getCkpMetadata("");
+ // write 4 instants to the ckp_meta
+ IntStream.range(0, 4).forEach(i -> metadata.startInstant(i + ""));
+ assertThat("The first instant should be removed from the instant cache",
+ metadata.getInstantCache(), is(Arrays.asList("1", "2", "3")));
+
+ // simulate the reboot of coordinator
+ CkpMetadata metadata1 = getCkpMetadata("");
+ metadata1.bootstrap();
+ assertNull(metadata1.getInstantCache(), "The instant cache should be recovered from bootstrap");
+
+ metadata1.startInstant("4");
+ assertThat("The first instant should be removed from the instant cache",
+ metadata1.getInstantCache(), is(Collections.singletonList("4")));
+ }
+
private CkpMetadata getCkpMetadata(String uniqueId) {
String basePath = tempFile.getAbsolutePath();
FileSystem fs = FSUtils.getFs(basePath, HadoopConfigurations.getHadoopConf(new Configuration()));