Posted to commits@hudi.apache.org by da...@apache.org on 2021/10/10 01:30:59 UTC

[hudi] branch master updated: [HUDI-2537] Fix metadata table for flink (#3774)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ad63938  [HUDI-2537] Fix metadata table for flink (#3774)
ad63938 is described below

commit ad63938890151c4c13e78acfd448259c3af80f80
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sun Oct 10 09:30:39 2021 +0800

    [HUDI-2537] Fix metadata table for flink (#3774)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  41 +++++++-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  55 +++++++---
 .../FlinkHoodieBackedTableMetadataWriter.java      | 112 ++++++++-------------
 .../org/apache/hudi/table/HoodieFlinkTable.java    |  38 +++++++
 .../SparkHoodieBackedTableMetadataWriter.java      |  40 --------
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  15 +--
 .../hudi/sink/append/AppendWriteFunction.java      |   7 +-
 .../java/org/apache/hudi/source/FileIndex.java     |  13 ++-
 .../sink/TestStreamWriteOperatorCoordinator.java   |  14 ++-
 .../sink/utils/StreamWriteFunctionWrapper.java     |   3 -
 .../java/org/apache/hudi/source/TestFileIndex.java |   4 +-
 packaging/hudi-flink-bundle/pom.xml                |  43 ++++++++
 12 files changed, 232 insertions(+), 153 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 19e9d31..ceac9eb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -219,7 +220,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    */
   protected abstract void initialize(HoodieEngineContext engineContext);
 
-  protected void initTableMetadata() {
+  public void initTableMetadata() {
     try {
       if (this.metadata != null) {
         this.metadata.close();
@@ -533,4 +534,42 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
    * @param instantTime The timestamp to use for the deltacommit.
    */
   protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime);
+
+  /**
+   *  Perform a compaction on the Metadata Table.
+   *
+   * Cases to be handled:
+   *   1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
+   *      a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
+   *
+   *   2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
+   *      deltacommit.
+   */
+  protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String instantTime) {
+    String latestDeltacommitTime = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+        .get().getTimestamp();
+    List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
+
+    if (!pendingInstants.isEmpty()) {
+      LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
+          pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
+      return;
+    }
+
+    // Trigger compaction with suffixes based on the same instant time. This ensures that any future
+    // delta commits synced over will not have an instant time lesser than the last completed instant on the
+    // metadata table.
+    final String compactionInstantTime = latestDeltacommitTime + "001";
+    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+      writeClient.compact(compactionInstantTime);
+    }
+  }
+
+  protected void doClean(AbstractHoodieWriteClient writeClient, String instantTime) {
+    // Trigger cleaning with suffixes based on the same instant time. This ensures that any future
+    // delta commits synced over will not have an instant time lesser than the last completed instant on the
+    // metadata table.
+    writeClient.clean(instantTime + "002");
+  }
 }
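
The "001"/"002" suffixes used by compactIfNecessary/doClean above keep the metadata
table timeline ordered. A minimal sketch of that scheme, assuming Hudi instant times
are plain timestamp strings ordered lexicographically (all concrete values below are
illustrative):

    public class InstantSuffixSketch {
      public static void main(String[] args) {
        String lastDeltaCommit = "20211010093039";           // latest completed deltacommit on the metadata table
        String compactionInstant = lastDeltaCommit + "001";  // metadata table compaction
        String cleanInstant = lastDeltaCommit + "002";       // metadata table clean
        String nextDataCommit = "20211010093040";            // any later commit synced from the data table

        // The suffixed instants sort after the deltacommit they derive from but before
        // the next real timestamp, so future deltacommits never land behind them.
        System.out.println(lastDeltaCommit.compareTo(compactionInstant) < 0);  // true
        System.out.println(compactionInstant.compareTo(cleanInstant) < 0);     // true
        System.out.println(cleanInstant.compareTo(nextDataCommit) < 0);        // true
      }
    }
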
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 669be16..e95b0f8 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
@@ -51,6 +50,8 @@ import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
 import org.apache.hudi.io.FlinkMergeHandle;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.io.MiniBatchHandle;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -86,24 +87,18 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
    * FileID to write handle mapping in order to record the write handles for each file group,
    * so that we can append the mini-batch data buffer incrementally.
    */
-  private Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
+  private final Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
 
-  public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
-    this(context, clientConfig, false);
-  }
+  /**
+   * Cached metadata writer for coordinator to reuse for each commit.
+   */
+  private Option<HoodieBackedTableMetadataWriter> metadataWriterOption = Option.empty();
 
-  @Deprecated
-  public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
+  public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
     super(context, writeConfig);
     this.bucketToHandles = new HashMap<>();
   }
 
-  @Deprecated
-  public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
-                                Option<EmbeddedTimelineService> timelineService) {
-    super(context, writeConfig, timelineService);
-  }
-
   /**
    * Complete changes performed at the given instantTime marker with specified action.
    */
@@ -260,6 +255,24 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     // remove the async cleaning
   }
 
+  @Override
+  protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+    this.metadataWriterOption.ifPresent(w -> {
+      w.initTableMetadata(); // refresh the timeline
+      w.update(metadata, instantTime);
+    });
+  }
+
+  /**
+   * Initialize the table metadata writer, e.g. bootstrap the metadata table
+   * from the filesystem if it does not exist.
+   */
+  public void initMetadataWriter() {
+    HoodieBackedTableMetadataWriter metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
+        FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
+    this.metadataWriterOption = Option.of(metadataWriter);
+  }
+
   /**
    * Starts async cleaning service for finished commits.
    *
@@ -347,6 +360,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
       String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
     List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
+    // commit to data table after committing to metadata table.
     finalizeWrite(table, compactionCommitTime, writeStats);
     LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
     FlinkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
@@ -381,6 +396,19 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     throw new HoodieNotSupportedException("Clustering is not supported yet");
   }
 
+  private void writeTableMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                  HoodieCommitMetadata commitMetadata,
+                                  HoodieInstant hoodieInstant) {
+    try {
+      this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
+      // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to the metadata table happen within a
+      // single lock (single writer), because concurrent writes to the metadata table would conflict since they all update the same partition.
+      table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp()));
+    } finally {
+      this.txnManager.endTransaction();
+    }
+  }
+
   @Override
   protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
@@ -478,6 +506,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     } else {
       writeTimer = metrics.getDeltaCommitCtx();
     }
+    table.getHoodieView().sync();
     return table;
   }
 
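
A rough usage sketch of the write client after this change, assuming a write config
with the metadata table enabled; the base path and table name are illustrative. The
coordinator calls initMetadataWriter() once, and preCommit() then syncs each commit's
metadata into the metadata table before the data table commit is finalized:

    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.client.common.HoodieFlinkEngineContext;
    import org.apache.hudi.common.model.HoodieAvroPayload;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class FlinkWriteClientUsageSketch {
      public static void main(String[] args) {
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_sketch_table")   // illustrative base path
            .forTable("sketch_table")             // illustrative table name
            .build();
        HoodieFlinkWriteClient<HoodieAvroPayload> writeClient =
            new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
        // Bootstrap (or reuse) the metadata table writer once, as the coordinator does.
        writeClient.initMetadataWriter();
        // ... startCommitWithTime(...), write, then commit(): preCommit() now also
        // applies the commit metadata to the metadata table.
        writeClient.close();
      }
    }
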
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 18a1960..634eaba 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -20,24 +20,17 @@ package org.apache.hudi.metadata;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.LogManager;
@@ -86,82 +79,61 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
   @Override
   protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
-    List<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+    List<HoodieRecord> recordList = prepRecords(records, partitionName, 1);
 
-    try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig, true)) {
-      writeClient.startCommitWithTime(instantTime);
-      writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+    try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
+      if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+        // if this is a new commit being applied to metadata for the first time
+        writeClient.startCommitWithTime(instantTime);
+        writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+      } else {
+        // This code path handles a re-attempted commit that was committed to the metadata table but failed in the data table.
+        // For example, say compaction c1 succeeded in the metadata table on the first attempt but failed before committing to the data table.
+        // On retry, the data table first rolls back the pending compaction; that rollback is applied to the metadata table, but all changes
+        // are upserts to the metadata table, so only a new delta commit is created.
+        // Once the rollback completes, the compaction is retried and eventually reaches this block, where the respective commit is
+        // already part of a completed commit. So we have to manually remove the completed instant and proceed.
+        // This is also why withAllowMultiWriteOnSameInstant is enabled for the metadata table.
+        HoodieInstant alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
+        HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant);
+        metadataMetaClient.reloadActiveTimeline();
+      }
 
-      List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime);
+      List<WriteStatus> statuses = records.size() > 0
+          ? writeClient.upsertPreppedRecords(recordList, instantTime)
+          : Collections.emptyList();
       statuses.forEach(writeStatus -> {
         if (writeStatus.hasErrors()) {
           throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
         }
       });
+      // Flink does not support auto-commit yet, and the auto-commit logic in AbstractHoodieWriteClient is not complete either.
       writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
-      // trigger cleaning, compaction, with suffixes based on the same instant time. This ensures that any future
-      // delta commits synced over will not have an instant time lesser than the last completed instant on the
-      // metadata table.
-      if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
-        writeClient.compact(instantTime + "001");
-      }
-      writeClient.clean(instantTime + "002");
+
+      // reload timeline
+      metadataMetaClient.reloadActiveTimeline();
+      compactIfNecessary(writeClient, instantTime);
+      doClean(writeClient, instantTime);
     }
 
     // Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> {
-      try {
-        m.updateSizeMetrics(metadataMetaClient, metadata);
-      } catch (HoodieIOException e) {
-        LOG.error("Could not publish metadata size metrics", e);
-      }
-    });
+    metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
   }
 
   /**
-   * Tag each record with the location.
-   * <p>
-   * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
-   * base file.
+   * Tag each record with the location in the given partition.
+   *
+   * The record is tagged with respective file slice's location based on its record key.
    */
-  private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
-    HoodieTable table = HoodieFlinkTable.create(metadataWriteConfig, (HoodieFlinkEngineContext) engineContext);
-    TableFileSystemView.SliceView fsView = table.getSliceView();
-    List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
-        .map(FileSlice::getBaseFile)
-        .filter(Option::isPresent)
-        .map(Option::get)
-        .collect(Collectors.toList());
-
-    // All the metadata fits within a single base file
-    if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
-      if (baseFiles.size() > 1) {
-        throw new HoodieMetadataException("Multiple base files found in metadata partition");
-      }
-    }
-
-    String fileId;
-    String instantTime;
-    if (!baseFiles.isEmpty()) {
-      fileId = baseFiles.get(0).getFileId();
-      instantTime = "U";
-    } else {
-      // If there is a log file then we can assume that it has the data
-      List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
-          .map(FileSlice::getLatestLogFile)
-          .filter(Option::isPresent)
-          .map(Option::get)
-          .collect(Collectors.toList());
-      if (logFiles.isEmpty()) {
-        // No base and log files. All are new inserts
-        fileId = FSUtils.createNewFileIdPfx();
-        instantTime = "I";
-      } else {
-        fileId = logFiles.get(0).getFileId();
-        instantTime = "U";
-      }
-    }
-
-    return records.stream().map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId))).collect(Collectors.toList());
+  private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
+    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
+
+    return records.stream().map(r -> {
+      FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
+      final String instantTime = slice.isEmpty() ? "I" : "U";
+      r.setCurrentLocation(new HoodieRecordLocation(instantTime, slice.getFileId()));
+      return r;
+    }).collect(Collectors.toList());
   }
 }
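
The routing above assigns each record to one of the partition's file groups via
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex. A simplified stand-in for that
mapping, using an illustrative hash rather than the exact function Hudi applies:

    import java.util.Arrays;
    import java.util.List;

    public class FileGroupRoutingSketch {
      // Illustrative hash-mod routing; the real mapping lives in
      // HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex.
      static int fileGroupIndexFor(String recordKey, int numFileGroups) {
        return Math.floorMod(recordKey.hashCode(), numFileGroups);
      }

      public static void main(String[] args) {
        int numFileGroups = 1;  // the FILES partition is committed with a single file group above
        List<String> recordKeys = Arrays.asList("2021/10/09", "2021/10/10", "__all_partitions__");
        for (String key : recordKeys) {
          System.out.println(key + " -> file group " + fileGroupIndexFor(key, numFileGroups));
        }
      }
    }
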
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 3e26025..ce63a2d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -29,14 +29,25 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.FlinkHoodieIndex;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
 import java.util.List;
 
 public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
     extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
     implements ExplicitWriteHandleTable<T> {
+
+  private boolean isMetadataAvailabilityUpdated = false;
+  private boolean isMetadataTableAvailable;
+
   protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     super(config, context, metaClient);
   }
@@ -66,4 +77,31 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
   protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
     return FlinkHoodieIndex.createIndex((HoodieFlinkEngineContext) context, config);
   }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataAvailabilityUpdated) {
+        // This code assumes that once the metadata availability is determined it does not change; revisit this logic if that is no longer the case.
+        // this is done to avoid repeated calls to fs.exists().
+        try {
+          isMetadataTableAvailable = config.isMetadataTableEnabled()
+              && metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
+        } catch (IOException e) {
+          throw new HoodieMetadataException("Checking existence of metadata table failed", e);
+        }
+        isMetadataAvailabilityUpdated = true;
+      }
+    }
+    if (isMetadataTableAvailable) {
+      return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context));
+    } else {
+      return Option.empty();
+    }
+  }
 }
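
getMetadataWriter() caches a single fs.exists() probe per table instance. A minimal
standalone version of that probe, assuming the metadata table lives under
<basePath>/.hoodie/metadata (the path returned by
HoodieTableMetadata.getMetadataTableBasePath); the base path is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MetadataTableProbeSketch {
      public static void main(String[] args) throws Exception {
        String basePath = "/tmp/hudi_sketch_table";  // illustrative base path
        Path metadataBasePath = new Path(basePath, ".hoodie/metadata");
        FileSystem fs = metadataBasePath.getFileSystem(new Configuration());
        // HoodieFlinkTable caches this answer so the filesystem is probed only once.
        System.out.println("metadata table available: " + fs.exists(metadataBasePath));
      }
    }
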
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index f512b8f..3324455 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -41,9 +41,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
 
@@ -130,44 +128,6 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
   }
 
   /**
-   *  Perform a compaction on the Metadata Table.
-   *
-   * Cases to be handled:
-   *   1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
-   *      a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
-   *
-   *   2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
-   *      deltacommit.
-   */
-  private void compactIfNecessary(SparkRDDWriteClient writeClient, String instantTime) {
-    String latestDeltacommitTime = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
-        .get().getTimestamp();
-    List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
-        .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
-
-    if (!pendingInstants.isEmpty()) {
-      LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
-          pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
-      return;
-    }
-
-    // Trigger compaction with suffixes based on the same instant time. This ensures that any future
-    // delta commits synced over will not have an instant time lesser than the last completed instant on the
-    // metadata table.
-    final String compactionInstantTime = latestDeltacommitTime + "001";
-    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
-      writeClient.compact(compactionInstantTime);
-    }
-  }
-
-  private void doClean(SparkRDDWriteClient writeClient, String instantTime) {
-    // Trigger cleaning with suffixes based on the same instant time. This ensures that any future
-    // delta commits synced over will not have an instant time lesser than the last completed instant on the
-    // metadata table.
-    writeClient.clean(instantTime + "002");
-  }
-
-  /**
    * Tag each record with the location in the given partition.
    *
    * The record is tagged with respective file slice's location based on its record key.
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 26c1595..f2844a6 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -127,11 +127,6 @@ public class StreamWriteOperatorCoordinator
   private HiveSyncContext hiveSyncContext;
 
   /**
-   * A single-thread executor to handle metadata table sync.
-   */
-  private NonThrownExecutor metadataSyncExecutor;
-
-  /**
    * The table state.
    */
   private transient TableState tableState;
@@ -294,7 +289,7 @@ public class StreamWriteOperatorCoordinator
   }
 
   private void initMetadataSync() {
-    this.metadataSyncExecutor = new NonThrownExecutor(LOG, true);
+    this.writeClient.initMetadataWriter();
   }
 
   private void reset() {
@@ -498,14 +493,6 @@ public class StreamWriteOperatorCoordinator
     this.executor = executor;
   }
 
-  @VisibleForTesting
-  public void setMetadataSyncExecutor(NonThrownExecutor executor) throws Exception {
-    if (this.metadataSyncExecutor != null) {
-      this.metadataSyncExecutor.close();
-    }
-    this.metadataSyncExecutor = executor;
-  }
-
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 128c030..0279313 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -72,8 +72,6 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     // it would check the validity.
     // wait for the buffer data flush out and request a new instant
     flushData(false);
-    // nullify the write helper for next ckp
-    this.writerHelper = null;
   }
 
   @Override
@@ -133,5 +131,10 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
         .endInput(endInput)
         .build();
     this.eventGateway.sendEventToCoordinator(event);
+    // nullify the write helper for next ckp
+    this.writerHelper = null;
+    this.writeStatuses.addAll(writeStatus);
+    // blocks flushing until the coordinator starts a new instant
+    this.confirming = true;
   }
 }
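
A much-simplified model (not Hudi code) of the flush/confirm handshake tightened
above: after flushing, the function drops its writer helper and marks itself as
confirming, and it only buffers against a new instant once the coordinator starts one:

    public class FlushConfirmSketch {
      private volatile boolean confirming = false;
      private String currentInstant;

      void flushData(boolean endInput) {
        // ... send the WriteMetadataEvent to the coordinator ...
        currentInstant = null;   // corresponds to nullifying the writer helper
        confirming = true;       // block further flushes until a new instant is started
      }

      void onNewInstantStarted(String instant) {
        currentInstant = instant;
        confirming = false;      // safe to buffer and flush again
      }
    }
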
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index 54085eb..07383ef 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
@@ -48,10 +49,12 @@ public class FileIndex {
   private final Path path;
   private final HoodieMetadataConfig metadataConfig;
   private List<String> partitionPaths; // cache of partition paths
+  private final boolean tableExists;
 
   private FileIndex(Path path, Configuration conf) {
     this.path = path;
     this.metadataConfig = metadataConfig(conf);
+    this.tableExists = StreamerUtil.tableExists(path.toString(), StreamerUtil.getHadoopConf());
   }
 
   public static FileIndex instance(Path path, Configuration conf) {
@@ -111,6 +114,9 @@ public class FileIndex {
    * Returns all the file statuses under the table base path.
    */
   public FileStatus[] getFilesInPartitions() {
+    if (!tableExists) {
+      return new FileStatus[0];
+    }
     String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
     return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
             partitions, "/tmp/")
@@ -165,8 +171,9 @@ public class FileIndex {
     if (this.partitionPaths != null) {
       return this.partitionPaths;
     }
-    this.partitionPaths = FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT,
-        metadataConfig, path.toString());
+    this.partitionPaths = this.tableExists
+        ? FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString())
+        : Collections.emptyList();
     return this.partitionPaths;
   }
 
@@ -174,7 +181,7 @@ public class FileIndex {
     Properties properties = new Properties();
 
     // set up metadata.enabled=true in table DDL to enable metadata listing
-    properties.put(HoodieMetadataConfig.ENABLE, conf.getBoolean(FlinkOptions.METADATA_ENABLED));
+    properties.put(HoodieMetadataConfig.ENABLE.key(), conf.getBoolean(FlinkOptions.METADATA_ENABLED));
 
     return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
   }
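
The last hunk also fixes how the metadata flag is passed: the Properties object must
be keyed by the config key string, not by the ConfigProperty object, or fromProperties()
never sees the setting. A minimal sketch of the difference:

    import java.util.Properties;

    import org.apache.hudi.common.config.HoodieMetadataConfig;

    public class MetadataConfigKeySketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Wrong (pre-fix): the ConfigProperty object itself becomes the key, so the
        // flag is silently ignored when the config is built.
        // props.put(HoodieMetadataConfig.ENABLE, true);

        // Right (post-fix): key by the String returned by key(), i.e. "hoodie.metadata.enable".
        props.put(HoodieMetadataConfig.ENABLE.key(), true);
        HoodieMetadataConfig metadataConfig =
            HoodieMetadataConfig.newBuilder().fromProperties(props).build();
        // metadataConfig now reflects hoodie.metadata.enable=true for file listing.
        System.out.println("built metadata config with " + HoodieMetadataConfig.ENABLE.key() + "=true");
      }
    }
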
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 0538714..1fdb5ca 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -181,7 +180,7 @@ public class TestStreamWriteOperatorCoordinator {
     assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1));
   }
 
-  @Disabled
+  @Test
   void testSyncMetadataTable() throws Exception {
     // reset
     reset();
@@ -193,7 +192,6 @@ public class TestStreamWriteOperatorCoordinator {
     coordinator = new StreamWriteOperatorCoordinator(conf, context);
     coordinator.start();
     coordinator.setExecutor(new MockCoordinatorExecutor(context));
-    coordinator.setMetadataSyncExecutor(new MockCoordinatorExecutor(context));
 
     final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
 
@@ -209,7 +207,7 @@ public class TestStreamWriteOperatorCoordinator {
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), is("0000000000000"));
 
     // test metadata table compaction
-    // write another 4 commits
+    // write another 3 commits
     for (int i = 1; i < 4; i++) {
       instant = mockWriteWithMetadata();
       metadataTableMetaClient.reloadActiveTimeline();
@@ -247,7 +245,13 @@ public class TestStreamWriteOperatorCoordinator {
       double failureFraction) {
     final WriteStatus writeStatus = new WriteStatus(trackSuccessRecords, failureFraction);
     writeStatus.setPartitionPath(partitionPath);
-    writeStatus.setStat(new HoodieWriteStat());
+
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setPartitionPath(partitionPath);
+    writeStat.setFileId("fileId123");
+    writeStat.setPath("path123");
+
+    writeStatus.setStat(writeStat);
 
     return WriteMetadataEvent.builder()
         .taskID(taskId)
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index c5d3ec5..6b6bede 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -142,9 +142,6 @@ public class StreamWriteFunctionWrapper<I> {
   public void openFunction() throws Exception {
     this.coordinator.start();
     this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
-    if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
-      this.coordinator.setMetadataSyncExecutor(new MockCoordinatorExecutor(coordinatorContext));
-    }
     toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
     toHoodieFunction.setRuntimeContext(runtimeContext);
     toHoodieFunction.open(conf);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index 060974d..334df59 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -27,7 +27,6 @@ import org.apache.hudi.utils.TestData;
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -90,7 +89,8 @@ public class TestFileIndex {
     assertTrue(fileStatuses[0].getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension()));
   }
 
-  @Disabled
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
   void testFileListingEmptyTable(boolean enableMetadata) {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setBoolean(FlinkOptions.METADATA_ENABLED, enableMetadata);
diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml
index 380199e..120cba3 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -144,6 +144,10 @@
                   <include>org.apache.flink:flink-sql-connector-hive-2.3.6_${scala.binary.version}</include>
 
                   <include>org.apache.hbase:hbase-common</include>
+                  <include>org.apache.hbase:hbase-client</include>
+                  <include>org.apache.hbase:hbase-server</include>
+                  <include>org.apache.hbase:hbase-protocol</include>
+                  <include>org.apache.htrace:htrace-core</include>
                   <include>commons-codec:commons-codec</include>
                 </includes>
               </artifactSet>
@@ -594,6 +598,45 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.htrace</groupId>
+      <artifactId>htrace-core</artifactId>
+      <version>${htrace.version}</version>
+    </dependency>
   </dependencies>
 
   <profiles>
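
The newly bundled HBase/HTrace artifacts back the metadata table's HFile base files.
A quick, illustrative way to confirm the new <include> entries made it into the built
bundle; the jar path and version are assumptions, adjust them to the actual build output:

    import java.util.jar.JarEntry;
    import java.util.jar.JarFile;

    public class BundleContentCheck {
      public static void main(String[] args) throws Exception {
        // Illustrative path; point this at the bundle jar produced by the build.
        String bundleJar = "packaging/hudi-flink-bundle/target/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar";
        try (JarFile jar = new JarFile(bundleJar)) {
          jar.stream()
              .map(JarEntry::getName)
              .filter(name -> name.contains("hbase") || name.contains("htrace"))
              .limit(10)
              .forEach(System.out::println);
        }
      }
    }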