You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/10/30 00:22:28 UTC
[hudi] branch master updated: [HUDI-4741] hotfix to avoid partial failover cause restored subtask timeout (#6796)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e222693d87 [HUDI-4741] hotfix to avoid partial failover cause restored subtask timeout (#6796)
e222693d87 is described below
commit e222693d87d48416670ca14c6f7fd69307432786
Author: 冯健 <fe...@gmail.com>
AuthorDate: Sun Oct 30 08:22:18 2022 +0800
[HUDI-4741] hotfix to avoid partial failover cause restored subtask timeout (#6796)
Co-authored-by: jian.feng <ji...@shopee.com>
---
.../hudi/sink/StreamWriteOperatorCoordinator.java | 36 ++++++++++++++
.../org/apache/hudi/sink/meta/CkpMetadata.java | 57 ++++++++++++++++------
.../sink/TestStreamWriteOperatorCoordinator.java | 9 ++++
3 files changed, 88 insertions(+), 14 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index c87d5b2443..a7b3994357 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -63,6 +63,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
@@ -152,6 +153,12 @@ public class StreamWriteOperatorCoordinator
*/
private CkpMetadata ckpMetadata;
+ /**
+ * Counter for the failed tasks, a number within the range (0, task_num) means
+ * a partial failover.
+ */
+ private transient AtomicInteger failedCnt;
+
/**
* Constructs a StreamingSinkOperatorCoordinator.
*
@@ -294,6 +301,17 @@ public class StreamWriteOperatorCoordinator
// reset the event
this.eventBuffer[i] = null;
LOG.warn("Reset the event for task [" + i + "]", throwable);
+
+ // based on the fact that #subtaskFailed is invoked for all the failed tasks before they are rescheduled,
+ // when a sub-task bootstrap event is received we can tell whether it recovers from a partial or a complete failover,
+ // and then either reuse the current instant (PARTIAL) or start a new one (COMPLETE).
+
+ // reset the ckp metadata for either partial or complete failover
+ if (this.failedCnt.get() == 0) {
+ this.ckpMetadata.reset();
+ }
+ // inc the failed tasks counter
+ this.failedCnt.incrementAndGet();
}
@Override
@@ -347,6 +365,14 @@ public class StreamWriteOperatorCoordinator
private void reset() {
this.eventBuffer = new WriteMetadataEvent[this.parallelism];
+ this.failedCnt = new AtomicInteger(0);
+ }
+
+ /**
+ * Checks whether it is a PARTIAL failover.
+ */
+ private boolean isPartialFailover() {
+ return this.failedCnt.get() > 0 && this.failedCnt.get() < this.parallelism;
}
/**
@@ -410,6 +436,16 @@ public class StreamWriteOperatorCoordinator
if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) {
// start to initialize the instant.
initInstant(event.getInstantTime());
+ } else if (isPartialFailover()) {
+ // if the bootstrap event comes from a partial failover,
+ // decrement the failed tasks by one.
+
+ // if all the failed task bootstrap events are received, send a start instant
+ // to the ckp metadata and unblock the data flushing.
+ if (this.failedCnt.decrementAndGet() <= 0) {
+ this.ckpMetadata.startInstant(this.instant);
+ this.failedCnt.set(0);
+ }
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 6895b2a0c6..d0f26740d6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -37,6 +37,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
@@ -61,7 +62,7 @@ public class CkpMetadata implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class);
- protected static final int MAX_RETAIN_CKP_NUM = 3;
+ private static final int MAX_RETAIN_CKP_NUM = 3;
// the ckp metadata directory
private static final String CKP_META = "ckp_meta";
@@ -99,6 +100,19 @@ public class CkpMetadata implements Serializable {
fs.mkdirs(path);
}
+ /**
+ * Resets the message bus by deleting the ckp message files of every cached instant.
+ *
+ * <p>This expects to be called by the driver. It is a no-op if no instant has been cached yet.
+ */
+ public void reset() {
+ if (this.instantCache == null) { return; } // the cache is created lazily on the first startInstant
+ for (Iterator<String> itr = this.instantCache.iterator(); itr.hasNext(); ) {
+ cleanInstant(itr.next(), true);
+ itr.remove();
+ }
+ }
+
public void startInstant(String instant) {
Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT));
try {
@@ -106,32 +120,47 @@ public class CkpMetadata implements Serializable {
} catch (IOException e) {
throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e);
}
+ // cache the instant
+ cache(instant);
// cleaning
- clean(instant);
+ clean();
}
- private void clean(String newInstant) {
+ private void cache(String newInstant) {
if (this.instantCache == null) {
this.instantCache = new ArrayList<>();
}
this.instantCache.add(newInstant);
+ }
+
+ private void clean() {
if (instantCache.size() > MAX_RETAIN_CKP_NUM) {
- final String instant = instantCache.get(0);
- boolean[] error = new boolean[1];
- CkpMessage.getAllFileNames(instant).stream().map(this::fullPath).forEach(path -> {
- try {
- fs.delete(path, false);
- } catch (IOException e) {
- error[0] = true;
- LOG.warn("Exception while cleaning the checkpoint meta file: " + path);
- }
- });
- if (!error[0]) {
+ boolean success = cleanInstant(instantCache.get(0), false);
+ if (success) {
instantCache.remove(0);
}
}
}
+ private boolean cleanInstant(String instant, boolean throwsT) { // true iff no deletion raised an IOException
+ // with throwsT the first failure is rethrown as a HoodieException instead of being logged
+ int failures = 0;
+ for (String fileName : CkpMessage.getAllFileNames(instant)) {
+ final Path target = fullPath(fileName);
+ try {
+ fs.delete(target, false);
+ } catch (IOException ex) {
+ final String errMsg = "Exception while cleaning the checkpoint meta file: " + target;
+ if (throwsT) {
+ throw new HoodieException(errMsg, ex);
+ }
+ LOG.warn(errMsg, ex);
+ failures++;
+ }
+ }
+ return failures == 0;
+ }
+
/**
* Add a checkpoint commit message.
*
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index d5d35f7494..64bf8e2788 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -29,6 +29,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
@@ -164,6 +165,14 @@ public class TestStreamWriteOperatorCoordinator {
assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
}
+ @Test
+ public void testSubTaskFailed() {
+ coordinator.subtaskFailed(0, null); // simulate a failure of subtask 0
+ assertNull(coordinator.getEventBuffer()[0], "The write meta event should be cleaned");
+ CkpMetadata freshMetadata = CkpMetadata.getInstance(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()));
+ assertNull(freshMetadata.lastPendingInstant(), "The pending instant should be cleaned");
+ }
+
@Test
public void testHiveSyncInvoked() throws Exception {
// reset