You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2021/11/04 12:45:50 UTC

[hudi] branch revert-3915-HUDI-2677 created (now ea1a6bf)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a change to branch revert-3915-HUDI-2677
in repository https://gitbox.apache.org/repos/asf/hudi.git.


      at ea1a6bf  Revert "[HUDI-2677] Add DFS based message queue for flink writer (#3915)"

This branch includes the following new commits:

     new ea1a6bf  Revert "[HUDI-2677] Add DFS based message queue for flink writer (#3915)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[hudi] 01/01: Revert "[HUDI-2677] Add DFS based message queue for flink writer (#3915)"

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch revert-3915-HUDI-2677
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ea1a6bfcc4258692157634c35d6810bb91632af8
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Nov 4 20:45:36 2021 +0800

    Revert "[HUDI-2677] Add DFS based message queue for flink writer (#3915)"
    
    This reverts commit dbf8c44bdb3019f2ce93d6b1224d9d478c0340fa.
---
 .../org/apache/hudi/sink/StreamWriteFunction.java  |   8 +-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  65 +++++---
 .../hudi/sink/bulk/BulkInsertWriteFunction.java    |  40 ++---
 .../sink/common/AbstractStreamWriteFunction.java   |  98 ++----------
 .../org/apache/hudi/sink/message/MessageBus.java   | 173 ---------------------
 .../apache/hudi/sink/message/MessageClient.java    | 126 ---------------
 .../apache/hudi/sink/message/MessageDriver.java    | 132 ----------------
 .../apache/hudi/sink/message/TestMessageBus.java   | 137 ----------------
 .../sink/utils/StreamWriteFunctionWrapper.java     |   1 -
 9 files changed, 79 insertions(+), 701 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 11564d1..0e7e35e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -137,7 +137,6 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
 
   @Override
   public void close() {
-    super.close();
     if (this.writeClient != null) {
       this.writeClient.cleanHandlesGracefully();
       this.writeClient.close();
@@ -402,6 +401,11 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     }
   }
 
+  private boolean hasData() {
+    return this.buckets.size() > 0
+        && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
+  }
+
   @SuppressWarnings("unchecked, rawtypes")
   private boolean flushBucket(DataBucket bucket) {
     String instant = instantToWrite(true);
@@ -435,7 +439,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
 
   @SuppressWarnings("unchecked, rawtypes")
   private void flushRemaining(boolean endInput) {
-    this.currentInstant = instantToWrite(false);
+    this.currentInstant = instantToWrite(hasData());
     if (this.currentInstant == null) {
       // in case there are empty checkpoints that has no input data
       throw new HoodieException("No inflight instant when flushing data!");
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index a30d766..feb348f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -30,9 +30,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.message.MessageBus;
-import org.apache.hudi.sink.message.MessageDriver;
 import org.apache.hudi.sink.utils.HiveSyncContext;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.util.StreamerUtil;
@@ -42,6 +41,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -137,11 +137,6 @@ public class StreamWriteOperatorCoordinator
   private transient TableState tableState;
 
   /**
-   * The message driver.
-   */
-  private MessageDriver messageDriver;
-
-  /**
    * Constructs a StreamingSinkOperatorCoordinator.
    *
    * @param conf    The config options
@@ -179,7 +174,6 @@ public class StreamWriteOperatorCoordinator
     if (tableState.syncMetadata) {
       initMetadataSync();
     }
-    this.messageDriver = MessageBus.getDriver(this.metaClient.getFs(), metaClient.getBasePath());
   }
 
   @Override
@@ -197,9 +191,6 @@ public class StreamWriteOperatorCoordinator
       writeClient.close();
     }
     this.eventBuffer = null;
-    if (this.messageDriver != null) {
-      this.messageDriver.close();
-    }
   }
 
   @Override
@@ -236,7 +227,7 @@ public class StreamWriteOperatorCoordinator
               writeClient.scheduleCompaction(Option.empty());
             }
             // start new instant.
-            startInstant(checkpointId);
+            startInstant();
             // sync Hive if is enabled
             syncHiveIfEnabled();
           }
@@ -246,7 +237,12 @@ public class StreamWriteOperatorCoordinator
 
   @Override
   public void notifyCheckpointAborted(long checkpointId) {
-    this.messageDriver.abortCkp(checkpointId);
+    // once the checkpoint was aborted, unblock the writer tasks to
+    // reuse the last instant.
+    if (!WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) {
+      executor.execute(() -> sendCommitAckEvents(checkpointId),
+          "unblock data write with aborted checkpoint %s", checkpointId);
+    }
   }
 
   @Override
@@ -337,19 +333,12 @@ public class StreamWriteOperatorCoordinator
   }
 
   private void startInstant() {
-    // the flink checkpoint id starts from 1,
-    // see AbstractStreamWriteFunction#ackInstant
-    startInstant(MessageBus.INITIAL_CKP_ID);
-  }
-
-  private void startInstant(long checkpoint) {
     final String instant = HoodieActiveTimeline.createNewInstantTime();
     this.writeClient.startCommitWithTime(instant, tableState.commitAction);
-    this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, instant);
-    this.writeClient.upgradeDowngrade(instant);
-    this.messageDriver.commitCkp(checkpoint, this.instant, instant);
     this.instant = instant;
-    LOG.info("Create instant [{}] for table [{}] with type [{}]", instant,
+    this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
+    this.writeClient.upgradeDowngrade(this.instant);
+    LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
         this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
   }
 
@@ -409,6 +398,33 @@ public class StreamWriteOperatorCoordinator
   }
 
   /**
+   * The coordinator reuses the instant if there is no data for this round of checkpoint,
+   * sends the commit ack events to unblock the flushing.
+   */
+  private void sendCommitAckEvents(long checkpointId) {
+    CompletableFuture<?>[] futures = Arrays.stream(this.gateways).filter(Objects::nonNull)
+        .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
+        .toArray(CompletableFuture<?>[]::new);
+    try {
+      CompletableFuture.allOf(futures).get();
+    } catch (Throwable throwable) {
+      if (!sendToFinishedTasks(throwable)) {
+        throw new HoodieException("Error while waiting for the commit ack events to finish sending", throwable);
+      }
+    }
+  }
+
+  /**
+   * Decides whether the given exception is caused by sending events to FINISHED tasks.
+   *
+   * <p>Ugly impl: the exception may change in the future.
+   */
+  private static boolean sendToFinishedTasks(Throwable throwable) {
+    return throwable.getCause() instanceof TaskNotRunningException
+        || throwable.getCause().getMessage().contains("running");
+  }
+
+  /**
    * Commits the instant.
    */
   private void commitInstant(String instant) {
@@ -435,7 +451,8 @@ public class StreamWriteOperatorCoordinator
     if (writeResults.size() == 0) {
       // No data has written, reset the buffer and returns early
       reset();
-      messageDriver.commitCkp(checkpointId, this.instant, this.instant);
+      // Send commit ack event to the write function to unblock the flushing
+      sendCommitAckEvents(checkpointId);
       return false;
     }
     doCommit(instant, writeResults);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index f5fda5a..f3cfbae 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -21,13 +21,11 @@ package org.apache.hudi.sink.bulk;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.message.MessageBus;
-import org.apache.hudi.sink.message.MessageClient;
 import org.apache.hudi.sink.utils.TimeWait;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -40,8 +38,6 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -84,19 +80,24 @@ public class BulkInsertWriteFunction<I>
   private int taskID;
 
   /**
+   * Meta Client.
+   */
+  private transient HoodieTableMetaClient metaClient;
+
+  /**
    * Write Client.
    */
   private transient HoodieFlinkWriteClient writeClient;
 
   /**
-   * Gateway to send operator events to the operator coordinator.
+   * The initial inflight instant when start up.
    */
-  private transient OperatorEventGateway eventGateway;
+  private volatile String initInstant;
 
   /**
-   * The message client.
+   * Gateway to send operator events to the operator coordinator.
    */
-  private MessageClient messageClient;
+  private transient OperatorEventGateway eventGateway;
 
   /**
    * Constructs a StreamingSinkFunction.
@@ -111,8 +112,9 @@ public class BulkInsertWriteFunction<I>
   @Override
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.metaClient = StreamerUtil.createMetaClient(this.config);
     this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
-    this.messageClient = MessageBus.getClient(config.getString(FlinkOptions.PATH));
+    this.initInstant = StreamerUtil.getLastPendingInstant(this.metaClient, false);
     sendBootstrapEvent();
     initWriterHelper();
   }
@@ -128,9 +130,6 @@ public class BulkInsertWriteFunction<I>
       this.writeClient.cleanHandlesGracefully();
       this.writeClient.close();
     }
-    if (this.messageClient != null) {
-      this.messageClient.close();
-    }
   }
 
   /**
@@ -184,17 +183,8 @@ public class BulkInsertWriteFunction<I>
     LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
   }
 
-  /**
-   * Returns the next instant to write from the message bus.
-   */
-  @Nullable
-  private String ackInstant() {
-    Option<MessageBus.CkpMessage> ckpMessageOption = this.messageClient.getCkpMessage(MessageBus.INITIAL_CKP_ID);
-    return ckpMessageOption.map(message -> message.inflightInstant).orElse(null);
-  }
-
   private String instantToWrite() {
-    String instant = ackInstant();
+    String instant = StreamerUtil.getLastPendingInstant(this.metaClient);
     // if exactly-once semantics turns on,
     // waits for the checkpoint notification until the checkpoint timeout threshold hits.
     TimeWait timeWait = TimeWait.builder()
@@ -202,14 +192,14 @@ public class BulkInsertWriteFunction<I>
         .action("instant initialize")
         .throwsT(true)
         .build();
-    while (instant == null) {
+    while (instant == null || instant.equals(this.initInstant)) {
       // wait condition:
       // 1. there is no inflight instant
       // 2. the inflight instant does not change
       // sleep for a while
       timeWait.waitFor();
       // refresh the inflight instant
-      instant = ackInstant();
+      instant = StreamerUtil.getLastPendingInstant(this.metaClient);
     }
     return instant;
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index c3fcec0..5ad2935 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -21,14 +21,11 @@ package org.apache.hudi.sink.common;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.sink.message.MessageBus;
-import org.apache.hudi.sink.message.MessageClient;
 import org.apache.hudi.sink.utils.TimeWait;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -42,14 +39,12 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.util.CollectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Base infrastructures for streaming writer function.
@@ -125,11 +120,6 @@ public abstract class AbstractStreamWriteFunction<I>
   private long checkpointId = -1;
 
   /**
-   * The message client.
-   */
-  private MessageClient messageClient;
-
-  /**
    * Constructs a StreamWriteFunctionBase.
    *
    * @param config The config options
@@ -150,6 +140,7 @@ public abstract class AbstractStreamWriteFunction<I>
             TypeInformation.of(WriteMetadataEvent.class)
         ));
 
+    this.currentInstant = lastPendingInstant();
     if (context.isRestored()) {
       restoreWriteMetadata();
     } else {
@@ -157,7 +148,6 @@ public abstract class AbstractStreamWriteFunction<I>
     }
     // blocks flushing until the coordinator starts a new instant
     this.confirming = true;
-    this.messageClient = MessageBus.getClient(this.metaClient.getFs(), this.metaClient.getBasePath());
   }
 
   @Override
@@ -187,19 +177,14 @@ public abstract class AbstractStreamWriteFunction<I>
   // -------------------------------------------------------------------------
 
   private void restoreWriteMetadata() throws Exception {
-    List<WriteMetadataEvent> events = CollectionUtil.iterableToList(this.writeMetadataState.get());
+    String lastInflight = lastPendingInstant();
     boolean eventSent = false;
-    if (events.size() > 0) {
-      boolean committed = this.metaClient.getActiveTimeline()
-          .filterCompletedInstants()
-          .containsInstant(events.get(0).getInstantTime());
-      if (!committed) {
-        for (WriteMetadataEvent event : events) {
-          // The checkpoint succeed but the meta does not commit,
-          // re-commit the inflight instant
-          this.eventGateway.sendEventToCoordinator(event);
-          LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
-        }
+    for (WriteMetadataEvent event : this.writeMetadataState.get()) {
+      if (Objects.equals(lastInflight, event.getInstantTime())) {
+        // The checkpoint succeed but the meta does not commit,
+        // re-commit the inflight instant
+        this.eventGateway.sendEventToCoordinator(event);
+        LOG.info("Send uncommitted write metadata event to coordinator, task[{}].", taskID);
         eventSent = true;
       }
     }
@@ -237,65 +222,21 @@ public abstract class AbstractStreamWriteFunction<I>
     }
   }
 
-  @Override
-  public void close() {
-    if (this.messageClient != null) {
-      this.messageClient.close();
-    }
-  }
-
   /**
    * Returns the last pending instant time.
    */
-  private String lastPendingInstant() {
-    return StreamerUtil.getLastPendingInstant(metaClient);
-  }
-
-  /**
-   * Returns the previous committed checkpoint id.
-   *
-   * @param eagerFlush Whether the data flush happens before the checkpoint barrier arrives
-   */
-  private long prevCkp(boolean eagerFlush) {
-    // Use the last checkpoint id to request for the message,
-    // the time sequence of committed checkpoints and ongoing
-    // checkpoints are as following:
-
-    // 0 ------------ 1 ------------ 2 ------------ 3 ------------>   committed ckp id
-    // |             /              /              /              /
-    // |--- ckp-1 ----|--- ckp-2 ----|--- ckp-3 ----|--- ckp-4 ----|  ongoing ckp id
-
-    // Use 0 as the initial committed checkpoint id, the 0th checkpoint message records the writing instant for ckp-1;
-    // when ckp-1 success event is received, commits a checkpoint message with the writing instant for ckp-2;
-    // that means, the checkpoint message records the writing instant of next checkpoint.
-    return Math.max(0, eagerFlush ? this.checkpointId : this.checkpointId - 1);
-  }
-
-  /**
-   * Returns the next instant to write from the message bus.
-   *
-   * <p>It returns 3 kinds of value:
-   * i) normal instant time: the previous checkpoint succeed;
-   * ii) 'aborted' instant time: the previous checkpoint has been aborted;
-   * ii) null: the checkpoint is till ongoing without any notifications.
-   */
-  @Nullable
-  protected String ackInstant(long checkpointId) {
-    Option<MessageBus.CkpMessage> ckpMessageOption = this.messageClient.getCkpMessage(checkpointId);
-    return ckpMessageOption.map(message -> message.inflightInstant).orElse(null);
+  protected String lastPendingInstant() {
+    return StreamerUtil.getLastPendingInstant(this.metaClient);
   }
 
   /**
    * Prepares the instant time to write with for next checkpoint.
    *
-   * @param eagerFlush Whether the data flush happens before the checkpoint barrier arrives
-   *
+   * @param hasData Whether the task has buffering data
    * @return The instant time
    */
-  protected String instantToWrite(boolean eagerFlush) {
-    final long ckpId = prevCkp(eagerFlush);
-    String instant = ackInstant(ckpId);
-
+  protected String instantToWrite(boolean hasData) {
+    String instant = lastPendingInstant();
     // if exactly-once semantics turns on,
     // waits for the checkpoint notification until the checkpoint timeout threshold hits.
     TimeWait timeWait = TimeWait.builder()
@@ -306,23 +247,18 @@ public abstract class AbstractStreamWriteFunction<I>
       // wait condition:
       // 1. there is no inflight instant
       // 2. the inflight instant does not change and the checkpoint has buffering data
-      if (instant == null) {
+      if (instant == null || (instant.equals(this.currentInstant) && hasData)) {
         // sleep for a while
         boolean timeout = timeWait.waitFor();
-        if (timeout && MessageBus.notInitialCkp(ckpId)) {
+        if (timeout && instant != null) {
           // if the timeout threshold hits but the last instant still not commit,
           // and the task does not receive commit ask event(no data or aborted checkpoint),
           // assumes the checkpoint was canceled silently and unblock the data flushing
           confirming = false;
-          instant = lastPendingInstant();
         } else {
           // refresh the inflight instant
-          instant = ackInstant(ckpId);
+          instant = lastPendingInstant();
         }
-      } else if (MessageBus.canAbort(instant, ckpId)) {
-        // the checkpoint was canceled, reuse the last instant
-        confirming = false;
-        instant = lastPendingInstant();
       } else {
         // the pending instant changed, that means the last instant was committed
         // successfully.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java
deleted file mode 100644
index ff8f3eb..0000000
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageBus.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.message;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.util.StreamerUtil;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-/**
- * A message bus for transferring the checkpoint messages.
- *
- * <p>Each time the driver starts a new instant, it writes a commit message into the bus, the write tasks
- * then consume the message and unblocking the data flush.
- *
- * <p>Why we use the DFS based message queue instead of sending
- * the {@link org.apache.flink.runtime.operators.coordination.OperatorEvent} ?
- * The write task handles the operator event using the main mailbox executor which has the lowest priority for mails,
- * it is also used to process the inputs. When the write task blocks and waits for the operator event to ack the valid instant to write,
- * it actually blocks all the following events in the mailbox, the operator event can never be consumed then it causes deadlock.
- *
- * <p>The message bus is also more lightweight than the active timeline.
- */
-public abstract class MessageBus implements AutoCloseable {
-
-  public static final long INITIAL_CKP_ID = 0L;
-
-  public static final String ABORTED_CKP_INSTANT = "aborted";
-
-  protected static final int MESSAGE_QUEUE_LENGTH = 20;
-
-  protected static final int CLIENT_MESSAGE_CACHE_SIZE = 10;
-
-  private static final String MESSAGE_BUS = "message_bus";
-
-  private static final String COMMIT = "commit";
-
-  private static final String COMMIT_EXTENSION = "." + COMMIT;
-  private static final String ABORTED_EXTENSION = ".aborted";
-
-  protected final FileSystem fs;
-  protected final String basePath;
-  protected final String messageBusPath;
-
-  protected MessageBus(FileSystem fs, String basePath) {
-    this.fs = fs;
-    this.basePath = basePath;
-    this.messageBusPath = messageBusPath(basePath);
-  }
-
-  public static MessageDriver getDriver(FileSystem fs, String basePath) {
-    return MessageDriver.getInstance(fs, basePath);
-  }
-
-  public static MessageClient getClient(FileSystem fs, String basePath) {
-    return MessageClient.getSingleton(fs, basePath);
-  }
-
-  public static MessageClient getClient(String basePath) {
-    FileSystem fs = FSUtils.getFs(basePath, StreamerUtil.getHadoopConf());
-    return MessageClient.getSingleton(fs, basePath);
-  }
-
-  // -------------------------------------------------------------------------
-  //  Utilities
-  // -------------------------------------------------------------------------
-  public static boolean canAbort(String instant, long checkpointId) {
-    return ABORTED_CKP_INSTANT.equals(instant) && MessageBus.notInitialCkp(checkpointId);
-  }
-
-  public static boolean notInitialCkp(long checkpointId) {
-    return checkpointId != INITIAL_CKP_ID;
-  }
-
-  protected Path fullFilePath(String fileName) {
-    return new Path(messageBusPath, fileName);
-  }
-
-  protected static String messageBusPath(String basePath) {
-    return basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + MESSAGE_BUS;
-  }
-
-  protected static String getCommitFileName(long checkpointId) {
-    return checkpointId + COMMIT_EXTENSION;
-  }
-
-  protected static String getAbortedFileName(long checkpointId) {
-    return checkpointId + ABORTED_EXTENSION;
-  }
-
-  // -------------------------------------------------------------------------
-  //  Inner Class
-  // -------------------------------------------------------------------------
-
-  /**
-   * A checkpoint message.
-   */
-  public static class CkpMessage {
-    private static final String SEPARATOR = ",";
-
-    public final boolean committed; // whether the checkpoint is committed
-
-    public final long checkpointId;
-    public final String commitInstant;
-    public final String inflightInstant;
-
-    private CkpMessage(long checkpointId, String commitInstant, String inflightInstant) {
-      this.committed = true;
-      this.checkpointId = checkpointId;
-      this.commitInstant = commitInstant;
-      this.inflightInstant = inflightInstant;
-    }
-
-    private CkpMessage(long checkpointId) {
-      this.committed = false;
-      this.checkpointId = checkpointId;
-      this.commitInstant = ABORTED_CKP_INSTANT;
-      this.inflightInstant = ABORTED_CKP_INSTANT;
-    }
-
-    /**
-     * Encodes the instants as 'commitInstant,inflightInstant'.
-     */
-    public static byte[] toBytes(String commitInstant, String inflightInstant) {
-      return (commitInstant + SEPARATOR + inflightInstant).getBytes(StandardCharsets.UTF_8);
-    }
-
-    public static CkpMessage fromBytes(long checkpointId, byte[] bytes) {
-      String content = new String(bytes, StandardCharsets.UTF_8);
-      String[] splits = content.split(SEPARATOR);
-      return new CkpMessage(checkpointId, splits[0], splits[1]);
-    }
-
-    public static CkpMessage fromPath(FileSystem fs, Path path) throws IOException {
-      final String[] splits = path.getName().split("\\.");
-      ValidationUtils.checkState(splits.length == 2, "Invalid checkpoint message file name: " + path.getName());
-      final long checkpointId = Long.parseLong(splits[0]);
-      final String suffix = splits[1];
-      if (suffix.equals(COMMIT)) {
-        try (FSDataInputStream is = fs.open(path)) {
-          byte[] bytes = FileIOUtils.readAsByteArray(is);
-          return CkpMessage.fromBytes(checkpointId, bytes);
-        }
-      } else {
-        return new CkpMessage(checkpointId);
-      }
-    }
-  }
-}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java
deleted file mode 100644
index ea893d5..0000000
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageClient.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.message;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * A client that consumes messages from the {@link MessageBus}.
- */
-public class MessageClient extends MessageBus {
-  private static final Logger LOG = LoggerFactory.getLogger(MessageClient.class);
-
-  private static final Map<String, MessageClient> CLIENTS = new HashMap<>();
-
-  private final TreeMap<Long, CkpMessage> ckpCache; // checkpoint id -> CkpMessage mapping
-
-  private MessageClient(FileSystem fs, String basePath) throws IOException {
-    super(fs, basePath);
-    this.ckpCache = new TreeMap<>();
-  }
-
-  /**
-   * Returns the message bus instance.
-   *
-   * <p>This expects to be called by the client.
-   *
-   * @param fs       The filesystem
-   * @param basePath The table base path
-   * @return The instance of message bus
-   */
-  private static MessageClient getInstance(FileSystem fs, String basePath) {
-    try {
-      return new MessageClient(fs, basePath);
-    } catch (IOException e) {
-      throw new HoodieException("Initialize checkpoint message bus error", e);
-    }
-  }
-
-  /**
-   * Returns the singleton message bus instance.
-   *
-   * <p>This expects to be called by the client.
-   *
-   * @param fs       The filesystem
-   * @param basePath The table base path
-   * @return The instance of message bus
-   */
-  public static synchronized MessageClient getSingleton(FileSystem fs, String basePath) {
-    return CLIENTS.computeIfAbsent(basePath,
-        k -> getInstance(fs, basePath));
-  }
-
-  public synchronized Option<CkpMessage> getCkpMessage(long checkpointId) {
-    if (this.ckpCache.size() >= CLIENT_MESSAGE_CACHE_SIZE) {
-      this.ckpCache.pollFirstEntry();
-    }
-    if (this.ckpCache.containsKey(checkpointId)) {
-      return Option.of(this.ckpCache.get(checkpointId));
-    }
-    final Path commitFilePath = fullFilePath(getCommitFileName(checkpointId));
-    try {
-      if (fs.exists(commitFilePath)) {
-        CkpMessage ckpMessage = CkpMessage.fromPath(fs, commitFilePath);
-        this.ckpCache.put(checkpointId, ckpMessage);
-        return Option.of(ckpMessage);
-      }
-    } catch (Throwable e) {
-      // ignored
-      LOG.warn("Read committed checkpoint message error: " + checkpointId, e);
-      return Option.empty();
-    }
-    final Path abortedFilePath = fullFilePath(getAbortedFileName(checkpointId));
-    try {
-      if (fs.exists(abortedFilePath)) {
-        CkpMessage ckpMessage = CkpMessage.fromPath(fs, abortedFilePath);
-        this.ckpCache.put(checkpointId, ckpMessage);
-        return Option.of(ckpMessage);
-      }
-    } catch (Throwable e) {
-      // ignored
-      LOG.warn("Read aborted checkpoint message error: " + checkpointId, e);
-      return Option.empty();
-    }
-    return Option.empty();
-  }
-
-  @VisibleForTesting
-  public TreeMap<Long, CkpMessage> getCkpCache() {
-    return ckpCache;
-  }
-
-  @Override
-  public void close() {
-    synchronized (this) {
-      this.ckpCache.clear();
-    }
-  }
-}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java b/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java
deleted file mode 100644
index bf98209..0000000
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/message/MessageDriver.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.message;
-
-import org.apache.hudi.exception.HoodieException;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * A driver that produces messages to the {@link MessageBus}.
- */
-public class MessageDriver extends MessageBus {
-  private final TreeMap<Long, Boolean> ckpIdCache; // checkpoint id -> isCommitted mapping
-
-  public MessageDriver(FileSystem fs, String basePath) throws IOException {
-    super(fs, basePath);
-    this.ckpIdCache = new TreeMap<>();
-    initialize();
-  }
-
-  /**
-   * Returns the message bus instance.
-   *
-   * <p>This expects to be called by the driver.
-   *
-   * @param fs       The filesystem
-   * @param basePath The table base path
-   * @return The instance of message bus
-   */
-  public static MessageDriver getInstance(FileSystem fs, String basePath) {
-    try {
-      return new MessageDriver(fs, basePath);
-    } catch (IOException e) {
-      throw new HoodieException("Initialize checkpoint message bus error", e);
-    }
-  }
-
-  /**
-   * Initialize the message bus, would clean all the messages.
-   *
-   * <p>This expects to be called by the driver.
-   */
-  private void initialize() throws IOException {
-    Path path = new Path(messageBusPath(basePath));
-    if (fs.exists(path)) {
-      fs.delete(path, true);
-    }
-    fs.mkdirs(path);
-  }
-
-  /**
-   * Add a checkpoint commit message.
-   *
-   * @param checkpointId    The checkpoint id
-   * @param commitInstant   The committed instant
-   * @param inflightInstant The new inflight instant
-   */
-  public void commitCkp(long checkpointId, String commitInstant, String inflightInstant) {
-    Path path = fullFilePath(getCommitFileName(checkpointId));
-
-    try (FSDataOutputStream outputStream = fs.create(path, true)) {
-      byte[] bytes = CkpMessage.toBytes(commitInstant, inflightInstant);
-      outputStream.write(bytes);
-      outputStream.close();
-      this.ckpIdCache.put(checkpointId, true);
-      clean();
-    } catch (Throwable e) {
-      throw new HoodieException("Adding committed message error for checkpoint: " + checkpointId, e);
-    }
-  }
-
-  /**
-   * Add an aborted checkpoint message.
-   *
-   * @param checkpointId    The checkpoint id
-   */
-  public void abortCkp(long checkpointId) {
-    Path path = fullFilePath(getAbortedFileName(checkpointId));
-    try {
-      fs.createNewFile(path);
-      this.ckpIdCache.put(checkpointId, false);
-      clean();
-    } catch (Throwable e) {
-      throw new HoodieException("Adding aborted message error for checkpoint: " + checkpointId, e);
-    }
-  }
-
-  private void clean() throws IOException {
-    int numToClean = this.ckpIdCache.size() - MESSAGE_QUEUE_LENGTH;
-    if (numToClean >= 10) {
-      for (int i = 0; i < numToClean; i++) {
-        Map.Entry<Long, Boolean> entry = this.ckpIdCache.pollFirstEntry();
-        final String fileName = entry.getValue() ? getCommitFileName(entry.getKey()) : getAbortedFileName(entry.getKey());
-        final Path filePath = fullFilePath(fileName);
-        fs.delete(filePath, false);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public TreeMap<Long, Boolean> getCkpIdCache() {
-    return ckpIdCache;
-  }
-
-  @Override
-  public void close() throws Exception {
-    this.ckpIdCache.clear();
-  }
-}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java b/hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java
deleted file mode 100644
index b161c96..0000000
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/message/TestMessageBus.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.sink.message;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.util.StreamerUtil;
-import org.apache.hudi.utils.TestConfigurations;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.IntStream;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Test cases for {@link MessageBus}.
- */
-public class TestMessageBus {
-
-  private String basePath;
-  private FileSystem fs;
-
-  private MessageDriver driver;
-
-  @TempDir
-  File tempFile;
-
-  @BeforeEach
-  public void beforeEach() throws Exception {
-    basePath = tempFile.getAbsolutePath();
-    this.fs = FSUtils.getFs(tempFile.getAbsolutePath(), StreamerUtil.getHadoopConf());
-
-    Configuration conf = TestConfigurations.getDefaultConf(basePath);
-    StreamerUtil.initTableIfNotExists(conf);
-
-    this.driver = MessageDriver.getInstance(fs, basePath);
-  }
-
-  /**
-   * Messages written by the driver, committed or aborted, are readable through the client.
-   */
-  @Test
-  void testWriteAndReadMessage() {
-    MessageClient client = MessageClient.getSingleton(fs, basePath);
-
-    // write and read 5 committed checkpoints
-    IntStream.range(0, 5).forEach(i -> driver.commitCkp(i, i + "", i + 1 + ""));
-
-    IntStream.range(0, 5).forEach(i -> {
-      Option<MessageBus.CkpMessage> messageOpt = client.getCkpMessage(i);
-      assertTrue(messageOpt.isPresent());
-
-      MessageBus.CkpMessage ckpMessage = messageOpt.get();
-      assertTrue(ckpMessage.committed);
-      assertThat(ckpMessage.commitInstant, is(i + ""));
-      assertThat(ckpMessage.inflightInstant, is(i + 1 + ""));
-    });
-
-    // write and read 5 aborted checkpoints
-    IntStream.range(5, 10).forEach(i -> driver.abortCkp(i));
-
-    IntStream.range(5, 10).forEach(i -> {
-      Option<MessageBus.CkpMessage> messageOpt = client.getCkpMessage(i);
-      assertTrue(messageOpt.isPresent());
-
-      MessageBus.CkpMessage ckpMessage = messageOpt.get();
-      assertFalse(ckpMessage.committed);
-      assertThat(ckpMessage.commitInstant, is(MessageBus.ABORTED_CKP_INSTANT));
-      assertThat(ckpMessage.inflightInstant, is(MessageBus.ABORTED_CKP_INSTANT));
-    });
-  }
-
-  /**
-   * The driver keeps every checkpoint id until the 30th message is written, then trims
-   * its id cache back to the 20 most recent ids.
-   */
-  @Test
-  void testWriteCleaning() {
-    // write 20 committed checkpoints (ids 0..19)
-    IntStream.range(0, 20).forEach(i -> driver.commitCkp(i, i + "", i + 1 + ""));
-    assertThat("The id cache should not be cleaned", driver.getCkpIdCache().size(), is(20));
-
-    // write 9 aborted checkpoints (ids 20..28), one message short of triggering the cleaning
-    IntStream.range(20, 29).forEach(i -> driver.abortCkp(i));
-    assertThat("The id cache should not be cleaned", driver.getCkpIdCache().size(), is(29));
-
-    // the 30th message triggers the cleaning of the 10 oldest ids
-    driver.commitCkp(29, "29", "30");
-    assertThat("The cache should be cleaned", driver.getCkpIdCache().size(), is(20));
-    assertThat(longSet2String(driver.getCkpIdCache().keySet()),
-        is("10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29"));
-  }
-
-  /**
-   * The client cache never grows beyond 10 entries; reading more checkpoints evicts the
-   * smallest cached ids.
-   */
-  @Test
-  void testReadCleaning() {
-    MessageClient client = MessageClient.getSingleton(fs, basePath);
-
-    // write 20 committed checkpoints (ids 0..19)
-    IntStream.range(0, 20).forEach(i -> driver.commitCkp(i, i + "", i + 1 + ""));
-
-    // read the first 10 checkpoints, which fills the cache exactly
-    IntStream.range(0, 10).forEach(client::getCkpMessage);
-    assertThat("The checkpoint cache should not be cleaned", client.getCkpCache().size(), is(10));
-
-    client.getCkpMessage(10);
-    assertThat("The checkpoint cache should be cleaned", client.getCkpCache().size(), is(10));
-
-    IntStream.range(11, 15).forEach(client::getCkpMessage);
-    assertThat("The checkpoint cache should be cleaned", client.getCkpCache().size(), is(10));
-    assertThat(longSet2String(client.getCkpCache().keySet()), is("5,6,7,8,9,10,11,12,13,14"));
-  }
-
-  /**
-   * Joins the given ids with commas, in the iteration order of the set.
-   */
-  private static String longSet2String(Set<Long> longSet) {
-    List<String> elements = new ArrayList<>();
-    for (Long id : longSet) {
-      // keep the full long value instead of narrowing it to an int
-      elements.add(String.valueOf(id));
-    }
-    return String.join(",", elements);
-  }
-}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index f1f5a1f..54a142a 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -248,7 +248,6 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
   public void close() throws Exception {
     coordinator.close();
     ioManager.close();
-    writeFunction.close();
   }
 
   public StreamWriteOperatorCoordinator getCoordinator() {