Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:12 UTC

[hudi] 02/45: [MINOR] Add Zhiyan metrics reporter

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 23412b2bee1e05231f100f922fe3785c39f2891b
Author: simonssu <si...@tencent.com>
AuthorDate: Wed May 25 21:08:45 2022 +0800

    [MINOR] Add Zhiyan metrics reporter
---
 dev/tencent-release.sh                             |   4 +-
 hudi-client/hudi-client-common/pom.xml             |   7 +
 .../apache/hudi/async/AsyncPostEventService.java   |  93 +++++++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  84 ++++++++++
 .../hudi/config/metrics/HoodieMetricsConfig.java   |  23 ++-
 .../config/metrics/HoodieMetricsZhiyanConfig.java  | 143 +++++++++++++++++
 .../org/apache/hudi/metrics/HoodieMetrics.java     | 120 ++++++++++++++-
 .../main/java/org/apache/hudi/metrics/Metrics.java |   1 +
 .../hudi/metrics/MetricsReporterFactory.java       |   4 +
 .../apache/hudi/metrics/MetricsReporterType.java   |   2 +-
 .../hudi/metrics/zhiyan/ZhiyanHttpClient.java      | 129 ++++++++++++++++
 .../hudi/metrics/zhiyan/ZhiyanMetricsReporter.java |  66 ++++++++
 .../apache/hudi/metrics/zhiyan/ZhiyanReporter.java | 170 +++++++++++++++++++++
 .../java/org/apache/hudi/tdbank/TDBankClient.java  | 103 +++++++++++++
 .../java/org/apache/hudi/tdbank/TdbankConfig.java  |  82 ++++++++++
 .../hudi/tdbank/TdbankHoodieMetricsEvent.java      | 110 +++++++++++++
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   6 +
 .../hudi/common/table/HoodieTableConfig.java       |   6 +-
 .../apache/hudi/configuration/FlinkOptions.java    |  40 +++--
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |   4 +
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  |   4 +-
 .../org/apache/hudi/table/HoodieTableFactory.java  |   2 +
 .../main/java/org/apache/hudi/DataSourceUtils.java |  12 +-
 .../scala/org/apache/hudi/HoodieCLIUtils.scala     |   2 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  12 +-
 .../AlterHoodieTableAddColumnsCommand.scala        |   1 +
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   3 +-
 .../java/org/apache/hudi/TestDataSourceUtils.java  |   2 +-
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |   5 +-
 .../org/apache/hudi/internal/DefaultSource.java    |   1 +
 .../apache/hudi/spark3/internal/DefaultSource.java |   4 +-
 .../hudi/command/Spark31AlterTableCommand.scala    |   2 +-
 32 files changed, 1200 insertions(+), 47 deletions(-)
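
A quick orientation on wiring it up: the reporter plugs into the existing
hoodie.metrics.* keys (METRIC_PREFIX is assumed to resolve to "hoodie.metrics",
as in upstream Hudi). A minimal, illustrative sketch of the properties a writer
job might set, using only keys introduced or touched in the hunks below; the
job name value is hypothetical:

    import java.util.Properties;

    public class ZhiyanMetricsProps {
      public static Properties build() {
        Properties props = new Properties();
        props.setProperty("hoodie.metrics.on", "true");                        // this patch flips the default to true
        props.setProperty("hoodie.metrics.reporter.type", "ZHIYAN");           // new enum value
        props.setProperty("hoodie.metrics.zhiyan.report.period.seconds", "10");
        props.setProperty("hoodie.metrics.zhiyan.job.name", "my_ingest_job");  // hypothetical name
        return props;
      }
    }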

diff --git a/dev/tencent-release.sh b/dev/tencent-release.sh
index 944f497070..b788d62dc7 100644
--- a/dev/tencent-release.sh
+++ b/dev/tencent-release.sh
@@ -116,9 +116,9 @@ function deploy_spark(){
   FLINK_VERSION=$3
 
   if [ ${release_repo} = "Y" ]; then
-    COMMON_OPTIONS="-Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -Dflink${FLINK_VERSION} -DskipTests -s dev/settings.xml -DretryFailedDeploymentCount=30"
+    COMMON_OPTIONS="-Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -Dflink${FLINK_VERSION} -DskipTests -s dev/settings.xml -DretryFailedDeploymentCount=30 -T 2.5C"
   else
-    COMMON_OPTIONS="-Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -Dflink${FLINK_VERSION} -DskipTests -s dev/settings.xml -DretryFailedDeploymentCount=30"
+    COMMON_OPTIONS="-Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -Dflink${FLINK_VERSION} -DskipTests -s dev/settings.xml -DretryFailedDeploymentCount=30 -T 2.5C"
   fi
 
 #  INSTALL_OPTIONS="-U -Drat.skip=true -Djacoco.skip=true -Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -DskipTests -s dev/settings.xml -T 2.5C"
diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml
index 735b62957d..81bf645427 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -72,6 +72,13 @@
       <version>0.2.2</version>
     </dependency>
 
+    <!-- Tdbank -->
+    <dependency>
+      <groupId>com.tencent.tdbank</groupId>
+      <artifactId>TDBusSDK</artifactId>
+      <version>1.2.9</version>
+    </dependency>
+
     <!-- Dropwizard Metrics -->
     <dependency>
       <groupId>io.dropwizard.metrics</groupId>
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncPostEventService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncPostEventService.java
new file mode 100644
index 0000000000..84cf82c913
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncPostEventService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.tdbank.TDBankClient;
+import org.apache.hudi.tdbank.TdbankHoodieMetricsEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Async service to post events to a remote service.
+ */
+public class AsyncPostEventService extends HoodieAsyncService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncPostEventService.class);
+
+  private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
+  private final LinkedBlockingQueue<TdbankHoodieMetricsEvent> queue;
+  private final TDBankClient client;
+
+  public AsyncPostEventService(HoodieWriteConfig config, LinkedBlockingQueue<TdbankHoodieMetricsEvent> queue) {
+    this.client = new TDBankClient(config.getTdbankTdmAddr(),
+      config.getTdbankTdmPort(), config.getTdbankBid());
+    this.queue = queue;
+  }
+
+  @Override
+  protected Pair<CompletableFuture, ExecutorService> startService() {
+    LOG.info("Start async post event service...");
+    return Pair.of(CompletableFuture.supplyAsync(() -> {
+      sendEvent();
+      return true;
+    }, executor), executor);
+  }
+
+  private void sendEvent() {
+    try {
+      while (!isShutdownRequested()) {
+        TdbankHoodieMetricsEvent event = queue.poll(10, TimeUnit.SECONDS);
+        if (event != null) {
+          client.sendMessage(event);
+        }
+      }
+      LOG.info("Post event service shutdown properly.");
+    } catch (Exception e) {
+      LOG.error("Error when send event to tdbank", e);
+    }
+  }
+
+  // TODO simplify code shared across the async package.
+  public static void waitForCompletion(AsyncArchiveService asyncArchiveService) {
+    if (asyncArchiveService != null) {
+      LOG.info("Waiting for async archive service to finish");
+      try {
+        asyncArchiveService.waitForShutdown();
+      } catch (Exception e) {
+        throw new HoodieException("Error waiting for async archive service to finish", e);
+      }
+    }
+  }
+
+  public static void forceShutdown(AsyncArchiveService asyncArchiveService) {
+    if (asyncArchiveService != null) {
+      LOG.info("Shutting down async archive service...");
+      asyncArchiveService.shutdown(true);
+    }
+  }
+}
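
For reference, a minimal sketch of how this service is meant to be wired,
mirroring the HoodieMetrics change further down. The database/table names are
placeholders, and the COMMIT event type plus the TdbankHoodieMetricsEvent
builder methods are assumed from their usage elsewhere in this commit:

    import java.util.concurrent.LinkedBlockingQueue;

    import org.apache.hudi.async.AsyncPostEventService;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.tdbank.TdbankHoodieMetricsEvent;

    class AsyncPostEventSketch {
      static void wire(HoodieWriteConfig writeConfig) {
        // Consumer side: the service blocks on the queue (10s poll timeout)
        // and forwards each event to TDBank.
        LinkedBlockingQueue<TdbankHoodieMetricsEvent> queue = new LinkedBlockingQueue<>();
        AsyncPostEventService service = new AsyncPostEventService(writeConfig, queue);
        service.start(null);

        // Producer side: HoodieMetrics enqueues one event per table action.
        queue.add(TdbankHoodieMetricsEvent.newBuilder()
            .withDBName("db")
            .withTableName("tbl")
            .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("COMMIT"))
            .addMetrics("totalRecordsWritten", 100L)
            .build());
      }
    }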
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 55979b481b..23bc0ee329 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -54,6 +54,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsZhiyanConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.index.HoodieIndex;
@@ -72,6 +73,7 @@ import org.apache.hudi.table.storage.HoodieStorageLayout;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.hudi.tdbank.TdbankConfig;
 import org.apache.orc.CompressionKind;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
@@ -86,6 +88,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -108,11 +111,21 @@ public class HoodieWriteConfig extends HoodieConfig {
   // It is here so that both the client and deltastreamer use the same reference
   public static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
 
+  public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
+      .key(HoodieTableConfig.DATABASE_NAME.key())
+      .noDefaultValue()
+      .withDocumentation("Database name that will be used for identify table related to different databases.");
+
   public static final ConfigProperty<String> TBL_NAME = ConfigProperty
       .key(HoodieTableConfig.HOODIE_TABLE_NAME_KEY)
       .noDefaultValue()
       .withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs.");
 
+  public static final ConfigProperty<String> HOODIE_JOB_ID = ConfigProperty
+      .key("hoodie.job.id")
+      .noDefaultValue()
+      .withDocumentation("JobId use to identify a hoodie job. (e.g A spark job writes data to hoodie table.)");
+
   public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = ConfigProperty
       .key("hoodie.datasource.write.precombine.field")
       .defaultValue("ts")
@@ -962,6 +975,14 @@ public class HoodieWriteConfig extends HoodieConfig {
         HoodieTableConfig.TYPE, HoodieTableConfig.TYPE.defaultValue().name()).toUpperCase());
   }
 
+  public String getDatabaseName() {
+    return getString(DATABASE_NAME);
+  }
+
+  public String getHoodieJobId() {
+    return getString(HOODIE_JOB_ID);
+  }
+
   public String getPreCombineField() {
     return getString(PRECOMBINE_FIELD_NAME);
   }
@@ -1820,6 +1841,42 @@ public class HoodieWriteConfig extends HoodieConfig {
         HoodieMetricsDatadogConfig.METRIC_TAG_VALUES, ",").split("\\s*,\\s*")).collect(Collectors.toList());
   }
 
+  public int getZhiyanApiTimeoutSeconds() {
+    return getInt(HoodieMetricsZhiyanConfig.API_TIMEOUT_IN_SECONDS);
+  }
+
+  public int getZhiyanReportPeriodSeconds() {
+    return getInt(HoodieMetricsZhiyanConfig.REPORT_PERIOD_SECONDS);
+  }
+
+  public String getZhiyanReportServiceURL() {
+    return getString(HoodieMetricsZhiyanConfig.REPORT_SERVICE_URL);
+  }
+
+  public String getZhiyanReportServicePath() {
+    return getString(HoodieMetricsZhiyanConfig.REPORT_SERVICE_PATH);
+  }
+
+  public String getZhiyanHoodieJobName() {
+    String zhiyanJobName = getString(HoodieMetricsZhiyanConfig.ZHIYAN_JOB_NAME);
+    if (getBoolean(HoodieMetricsZhiyanConfig.ZHIYAN_RANDOM_JOBNAME_SUFFIX)) {
+      if (!zhiyanJobName.isEmpty()) {
+        return zhiyanJobName + "." + UUID.randomUUID();
+      } else {
+        return engineType + "." + UUID.randomUUID();
+      }
+    }
+    return zhiyanJobName;
+  }
+
+  public String getZhiyanAppMask() {
+    return getString(HoodieMetricsZhiyanConfig.ZHIYAN_METRICS_HOODIE_APPMASK);
+  }
+
+  public String getZhiyanSeclvlEnvName() {
+    return getString(HoodieMetricsZhiyanConfig.ZHIYAN_METRICS_HOODIE_SECLVLENNAME);
+  }
+
   public int getCloudWatchReportPeriodSeconds() {
     return getInt(HoodieMetricsCloudWatchConfig.REPORT_PERIOD_SECONDS);
   }
@@ -1872,6 +1929,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_PREFIX);
   }
 
+  public int getMetricEventQueueSize() {
+    return getIntOrDefault(HoodieMetricsConfig.METRICS_EVENT_QUEUE_SIZE);
+  }
+
   /**
    * memory configs.
    */
@@ -2135,6 +2196,21 @@ public class HoodieWriteConfig extends HoodieConfig {
     return metastoreConfig.enableMetastore();
   }
 
+  /**
+   * Tdbank configs
+   */
+  public String getTdbankTdmAddr() {
+    return getString(TdbankConfig.TDBANK_TDM_ADDR);
+  }
+
+  public int getTdbankTdmPort() {
+    return getInt(TdbankConfig.TDBANK_TDM_PORT);
+  }
+
+  public String getTdbankBid() {
+    return getString(TdbankConfig.TDBANK_BID);
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -2159,6 +2235,7 @@ public class HoodieWriteConfig extends HoodieConfig {
     private boolean isMetricsJmxConfigSet = false;
     private boolean isMetricsGraphiteConfigSet = false;
     private boolean isLayoutConfigSet = false;
+    private boolean isTdbankConfigSet = false;
 
     public Builder withEngineType(EngineType engineType) {
       this.engineType = engineType;
@@ -2216,6 +2293,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withDatabaseName(String dbName) {
+      writeConfig.setValue(DATABASE_NAME, dbName);
+      return this;
+    }
+
     public Builder withPreCombineField(String preCombineField) {
       writeConfig.setValue(PRECOMBINE_FIELD_NAME, preCombineField);
       return this;
@@ -2583,6 +2665,8 @@ public class HoodieWriteConfig extends HoodieConfig {
           HoodiePreCommitValidatorConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
       writeConfig.setDefaultOnCondition(!isLayoutConfigSet,
           HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+      writeConfig.setDefaultOnCondition(!isTdbankConfigSet,
+          TdbankConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
       writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION));
 
       // isLockProviderPropertySet must be fetched before setting defaults of HoodieLockConfig
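
The new write-config surface added in this hunk can be exercised roughly as
follows; the base path and names are placeholders, and withPath/forTable are
pre-existing builder methods assumed unchanged by this commit:

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/tbl")     // placeholder base path
        .forTable("tbl")
        .withDatabaseName("db")        // new in this commit
        .build();
    String db = writeConfig.getDatabaseName();        // "db"
    String tdmAddr = writeConfig.getTdbankTdmAddr();  // falls back to the TdbankConfig default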
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index 957b439051..787819be12 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -47,14 +47,14 @@ public class HoodieMetricsConfig extends HoodieConfig {
 
   public static final ConfigProperty<Boolean> TURN_METRICS_ON = ConfigProperty
       .key(METRIC_PREFIX + ".on")
-      .defaultValue(false)
+      .defaultValue(true)
       .sinceVersion("0.5.0")
       .withDocumentation("Turn on/off metrics reporting. off by default.");
 
   public static final ConfigProperty<MetricsReporterType> METRICS_REPORTER_TYPE_VALUE = ConfigProperty
       .key(METRIC_PREFIX + ".reporter.type")
-      .defaultValue(MetricsReporterType.GRAPHITE)
-      .sinceVersion("0.5.0")
+      .defaultValue(MetricsReporterType.ZHIYAN)
+      .sinceVersion("0.11.0")
       .withDocumentation("Type of metrics reporter.");
 
   // User defined
@@ -69,10 +69,15 @@ public class HoodieMetricsConfig extends HoodieConfig {
       .defaultValue("")
       .sinceVersion("0.11.0")
       .withInferFunction(cfg -> {
+        StringBuilder sb = new StringBuilder();
+        if (cfg.contains(HoodieTableConfig.DATABASE_NAME)) {
+          sb.append(cfg.getString(HoodieTableConfig.DATABASE_NAME));
+          sb.append(".");
+        }
         if (cfg.contains(HoodieTableConfig.NAME)) {
-          return Option.of(cfg.getString(HoodieTableConfig.NAME));
+          sb.append(cfg.getString(HoodieTableConfig.NAME));
         }
-        return Option.empty();
+        return sb.length() == 0 ? Option.empty() : Option.of(sb.toString());
       })
       .withDocumentation("The prefix given to the metrics names.");
 
@@ -94,6 +99,12 @@ public class HoodieMetricsConfig extends HoodieConfig {
       })
       .withDocumentation("Enable metrics for locking infra. Useful when operating in multiwriter mode");
 
+  public static final ConfigProperty<Integer> METRICS_EVENT_QUEUE_SIZE = ConfigProperty
+      .key(METRIC_PREFIX + ".event.queue.size")
+      .defaultValue(10_000_000)
+      .sinceVersion("0.11.0")
+      .withDocumentation("The prefix given to the metrics names.");
+
   /**
    * @deprecated Use {@link #TURN_METRICS_ON} and its methods instead
    */
@@ -197,6 +208,8 @@ public class HoodieMetricsConfig extends HoodieConfig {
           HoodieMetricsGraphiteConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
       hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.CLOUDWATCH,
             HoodieMetricsCloudWatchConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
+      hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.ZHIYAN,
+          HoodieMetricsZhiyanConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
       return hoodieMetricsConfig;
     }
   }
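
A worked example of the prefix inference above, assuming the usual key names
behind HoodieTableConfig.DATABASE_NAME and HoodieTableConfig.NAME:

    // hoodie.database.name=db1, hoodie.table.name=t1  ->  inferred prefix "db1.t1"
    // only hoodie.table.name=t1                       ->  inferred prefix "t1"
    // neither set                                     ->  Option.empty(), as before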
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsZhiyanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsZhiyanConfig.java
new file mode 100644
index 0000000000..d090b19b2f
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsZhiyanConfig.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config.metrics;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRIC_PREFIX;
+
+@ConfigClassProperty(name = "Metrics Configurations for Zhiyan",
+    groupName = ConfigGroups.Names.METRICS,
+    description = "Enables reporting on Hudi metrics using Zhiyan. "
+      + " Hudi publishes metrics on every commit, clean, rollback etc.")
+public class HoodieMetricsZhiyanConfig extends HoodieConfig {
+
+  public static final String ZHIYAN_PREFIX = METRIC_PREFIX + ".zhiyan";
+
+  public static final ConfigProperty<Integer> API_TIMEOUT_IN_SECONDS = ConfigProperty
+      .key(ZHIYAN_PREFIX + ".api.timeout.seconds")
+      .defaultValue(10)
+      .sinceVersion("0.10.0")
+      .withDocumentation("Zhiyan API timeout in seconds. Default to 10.");
+
+  public static final ConfigProperty<Integer> REPORT_PERIOD_SECONDS = ConfigProperty
+      .key(ZHIYAN_PREFIX + ".report.period.seconds")
+      .defaultValue(10)
+      .sinceVersion("0.10.0")
+      .withDocumentation("Zhiyan Report period seconds. Default to 10.");
+
+  public static final ConfigProperty<String> REPORT_SERVICE_URL = ConfigProperty
+      .key(ZHIYAN_PREFIX + ".report.service.url")
+      .defaultValue("http://zhiyan.monitor.access.inner.woa.com:8080")
+      .withDocumentation("Zhiyan Report service url.");
+
+  public static final ConfigProperty<String> REPORT_SERVICE_PATH = ConfigProperty
+      .key(ZHIYAN_PREFIX + ".report.service.path")
+      .defaultValue("/access_v1.http_service/HttpCurveReportRpc")
+      .withDocumentation("Zhiyan Report service path.");
+
+  public static final ConfigProperty<String> ZHIYAN_JOB_NAME = ConfigProperty
+      .key(ZHIYAN_PREFIX + ".job.name")
+      .defaultValue("")
+      .sinceVersion("0.10.0")
+      .withDocumentation("Name of Job using zhiyan metrics reporter.");
+
+  public static final ConfigProperty<Boolean> ZHIYAN_RANDOM_JOBNAME_SUFFIX = ConfigProperty
+      .key(ZHIYAN_PREFIX + ".random.job.name.suffix")
+      .defaultValue(true)
+      .sinceVersion("0.10.0")
+      .withDocumentation("Whether the Zhiyan job name need a random suffix , default true.");
+
+  public static final ConfigProperty<String> ZHIYAN_METRICS_HOODIE_APPMASK = ConfigProperty
+      .key(ZHIYAN_PREFIX + ".hoodie.appmask")
+      .defaultValue("1701_36311_HUDI")
+      .sinceVersion("0.10.0")
+      .withDocumentation("Zhiyan appmask for hudi.");
+
+  public static final ConfigProperty<String> ZHIYAN_METRICS_HOODIE_SECLVLENNAME = ConfigProperty
+      .key(ZHIYAN_PREFIX + ".hoodie.seclvlenname")
+      .defaultValue("hudi_metrics")
+      .sinceVersion("0.10.0")
+      .withDocumentation("Zhiyan seclvlenvname for hudi, default hudi_metrics");
+
+  public static Builder newBuilder() {
+    return new HoodieMetricsZhiyanConfig.Builder();
+  }
+
+  public static class Builder {
+
+    private final HoodieMetricsZhiyanConfig hoodieMetricsZhiyanConfig = new HoodieMetricsZhiyanConfig();
+
+    public HoodieMetricsZhiyanConfig.Builder fromFile(File propertiesFile) throws IOException {
+      try (FileReader reader = new FileReader(propertiesFile)) {
+        this.hoodieMetricsZhiyanConfig.getProps().load(reader);
+        return this;
+      }
+    }
+
+    public HoodieMetricsZhiyanConfig.Builder fromProperties(Properties props) {
+      this.hoodieMetricsZhiyanConfig.getProps().putAll(props);
+      return this;
+    }
+
+    public HoodieMetricsZhiyanConfig.Builder withAppMask(String appMask) {
+      hoodieMetricsZhiyanConfig.setValue(ZHIYAN_METRICS_HOODIE_APPMASK, appMask);
+      return this;
+    }
+
+    public HoodieMetricsZhiyanConfig.Builder withSeclvlEnvName(String seclvlEnvName) {
+      hoodieMetricsZhiyanConfig.setValue(ZHIYAN_METRICS_HOODIE_SECLVLENNAME, seclvlEnvName);
+      return this;
+    }
+
+    public HoodieMetricsZhiyanConfig.Builder withReportServiceUrl(String url) {
+      hoodieMetricsZhiyanConfig.setValue(REPORT_SERVICE_URL, url);
+      return this;
+    }
+
+    public HoodieMetricsZhiyanConfig.Builder withApiTimeout(int apiTimeout) {
+      hoodieMetricsZhiyanConfig.setValue(API_TIMEOUT_IN_SECONDS, String.valueOf(apiTimeout));
+      return this;
+    }
+
+    public HoodieMetricsZhiyanConfig.Builder withJobName(String jobName) {
+      hoodieMetricsZhiyanConfig.setValue(ZHIYAN_JOB_NAME, jobName);
+      return this;
+    }
+
+    public HoodieMetricsZhiyanConfig.Builder withReportPeriodSeconds(int seconds) {
+      hoodieMetricsZhiyanConfig.setValue(REPORT_PERIOD_SECONDS, String.valueOf(seconds));
+      return this;
+    }
+
+    public HoodieMetricsZhiyanConfig build() {
+      hoodieMetricsZhiyanConfig.setDefaults(HoodieMetricsZhiyanConfig.class.getName());
+      return hoodieMetricsZhiyanConfig;
+    }
+  }
+
+}
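
A sketch of configuring the reporter programmatically through this builder;
values shown are either the defaults declared above or placeholders:

    HoodieMetricsZhiyanConfig zhiyanConfig = HoodieMetricsZhiyanConfig.newBuilder()
        .withReportServiceUrl("http://zhiyan.monitor.access.inner.woa.com:8080")
        .withApiTimeout(10)
        .withReportPeriodSeconds(10)
        .withJobName("my_ingest_job")          // placeholder; a random suffix is appended by default
        .withAppMask("1701_36311_HUDI")
        .withSeclvlEnvName("hudi_metrics")
        .build();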
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 69ef7917b2..450f741586 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.metrics;
 
+import org.apache.hudi.async.AsyncPostEventService;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
@@ -26,9 +27,13 @@ import org.apache.hudi.config.HoodieWriteConfig;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Timer;
+import org.apache.hudi.tdbank.TdbankHoodieMetricsEvent;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.util.Locale;
+import java.util.concurrent.LinkedBlockingQueue;
+
 /**
  * Wrapper for metrics-related operations.
  */
@@ -49,6 +54,8 @@ public class HoodieMetrics {
   private String conflictResolutionFailureCounterName = null;
   private HoodieWriteConfig config;
   private String tableName;
+  // A job id to identify the job for each hoodie table.
+  private String hoodieJobId;
   private Timer rollbackTimer = null;
   private Timer cleanTimer = null;
   private Timer commitTimer = null;
@@ -61,10 +68,31 @@ public class HoodieMetrics {
   private Counter conflictResolutionSuccessCounter = null;
   private Counter conflictResolutionFailureCounter = null;
 
+  public static final String TOTAL_PARTITIONS_WRITTEN = "totalPartitionsWritten";
+  public static final String TOTAL_FILES_INSERT = "totalFilesInsert";
+  public static final String TOTAL_FILES_UPDATE = "totalFilesUpdate";
+  public static final String TOTAL_RECORDS_WRITTEN = "totalRecordsWritten";
+  public static final String TOTAL_UPDATE_RECORDS_WRITTEN = "totalUpdateRecordsWritten";
+  public static final String TOTAL_INSERT_RECORDS_WRITTEN = "totalInsertRecordsWritten";
+  public static final String TOTAL_BYTES_WRITTEN = "totalBytesWritten";
+  public static final String TOTAL_SCAN_TIME = "totalScanTime";
+  public static final String TOTAL_CREATE_TIME = "totalCreateTime";
+  public static final String TOTAL_UPSERT_TIME = "totalUpsertTime";
+  public static final String TOTAL_COMPACTED_RECORDS_UPDATED = "totalCompactedRecordsUpdated";
+  public static final String TOTAL_LOGFILES_COMPACTED = "totalLogFilesCompacted";
+  public static final String TOTAL_LOGFILES_SIZE = "totalLogFilesSize";
+
+  // A queue to buffer metrics events.
+  private final LinkedBlockingQueue<TdbankHoodieMetricsEvent> queue = new LinkedBlockingQueue<>();
+
   public HoodieMetrics(HoodieWriteConfig config) {
     this.config = config;
     this.tableName = config.getTableName();
+    this.hoodieJobId = config.getHoodieJobId();
     if (config.isMetricsOn()) {
+      // Start the async post-event service.
+      AsyncPostEventService postEventService = new AsyncPostEventService(config, queue);
+      postEventService.start(null);
       Metrics.init(config);
       this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION);
       this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION);
@@ -165,6 +193,25 @@ public class HoodieMetrics {
     Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), 0);
     Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), 0);
     Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0);
+
+    TdbankHoodieMetricsEvent metricEvent = TdbankHoodieMetricsEvent.newBuilder()
+        .withDBName(config.getDatabaseName())
+        .withTableName(config.getTableName())
+        .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf(actionType.toUpperCase(Locale.ROOT)))
+        .addMetrics("totalPartitionsWritten", 0)
+        .addMetrics("totalFilesUpdate", 0)
+        .addMetrics("totalRecordsWritten", 0)
+        .addMetrics("totalUpdateRecordsWritten", 0)
+        .addMetrics("totalInsertRecordsWritten", 0)
+        .addMetrics("totalBytesWritten", 0)
+        .addMetrics("totalScanTime", 0)
+        .addMetrics("totalCreateTime", 0)
+        .addMetrics("totalUpsertTime", 0)
+        .addMetrics("totalCompactedRecordsUpdated", 0)
+        .addMetrics("totalLogFilesCompacted", 0)
+        .addMetrics("totalLogFilesSize", 0)
+        .build();
+    postEvent(metricEvent);
   }
 
   public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
@@ -197,23 +244,55 @@ public class HoodieMetrics {
       Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
       Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted);
       Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize);
+
+      TdbankHoodieMetricsEvent metricEvent = TdbankHoodieMetricsEvent.newBuilder()
+          .withDBName(config.getDatabaseName())
+          .withTableName(config.getTableName())
+          .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf(actionType.toUpperCase(Locale.ROOT)))
+          .addMetrics("totalPartitionsWritten", totalPartitionsWritten)
+          .addMetrics("totalFilesUpdate", totalFilesUpdate)
+          .addMetrics("totalFilesInsert", totalFilesInsert)
+          .addMetrics("totalRecordsWritten", totalRecordsWritten)
+          .addMetrics("totalUpdateRecordsWritten", totalUpdateRecordsWritten)
+          .addMetrics("totalInsertRecordsWritten", totalInsertRecordsWritten)
+          .addMetrics("totalBytesWritten", totalBytesWritten)
+          .addMetrics("totalScanTime", totalTimeTakenByScanner)
+          .addMetrics("totalCreateTime", totalTimeTakenForInsert)
+          .addMetrics("totalUpsertTime", totalTimeTakenForUpsert)
+          .addMetrics("totalCompactedRecordsUpdated", totalCompactedRecordsUpdated)
+          .addMetrics("totalLogFilesCompacted", totalLogFilesCompacted)
+          .addMetrics("totalLogFilesSize", totalLogFilesSize)
+          .build();
+      postEvent(metricEvent);
     }
   }
 
   private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
       String actionType) {
     if (config.isMetricsOn()) {
+      TdbankHoodieMetricsEvent.Builder builder = TdbankHoodieMetricsEvent.newBuilder()
+          .withDBName(config.getDatabaseName())
+          .withTableName(config.getTableName())
+          .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf(actionType.toUpperCase(Locale.ROOT)));
       Pair<Option<Long>, Option<Long>> eventTimePairMinMax = metadata.getMinAndMaxEventTime();
       if (eventTimePairMinMax.getLeft().isPresent()) {
         long commitLatencyInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getLeft().get();
         Metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs);
+        builder = builder.addMetrics("commitLatencyInMs", commitLatencyInMs);
       }
       if (eventTimePairMinMax.getRight().isPresent()) {
         long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get();
         Metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
+        builder = builder.addMetrics("commitFreshnessInMs", commitFreshnessInMs);
       }
       Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
       Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
+
+      TdbankHoodieMetricsEvent event = builder
+          .addMetrics("commitTime", commitEpochTimeInMs)
+          .addMetrics("duration", durationInMs)
+          .build();
+      postEvent(event);
     }
   }
 
@@ -223,6 +302,14 @@ public class HoodieMetrics {
           String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
       Metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
       Metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
+      TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder()
+          .withDBName(config.getDatabaseName())
+          .withTableName(config.getTableName())
+          .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("rollback".toUpperCase(Locale.ROOT)))
+          .addMetrics("duration", durationInMs)
+          .addMetrics("numFilesDeleted", numFilesDeleted)
+          .build();
+      postEvent(event);
     }
   }
 
@@ -232,6 +319,14 @@ public class HoodieMetrics {
           String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
       Metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
       Metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
+      TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder()
+          .withDBName(config.getDatabaseName())
+          .withTableName(config.getTableName())
+          .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("clean".toUpperCase(Locale.ROOT)))
+          .addMetrics("duration", durationInMs)
+          .addMetrics("numFilesDeleted", numFilesDeleted)
+          .build();
+      postEvent(event);
     }
   }
 
@@ -241,6 +336,14 @@ public class HoodieMetrics {
           numFilesFinalized));
       Metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
       Metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
+      TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder()
+          .withDBName(config.getDatabaseName())
+          .withTableName(config.getTableName())
+          .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("finalize".toUpperCase(Locale.ROOT)))
+          .addMetrics("duration", durationInMs)
+          .addMetrics("numFilesFinalized", numFilesFinalized)
+          .build();
+      postEvent(event);
     }
   }
 
@@ -248,11 +351,21 @@ public class HoodieMetrics {
     if (config.isMetricsOn()) {
       LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs));
       Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
+      TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder()
+          .withDBName(config.getDatabaseName())
+          .withTableName(config.getTableName())
+          .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("index".toUpperCase(Locale.ROOT)))
+          .addMetrics(String.format("%s.duration", action), durationInMs)
+          .build();
+      postEvent(event);
     }
   }
 
   String getMetricsName(String action, String metric) {
-    return config == null ? null : String.format("%s.%s.%s", config.getMetricReporterMetricsNamePrefix(), action, metric);
+    // When using Zhiyan, skip the metrics name prefix because tags identify each metric.
+    return config == null ? null :
+      config.getMetricsReporterType() == MetricsReporterType.ZHIYAN ? String.format("%s.%s", action, metric) :
+        String.format("%s.%s.%s", config.getMetricReporterMetricsNamePrefix(), action, metric);
   }
 
   /**
@@ -284,4 +397,9 @@ public class HoodieMetrics {
     }
     return counter;
   }
+
+  private void postEvent(TdbankHoodieMetricsEvent event) {
+    LOG.info("Post metrics event to queue, queue size now is " + queue.size());
+    queue.add(event);
+  }
 }
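
The net effect of the getMetricsName() change above, with an illustrative
prefix of "db1.t1":

    // getMetricsName("commit", "totalRecordsWritten") yields:
    //   ZHIYAN reporter:     "commit.totalRecordsWritten"         (table identified via tags instead)
    //   other reporters:     "db1.t1.commit.totalRecordsWritten"  (prefix inferred in HoodieMetricsConfig)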
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index 8f3e497481..10238a9c92 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -120,6 +120,7 @@ public class Metrics {
 
   public static void registerGauge(String metricName, final long value) {
     try {
+      LOG.info("Register Metric Name: " + metricName);
       MetricRegistry registry = Metrics.getInstance().getRegistry();
       HoodieGauge guage = (HoodieGauge) registry.gauge(metricName, () -> new HoodieGauge<>(value));
       guage.setValue(value);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
index d81e337b28..b67ab63f23 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
@@ -29,6 +29,7 @@ import org.apache.hudi.metrics.prometheus.PrometheusReporter;
 import org.apache.hudi.metrics.prometheus.PushGatewayMetricsReporter;
 
 import com.codahale.metrics.MetricRegistry;
+import org.apache.hudi.metrics.zhiyan.ZhiyanMetricsReporter;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -81,6 +82,9 @@ public class MetricsReporterFactory {
       case CLOUDWATCH:
         reporter = new CloudWatchMetricsReporter(config, registry);
         break;
+      case ZHIYAN:
+        reporter = new ZhiyanMetricsReporter(config, registry);
+        break;
       default:
         LOG.error("Reporter type[" + type + "] is not supported.");
         break;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
index 3c86001592..29a8097a50 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
@@ -22,5 +22,5 @@ package org.apache.hudi.metrics;
  * Types of the reporter supported, hudi also supports user defined reporter.
  */
 public enum MetricsReporterType {
-  GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH
+  GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH, ZHIYAN
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanHttpClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanHttpClient.java
new file mode 100644
index 0000000000..b358ce182b
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanHttpClient.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.zhiyan;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.Consts;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpException;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.protocol.BasicHttpContext;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class ZhiyanHttpClient {
+
+  private static final Logger LOG = LogManager.getLogger(ZhiyanHttpClient.class);
+  private final CloseableHttpClient httpClient;
+  private final ObjectMapper mapper;
+  private final String serviceUrl;
+  private final String requestPath;
+
+  private static final String JSON_CONTENT_TYPE = "application/json";
+  private static final String CONTENT_TYPE = "Content-Type";
+
+  public ZhiyanHttpClient(String url, String path, int timeoutSeconds) {
+    httpClient = HttpClientBuilder.create()
+      .setDefaultRequestConfig(RequestConfig.custom()
+        .setConnectTimeout(timeoutSeconds * 1000)
+        .setConnectionRequestTimeout(timeoutSeconds * 1000)
+        .setSocketTimeout(timeoutSeconds * 1000).build())
+      .build();
+
+    serviceUrl = url;
+    requestPath = path;
+
+    mapper = new ObjectMapper();
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+    mapper.configure(JsonParser.Feature.IGNORE_UNDEFINED, true);
+    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+    mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+  }
+
+  public <T> String post(T input) throws Exception {
+    HttpPost postReq = new HttpPost(serviceUrl + requestPath);
+    postReq.setHeader(CONTENT_TYPE, JSON_CONTENT_TYPE);
+
+    try {
+      return requestWithEntity(postReq, input);
+    } catch (Exception e) {
+      LOG.warn(String.format("Failed to post to %s, cause by", serviceUrl + requestPath), e);
+      throw e;
+    } finally {
+      postReq.releaseConnection();
+    }
+  }
+
+  private <T> String requestWithEntity(HttpRequestBase request, T input) throws Exception {
+    if (input != null && request instanceof HttpEntityEnclosingRequestBase) {
+      HttpEntity entity = getEntity(input);
+      ((HttpEntityEnclosingRequestBase) request).setEntity(entity);
+    }
+
+    HttpContext httpContext = new BasicHttpContext();
+    try (CloseableHttpResponse response = httpClient.execute(request, httpContext)) {
+      int status = response.getStatusLine().getStatusCode();
+      if (status != HttpStatus.SC_OK && status != HttpStatus.SC_CREATED) {
+        throw new HttpException("Response code is " + status);
+      }
+      HttpEntity resultEntity = response.getEntity();
+      return EntityUtils.toString(resultEntity, Consts.UTF_8);
+    } catch (Exception ex) {
+      LOG.error("Error when request http.", ex);
+      throw ex;
+    }
+  }
+
+  private <T> HttpEntity getEntity(T input) throws JsonProcessingException {
+    HttpEntity entity;
+    if (input instanceof String) {
+      entity = new StringEntity((String) input, ContentType.APPLICATION_JSON);
+    } else if (input instanceof HttpEntity) {
+      return (HttpEntity) input;
+    } else {
+      try {
+        String json = mapper.writeValueAsString(input);
+        entity = new StringEntity(json, ContentType.APPLICATION_JSON);
+      } catch (JsonProcessingException e) {
+        LOG.error(String.format("Error when process %s due to ", input), e);
+        throw e;
+      }
+    }
+    return entity;
+  }
+
+}
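
Standalone use of the client looks roughly like this; the URL and path are the
defaults from HoodieMetricsZhiyanConfig, and the JSON body is a placeholder:

    ZhiyanHttpClient client = new ZhiyanHttpClient(
        "http://zhiyan.monitor.access.inner.woa.com:8080",
        "/access_v1.http_service/HttpCurveReportRpc",
        10);  // timeout, in seconds
    try {
      String response = client.post("{\"app_mark\":\"1701_36311_HUDI\"}");
    } catch (Exception e) {
      // post() rethrows after logging; handle or propagate here
    }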
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java
new file mode 100644
index 0000000000..323fe17106
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.zhiyan;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.MetricsReporter;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+public class ZhiyanMetricsReporter extends MetricsReporter {
+
+  private final ZhiyanReporter reporter;
+  private final int reportPeriodSeconds;
+
+  public ZhiyanMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
+    this.reportPeriodSeconds = config.getZhiyanReportPeriodSeconds();
+    ZhiyanHttpClient client = new ZhiyanHttpClient(
+        config.getZhiyanReportServiceURL(),
+        config.getZhiyanReportServicePath(),
+        config.getZhiyanApiTimeoutSeconds());
+    this.reporter = new ZhiyanReporter(registry, MetricFilter.ALL, client,
+        config.getZhiyanHoodieJobName(),
+        config.getTableName(),
+        config.getZhiyanAppMask(),
+        config.getZhiyanSeclvlEnvName());
+  }
+
+  @Override
+  public void start() {
+    reporter.start(reportPeriodSeconds, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void report() {
+    reporter.report();
+  }
+
+  @Override
+  public Closeable getReporter() {
+    return reporter;
+  }
+
+  @Override
+  public void stop() {
+    reporter.stop();
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanReporter.java
new file mode 100644
index 0000000000..4e5d416989
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanReporter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.zhiyan;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+
+public class ZhiyanReporter extends ScheduledReporter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZhiyanReporter.class);
+  private final ZhiyanHttpClient client;
+  private final String jobName;
+  private final String hoodieTableName;
+  private final String appMask;
+  private final String seclvlEnName;
+
+  public ZhiyanReporter(MetricRegistry registry,
+                        MetricFilter filter,
+                        ZhiyanHttpClient client,
+                        String jobName,
+                        String hoodieTableName,
+                        String appMask,
+                        String seclvlEnName) {
+    super(registry, "hudi-zhiyan-reporter", filter, TimeUnit.SECONDS, TimeUnit.SECONDS);
+    this.client = client;
+    this.jobName = jobName;
+    this.hoodieTableName = hoodieTableName;
+    this.appMask = appMask;
+    this.seclvlEnName = seclvlEnName;
+  }
+
+  @Override
+  public void report(SortedMap<String, Gauge> gauges,
+                     SortedMap<String, Counter> counters,
+                     SortedMap<String, Histogram> histograms,
+                     SortedMap<String, Meter> meters,
+                     SortedMap<String, Timer> timers) {
+    final PayloadBuilder builder = new PayloadBuilder()
+        .withAppMask(appMask)
+        .withJobName(jobName)
+        .withSeclvlEnName(seclvlEnName)
+        .withTableName(hoodieTableName);
+
+    long timestamp = System.currentTimeMillis();
+
+    gauges.forEach((metricName, gauge) -> {
+      builder.addGauge(metricName, timestamp, gauge.getValue().toString());
+    });
+
+    String payload = builder.build();
+
+    LOG.info("Payload is:" + payload);
+    try {
+      client.post(payload);
+    } catch (Exception e) {
+      LOG.error("Payload is " + payload);
+      LOG.error("Error when report data to zhiyan", e);
+    }
+  }
+
+  static class PayloadBuilder {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private final ObjectNode payload;
+
+    private final ArrayNode reportData;
+
+    private String appMark;
+    // Metric group name (maps to sec_lvl_en_name in the Zhiyan payload).
+    private String seclvlEnName;
+
+    private String jobName;
+
+    private String tableName;
+
+    public PayloadBuilder() {
+      MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+      MAPPER.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+      MAPPER.configure(JsonParser.Feature.IGNORE_UNDEFINED, true);
+      MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+      this.payload = MAPPER.createObjectNode();
+      this.reportData = MAPPER.createArrayNode();
+    }
+
+    PayloadBuilder withAppMask(String appMark) {
+      this.appMark = appMark;
+      this.payload.put("app_mark", appMark);
+      return this;
+    }
+
+    PayloadBuilder withJobName(String jobName) {
+      this.jobName = jobName;
+      return this;
+    }
+
+    PayloadBuilder withTableName(String tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    PayloadBuilder withSeclvlEnName(String seclvlEnName) {
+      this.seclvlEnName = seclvlEnName;
+      this.payload.put("sec_lvl_en_name", seclvlEnName);
+      return this;
+    }
+
+    PayloadBuilder addGauge(String metric, long timestamp, String gaugeValue) {
+      ObjectNode tmpData = MAPPER.createObjectNode();
+      tmpData.put("metric", metric);
+      tmpData.put("value", Long.parseLong(gaugeValue));
+      // Tags represent dimensions in Zhiyan.
+      ObjectNode tags = tmpData.objectNode();
+      tags.put("jobName", jobName);
+      tags.put("tableName", tableName);
+      tmpData.set("tags", tags);
+      this.reportData.add(tmpData);
+      return this;
+    }
+
+    PayloadBuilder addHistogram() {
+      return this;
+    }
+
+    PayloadBuilder addCounter() {
+      return this;
+    }
+
+    PayloadBuilder addMeters() {
+      return this;
+    }
+
+    String build() {
+      payload.put("report_data", reportData.toString());
+      return payload.toString();
+    }
+  }
+}
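
To make the wire format concrete, a sketch of what report() assembles for a
single gauge using the (package-private) builder above; the metric name, value
and job name are illustrative:

    ZhiyanReporter.PayloadBuilder builder = new ZhiyanReporter.PayloadBuilder()
        .withAppMask("1701_36311_HUDI")     // default hoodie.metrics.zhiyan.hoodie.appmask
        .withSeclvlEnName("hudi_metrics")   // default hoodie.metrics.zhiyan.hoodie.seclvlenname
        .withJobName("SPARK.<uuid>")        // roughly what getZhiyanHoodieJobName() produces
        .withTableName("tbl");
    builder.addGauge("commit.totalRecordsWritten", System.currentTimeMillis(), "100");
    String payload = builder.build();
    // payload, approximately:
    // {"app_mark":"1701_36311_HUDI","sec_lvl_en_name":"hudi_metrics",
    //  "report_data":"[{\"metric\":\"commit.totalRecordsWritten\",\"value\":100,
    //                   \"tags\":{\"jobName\":\"SPARK.<uuid>\",\"tableName\":\"tbl\"}}]"}
    // Note that report_data is embedded as a JSON string, and the timestamp
    // passed to addGauge() is not currently included in the entry.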
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TDBankClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TDBankClient.java
new file mode 100644
index 0000000000..85f7a9b0b9
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TDBankClient.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.tdbank;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.tencent.tdbank.busapi.BusClientConfig;
+import com.tencent.tdbank.busapi.DefaultMessageSender;
+import com.tencent.tdbank.busapi.MessageSender;
+import com.tencent.tdbank.busapi.SendResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+public class TDBankClient implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(TDBankClient.class);
+  private static final Long TDBANK_SENDER_TIMEOUT_MS =
+      Long.parseLong(System.getProperty("tdbank.sender.timeout-ms", "20000"));
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final String HUDI_EVENT_TID = "hudi_metric";
+
+  private final String bid;
+  private MessageSender sender;
+  private String tdmAddr;
+  private int tdmPort;
+  private volatile boolean hasInit = false;
+
+  private static final int RETRY_TIMES = 3;
+
+  public TDBankClient(String tdmAddr, int tdmPort, String bid) {
+    this.bid = bid;
+    this.tdmAddr = tdmAddr;
+    this.tdmPort = tdmPort;
+  }
+
+  /**
+   * Send a message to TDBank and return the send result.
+   */
+  public SendResult sendMessage(Object message) throws Exception {
+    init();
+    LOG.info("Send message to tdbank, bid: {}, tid: {}", bid, HUDI_EVENT_TID);
+    int retryTimes = 0;
+    while (retryTimes < RETRY_TIMES) {
+      try {
+        return sender.sendMessage(MAPPER.writeValueAsBytes(message),
+          bid, HUDI_EVENT_TID, 0, UUID.randomUUID().toString(), TDBANK_SENDER_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+      } catch (Exception e) {
+        retryTimes++;
+        LOG.error("Error when send data to tdbank retry " + retryTimes, e);
+      }
+    }
+    return SendResult.UNKOWN_ERROR;
+  }
+
+  @Override
+  public void close() throws IOException {
+    sender.close();
+  }
+
+  private void init() throws Exception {
+    if (!hasInit) {
+      synchronized (this) {
+        if (!hasInit) {
+          try {
+            LOG.info("Init tdbank-client with tdmAddress: {}, tdmPort: {}, bid: {}", tdmAddr, tdmPort, bid);
+            String localhost = InetAddress.getLocalHost().getHostAddress();
+            BusClientConfig clientConfig =
+                new BusClientConfig(localhost, true, tdmAddr, tdmPort, bid, "all");
+            LOG.info("Before sender generated.");
+            sender = new DefaultMessageSender(clientConfig);
+            LOG.info("Successfully init sender.");
+          } catch (Exception e) {
+            LOG.warn("Failed to initialize tdbank client with tdmAddress: " + tdmAddr
+                + ":" + tdmPort + ", rethrowing the exception to the caller", e);
+            throw e;
+          }
+          hasInit = true;
+        }
+      }
+    }
+  }
+
+}
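
For reference, a minimal usage sketch of the client above; it is not part of this patch, and the TDM address, port and bid are placeholders (real values come from TdbankConfig).

    import com.tencent.tdbank.busapi.SendResult;
    import org.apache.hudi.tdbank.TDBankClient;
    import org.apache.hudi.tdbank.TdbankHoodieMetricsEvent;

    public class TdbankClientUsageSketch {
      public static void main(String[] args) throws Exception {
        TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder()
            .withDBName("demo_db")      // placeholder database
            .withTableName("demo_tbl")  // placeholder table
            .withTableType(TdbankHoodieMetricsEvent.EventType.COMMIT)
            .addMetrics("totalRecordsWritten", 1000L)
            .build();

        // init() is lazy and retries are handled inside sendMessage(...).
        try (TDBankClient client = new TDBankClient("tdm.example.com", 8099, "b_example_bid")) {
          SendResult result = client.sendMessage(event);
          System.out.println("tdbank send result: " + result);
        }
      }
    }
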
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankConfig.java
new file mode 100644
index 0000000000..60a5e06a45
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankConfig.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.tdbank;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+import java.util.Properties;
+
+@Immutable
+@ConfigClassProperty(name = "Tdbank Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Tdbank configs")
+public class TdbankConfig extends HoodieConfig {
+  public static final ConfigProperty<String> TDBANK_TDM_ADDR = ConfigProperty
+      .key("hoodie.tdbank.tdm.addr")
+      .defaultValue("tl-tdbank-tdmanager.tencent-distribute.com")
+      .withDocumentation("tdbank manager address.");
+
+  public static final ConfigProperty<Integer> TDBANK_TDM_PORT = ConfigProperty
+      .key("hoodie.tdbank.tdm.port")
+      .defaultValue(8099)
+      .withDocumentation("tdbank manager port.");
+
+  public static final ConfigProperty<String> TDBANK_BID = ConfigProperty
+      .key("hoodie.tdbank.tdbank.bid")
+      .defaultValue("b_teg_iceberg_event_tdbank_mq")
+      .withDocumentation("tdbank bid, use iceberg's bid temporarily.");
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private final TdbankConfig hoodieTdbankConfig = new TdbankConfig();
+
+    public Builder withTDMAddr(String tdmAddr) {
+      hoodieTdbankConfig.setValue(TDBANK_TDM_ADDR, tdmAddr);
+      return this;
+    }
+
+    public Builder fromProperties(Properties props) {
+      hoodieTdbankConfig.setAll(props);
+      return this;
+    }
+
+    public Builder withTDMPort(int tdmPort) {
+      hoodieTdbankConfig.setValue(TDBANK_TDM_PORT, String.valueOf(tdmPort));
+      return this;
+    }
+
+    public Builder withBID(String bid) {
+      hoodieTdbankConfig.setValue(TDBANK_BID, bid);
+      return this;
+    }
+
+    public TdbankConfig build() {
+      hoodieTdbankConfig.setDefaults(TdbankConfig.class.getName());
+      return hoodieTdbankConfig;
+    }
+  }
+
+}
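
For reference, a sketch of assembling the config above and handing it to TDBankClient; not part of this patch. The overridden address is a placeholder, and unset keys fall back to the defaults declared above.

    import java.util.Properties;
    import org.apache.hudi.tdbank.TDBankClient;
    import org.apache.hudi.tdbank.TdbankConfig;

    public class TdbankConfigSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(TdbankConfig.TDBANK_TDM_PORT.key(), "8099");

        TdbankConfig config = TdbankConfig.newBuilder()
            .fromProperties(props)            // user-supplied overrides
            .withTDMAddr("tdm.example.com")   // placeholder address
            .build();                         // build() fills the remaining defaults

        TDBankClient client = new TDBankClient(
            config.getString(TdbankConfig.TDBANK_TDM_ADDR),
            config.getInt(TdbankConfig.TDBANK_TDM_PORT),
            config.getString(TdbankConfig.TDBANK_BID));
        // client is now ready to ship TdbankHoodieMetricsEvent objects.
      }
    }
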
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankHoodieMetricsEvent.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankHoodieMetricsEvent.java
new file mode 100644
index 0000000000..0be78386a1
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankHoodieMetricsEvent.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.tdbank;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class TdbankHoodieMetricsEvent implements Serializable {
+  private String dbName;
+  private String tableName;
+  private EventType type;
+  private Map<String, Object> metrics;
+
+  private TdbankHoodieMetricsEvent() {
+    this.metrics = new TreeMap<>();
+  }
+
+  public enum EventType {
+    INDEX, CLEAN, FINALIZE, ROLLBACK, COMPACTION, COMMIT, DELTACOMMIT, REPLACECOMMIT
+  }
+
+  public static TdbankHoodieMetricsEvent.Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private final TdbankHoodieMetricsEvent hoodieMetricsEvent = new TdbankHoodieMetricsEvent();
+
+    public Builder() {
+    }
+
+    public Builder withDBName(String dbName) {
+      hoodieMetricsEvent.setDbName(dbName);
+      return this;
+    }
+
+    public Builder withTableName(String tableName) {
+      hoodieMetricsEvent.setTableName(tableName);
+      return this;
+    }
+
+    public Builder withTableType(EventType type) {
+      hoodieMetricsEvent.setType(type);
+      return this;
+    }
+
+    public Builder addMetrics(String key, Object value) {
+      hoodieMetricsEvent.addMetrics(key, value);
+      return this;
+    }
+
+    public TdbankHoodieMetricsEvent build() {
+      return hoodieMetricsEvent;
+    }
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public void setType(EventType type) {
+    this.type = type;
+  }
+
+  public void addMetrics(String key, Object value) {
+    this.metrics.put(key, value);
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public EventType getType() {
+    return type;
+  }
+
+  public Map<String, Object> getMetrics() {
+    return metrics;
+  }
+
+  public Object getMetric(String key) {
+    return metrics.get(key);
+  }
+
+  public String getDbName() {
+    return dbName;
+  }
+
+  public void setDbName(String dbName) {
+    this.dbName = dbName;
+  }
+}
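
What actually goes over the wire is the Jackson serialization of this event (TDBankClient calls MAPPER.writeValueAsBytes(message)). A small sketch, not part of this patch, with illustrative names and values only:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.hudi.tdbank.TdbankHoodieMetricsEvent;

    public class TdbankEventJsonSketch {
      public static void main(String[] args) throws Exception {
        TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder()
            .withDBName("demo_db")
            .withTableName("demo_tbl")
            .withTableType(TdbankHoodieMetricsEvent.EventType.DELTACOMMIT)
            .addMetrics("duration", 4200)
            .addMetrics("totalFilesInsert", 3)
            .build();

        // Prints roughly: {"dbName":"demo_db","tableName":"demo_tbl",
        //   "type":"DELTACOMMIT","metrics":{"duration":4200,"totalFilesInsert":3}}
        System.out.println(new ObjectMapper().writeValueAsString(event));
      }
    }
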
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 53a5799508..34198d456c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -122,6 +122,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   @Override
   public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
     List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
+    if (commitActionType.equals(HoodieTimeline.COMMIT_ACTION)) {
+      writeTimer = metrics.getCommitCtx();
+    } else {
+      writeTimer = metrics.getDeltaCommitCtx();
+    }
     return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
   }
 
@@ -436,6 +441,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
     // only used for metadata table, the compaction happens in single thread
     HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime);
     commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
+    compactionTimer = metrics.getCompactionCtx();
     return compactionMetadata;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 89d01b53a6..85f970e7ec 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -90,8 +90,10 @@ public class HoodieTableConfig extends HoodieConfig {
   public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
       .key("hoodie.database.name")
       .noDefaultValue()
-      .withDocumentation("Database name that will be used for incremental query.If different databases have the same table name during incremental query, "
-          + "we can set it to limit the table name under a specific database");
+      .withDocumentation("Database name to identify a table, currently will be used for "
+          + "1. incremental query.If different databases have the same table name during incremental query "
+          + "we can set it to limit the table name under a specific database"
+          + "2. identify a table");
 
   public static final ConfigProperty<String> NAME = ConfigProperty
       .key(HOODIE_TABLE_NAME_KEY)
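
Since hoodie.database.name is persisted in the table's hoodie.properties, readers can recover the database/table pair from the table itself. A short sketch, not part of this patch, with a placeholder base path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableConfig;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    public class TableIdentitySketch {
      public static void main(String[] args) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(new Configuration())
            .setBasePath("/tmp/hudi/demo_tbl") // placeholder base path
            .build();
        HoodieTableConfig tableConfig = metaClient.getTableConfig();
        // Database name may be null for tables created before this option was set.
        System.out.println(tableConfig.getDatabaseName() + "." + tableConfig.getTableName());
      }
    }
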
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index a9e10d3e55..4a298839fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -82,6 +82,11 @@ public class FlinkOptions extends HoodieConfig {
   // ------------------------------------------------------------------------
   //  Common Options
   // ------------------------------------------------------------------------
+  public static final ConfigOption<String> DATABASE_NAME = ConfigOptions
+      .key(HoodieWriteConfig.DATABASE_NAME.key())
+      .stringType()
+      .noDefaultValue()
+      .withDescription("Database name to identify tables");
 
   public static final ConfigOption<String> TABLE_NAME = ConfigOptions
       .key(HoodieWriteConfig.TBL_NAME.key())
@@ -411,7 +416,7 @@ public class FlinkOptions extends HoodieConfig {
       .key("write.bucket_assign.tasks")
       .intType()
       .noDefaultValue()
-      .withDescription("Parallelism of tasks that do bucket assign, default same as the write task parallelism");
+      .withDescription("Parallelism of tasks that do bucket assign, default is the parallelism of the execution environment");
 
   public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
       .key("write.tasks")
@@ -522,8 +527,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
       .key("compaction.tasks")
       .intType()
-      .noDefaultValue()
-      .withDescription("Parallelism of tasks that do actual compaction, default same as the write task parallelism");
+      .defaultValue(4) // default WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 (assumes 5 commits generate one bucket)
+      .withDescription("Parallelism of tasks that do actual compaction, default is 4");
 
   public static final String NUM_COMMITS = "num_commits";
   public static final String TIME_ELAPSED = "time_elapsed";
@@ -580,7 +585,7 @@ public class FlinkOptions extends HoodieConfig {
       .stringType()
       .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
       .withDescription("Clean policy to manage the Hudi table. Available option: KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS."
-          + "Default is KEEP_LATEST_COMMITS.");
+          +  "Default is KEEP_LATEST_COMMITS.");
 
   public static final ConfigOption<Integer> CLEAN_RETAIN_COMMITS = ConfigOptions
       .key("clean.retain_commits")
@@ -589,14 +594,6 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
           + "This also directly translates into how much you can incrementally pull on this table, default 30");
 
-  public static final ConfigOption<Integer> CLEAN_RETAIN_HOURS = ConfigOptions
-      .key("clean.retain_hours")
-      .intType()
-      .defaultValue(24)// default 24 hours
-      .withDescription("Number of hours for which commits need to be retained. This config provides a more flexible option as"
-          + "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
-          + " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
-
   public static final ConfigOption<Integer> CLEAN_RETAIN_FILE_VERSIONS = ConfigOptions
       .key("clean.retain_file_versions")
       .intType()
@@ -660,7 +657,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME = ConfigOptions
       .key("clustering.plan.partition.filter.mode")
       .stringType()
-      .defaultValue(ClusteringPlanPartitionFilterMode.NONE.name())
+      .defaultValue("NONE")
       .withDescription("Partition filter mode used in the creation of clustering plan. Available values are - "
           + "NONE: do not filter table partition and thus the clustering plan will include all partitions that have clustering candidate."
           + "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
@@ -668,16 +665,16 @@ public class FlinkOptions extends HoodieConfig {
           + "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
           + PARTITION_FILTER_END_PARTITION.key() + "'].");
 
-  public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
+  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
       .key("clustering.plan.strategy.target.file.max.bytes")
-      .longType()
-      .defaultValue(1024 * 1024 * 1024L) // default 1 GB
+      .intType()
+      .defaultValue(1024 * 1024 * 1024) // default 1 GB
       .withDescription("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB");
 
-  public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
+  public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
       .key("clustering.plan.strategy.small.file.limit")
-      .longType()
-      .defaultValue(600L) // default 600 MB
+      .intType()
+      .defaultValue(600) // default 600 MB
       .withDescription("Files smaller than the size specified here are candidates for clustering, default 600 MB");
 
   public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions
@@ -701,7 +698,6 @@ public class FlinkOptions extends HoodieConfig {
   // ------------------------------------------------------------------------
   //  Hive Sync Options
   // ------------------------------------------------------------------------
-
   public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions
       .key("hive_sync.enable")
       .booleanType()
@@ -729,8 +725,8 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions
       .key("hive_sync.mode")
       .stringType()
-      .defaultValue(HiveSyncMode.HMS.name())
-      .withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'hms'");
+      .defaultValue("jdbc")
+      .withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'jdbc'");
 
   public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
       .key("hive_sync.username")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index f022b04ea1..b2f72aed7d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -74,6 +74,9 @@ public class FlinkStreamerConfig extends Configuration {
       required = true)
   public String targetBasePath;
 
+  @Parameter(names = {"--target-db"}, description = "Name of target database")
+  public String targetDatabaseName;
+
   @Parameter(names = {"--target-table"}, description = "Name of the target table in Hive.", required = true)
   public String targetTableName;
 
@@ -351,6 +354,7 @@ public class FlinkStreamerConfig extends Configuration {
 
     conf.setString(FlinkOptions.PATH, config.targetBasePath);
     conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
+    conf.setString(FlinkOptions.DATABASE_NAME, config.targetDatabaseName);
     // copy_on_write works same as COPY_ON_WRITE
     conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
     conf.setBoolean(FlinkOptions.INSERT_CLUSTER, config.insertCluster);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index b153b2273c..b08eb570ce 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -107,6 +107,8 @@ public class HoodieFlinkStreamer {
       Pipelines.clean(conf, pipeline);
     }
 
-    env.execute(cfg.targetTableName);
+    String jobName = (cfg.targetDatabaseName == null || cfg.targetDatabaseName.isEmpty())
+        ? cfg.targetTableName : cfg.targetDatabaseName + "." + cfg.targetTableName;
+    env.execute(jobName);
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 1cf66ea343..1718175240 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -169,6 +169,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
       ObjectIdentifier tablePath,
       CatalogTable table,
       ResolvedSchema schema) {
+    // database name
+    conf.setString(FlinkOptions.DATABASE_NAME.key(), tablePath.getDatabaseName());
     // table name
     conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName());
     // hoodie key about options
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index ee807f49da..0df3a0c8da 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -164,8 +164,13 @@ public class DataSourceUtils {
     });
   }
 
+  public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath, String tblName,
+                                                     Map<String, String> parameters) {
+    return createHoodieConfig(schemaStr, basePath, "default_db", tblName, parameters);
+  }
+
   public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath,
-      String tblName, Map<String, String> parameters) {
+      String dbName, String tblName, Map<String, String> parameters) {
     boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key()));
     boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
         .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
@@ -178,6 +183,7 @@ public class DataSourceUtils {
     }
 
     return builder.forTable(tblName)
+        .withDatabaseName(dbName)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withInlineCompaction(inlineCompact).build())
         .withPayloadConfig(HoodiePayloadConfig.newBuilder()
@@ -189,8 +195,8 @@ public class DataSourceUtils {
   }
 
   public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
-                                                       String tblName, Map<String, String> parameters) {
-    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, tblName, parameters));
+                                                       String dbName, String tblName, Map<String, String> parameters) {
+    return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, dbName, tblName, parameters));
   }
 
   public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
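
A sketch of the new 5-argument createHoodieConfig overload, which threads the database name into HoodieWriteConfig via withDatabaseName(...); the 4-argument overload keeps source compatibility by falling back to "default_db". Not part of this patch; paths and names are placeholders.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hudi.DataSourceUtils;
    import org.apache.hudi.DataSourceWriteOptions;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class CreateHoodieConfigSketch {
      public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put(DataSourceWriteOptions.TABLE_TYPE().key(),
            DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL());

        HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(
            null,                  // schema string, optional on this path
            "/tmp/hudi/demo_tbl",  // placeholder base path
            "demo_db",             // database name (new argument)
            "demo_tbl",
            params);
        System.out.println(config.getTableName());
      }
    }
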
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
index 0d3edd592d..3b1caddb59 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
@@ -47,7 +47,7 @@ object HoodieCLIUtils {
 
     val jsc = new JavaSparkContext(sparkSession.sparkContext)
     DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath,
-      metaClient.getTableConfig.getTableName, finalParameters.asJava)
+      metaClient.getTableConfig.getDatabaseName, metaClient.getTableConfig.getTableName, finalParameters.asJava)
   }
 
   def extractPartitions(clusteringGroups: Seq[HoodieClusteringGroup]): String = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index b9ff4c0d1a..61cb7ef961 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -176,6 +176,7 @@ object HoodieSparkSqlWriter {
       // scalastyle:off
       if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) &&
         operation == WriteOperationType.BULK_INSERT) {
+        parameters.put(HoodieWriteConfig.DATABASE_NAME.key(), databaseName)
         val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
           basePath, path, instantTime, partitionColumns)
         return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
@@ -197,7 +198,7 @@ object HoodieSparkSqlWriter {
             // Create a HoodieWriteClient & issue the delete.
             val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
             val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-              null, path, tblName,
+              null, path, databaseName, tblName,
               mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
               .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
@@ -228,7 +229,7 @@ object HoodieSparkSqlWriter {
             }
             // Create a HoodieWriteClient & issue the delete.
             val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-              null, path, tblName,
+              null, path, databaseName, tblName,
               mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
               .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
             // Issue delete partitions
@@ -310,7 +311,7 @@ object HoodieSparkSqlWriter {
             // Create a HoodieWriteClient & issue the write.
 
             val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,
-              tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
+              databaseName, tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
             )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
             if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
@@ -441,6 +442,7 @@ object HoodieSparkSqlWriter {
 
     val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
     val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
+    val databaseName = hoodieConfig.getStringOrDefault(HoodieWriteConfig.DATABASE_NAME, "default")
     val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
     val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
       s"'${BASE_PATH.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
@@ -486,6 +488,7 @@ object HoodieSparkSqlWriter {
         HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(HoodieTableType.valueOf(tableType))
           .setTableName(tableName)
+          .setDatabaseName(databaseName)
           .setRecordKeyFields(recordKeyFields)
           .setArchiveLogFolder(archiveLogFolder)
           .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
@@ -506,7 +509,7 @@ object HoodieSparkSqlWriter {
 
       val jsc = new JavaSparkContext(sqlContext.sparkContext)
       val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-        schema, path, tableName, mapAsJavaMap(parameters)))
+        schema, path, databaseName, tableName, mapAsJavaMap(parameters)))
       try {
         writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
       } finally {
@@ -555,6 +558,7 @@ object HoodieSparkSqlWriter {
     }
     val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
     params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
+    val dbName = parameters.getOrElse(HoodieWriteConfig.DATABASE_NAME.key(), "default")
     val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
     val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
       val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
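
On the Spark datasource path the database name is now passed through to the write client and, via the meta client property builder above, stored in hoodie.properties at table creation. A minimal write sketch, not part of this patch; database name, table name and path are placeholders:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    public class SparkWriteWithDbNameSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();
        Dataset<Row> df = spark.sql(
            "select '1' as uuid, 'region_1' as region, current_timestamp() as ts");

        df.write().format("hudi")
            .option("hoodie.table.name", "demo_tbl")
            .option("hoodie.database.name", "demo_db") // picked up by HoodieSparkSqlWriter
            .option("hoodie.datasource.write.recordkey.field", "uuid")
            .option("hoodie.datasource.write.partitionpath.field", "region")
            .option("hoodie.datasource.write.precombine.field", "ts")
            .mode(SaveMode.Overwrite)
            .save("/tmp/hudi/demo_db/demo_tbl");
        spark.stop();
      }
    }
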
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index 1d65670f6d..69e120c2e3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -106,6 +106,7 @@ object AlterHoodieTableAddColumnsCommand {
       jsc,
       schema.toString,
       hoodieCatalogTable.tableLocation,
+      hoodieCatalogTable.table.identifier.database.getOrElse("default"),
       hoodieCatalogTable.tableName,
       HoodieWriterUtils.parametersWithWriteDefaults(hoodieCatalogTable.catalogProperties).asJava
     )
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index f0394ad379..b098ff3ea4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -21,9 +21,9 @@ import org.apache.avro.Schema
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
+import org.apache.hudi.config.HoodieWriteConfig.{DATABASE_NAME, TBL_NAME}
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport}
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.MatchCast
 import org.apache.spark.sql._
@@ -530,6 +530,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
         PRECOMBINE_FIELD.key -> preCombineField,
         TBL_NAME.key -> hoodieCatalogTable.tableName,
+        DATABASE_NAME.key -> targetTableDb,
         PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
         HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
         URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index 11f0fc9785..4d0d5aeef2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -237,7 +237,7 @@ public class TestDataSourceUtils {
               DataSourceWriteOptions.PAYLOAD_CLASS_NAME().defaultValue());
       params.put(pair.left, pair.right.toString());
       HoodieWriteConfig hoodieConfig = DataSourceUtils
-              .createHoodieConfig(avroSchemaString, config.getBasePath(), "test", params);
+              .createHoodieConfig(avroSchemaString, config.getBasePath(), "testdb", "test", params);
       assertEquals(pair.right, hoodieConfig.isAsyncClusteringEnabled());
 
       TypedProperties prop = new TypedProperties();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 4e4fe43ff9..114c3d0f37 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -62,6 +62,7 @@ class TestHoodieSparkSqlWriter {
   var tempPath: java.nio.file.Path = _
   var tempBootStrapPath: java.nio.file.Path = _
   var hoodieFooTableName = "hoodie_foo_tbl"
+  val hoodieDefaultDBName = "default_db"
   var tempBasePath: String = _
   var commonTableModifier: Map[String, String] = Map()
   case class StringLongTest(uuid: String, ts: Long)
@@ -490,6 +491,7 @@ class TestHoodieSparkSqlWriter {
   @MethodSource(Array("testDatasourceInsert"))
   def testDatasourceInsertForTableTypeBaseFileMetaFields(tableType: String, populateMetaFields: Boolean, baseFileFormat: String): Unit = {
     val hoodieFooTableName = "hoodie_foo_tbl"
+    val hoodieDefaultDBName = "default_db"
     val fooTableModifier = Map("path" -> tempBasePath,
       HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
       HoodieWriteConfig.BASE_FILE_FORMAT.key -> baseFileFormat,
@@ -510,7 +512,7 @@ class TestHoodieSparkSqlWriter {
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
     initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false, initBasePath = true)
     val client = spy(DataSourceUtils.createHoodieClient(
-      new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
+      new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieDefaultDBName, hoodieFooTableName,
       mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
 
     HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df, Option.empty, Option(client))
@@ -571,6 +573,7 @@ class TestHoodieSparkSqlWriter {
         new JavaSparkContext(sc),
         null,
         tempBasePath,
+        hoodieDefaultDBName,
         hoodieFooTableName,
         mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
 
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
index 3b3b8eafb8..5746cacb0b 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
@@ -65,6 +65,7 @@ public class DefaultSource extends BaseDefaultSource implements DataSourceV2,
     String instantTime = options.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY).get();
     String path = options.get("path").get();
     String tblName = options.get(HoodieWriteConfig.TBL_NAME.key()).get();
+    String dbName = options.get(HoodieWriteConfig.DATABASE_NAME.key()).get();
     boolean populateMetaFields = options.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
         HoodieTableConfig.POPULATE_META_FIELDS.defaultValue());
     Map<String, String> properties = options.asMap();
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
index ab2f16703b..90ae1e7377 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
@@ -52,6 +52,7 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
     String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY);
     String path = properties.get("path");
     String tblName = properties.get(HoodieWriteConfig.TBL_NAME.key());
+    String dbName = properties.get(HoodieWriteConfig.DATABASE_NAME.key());
     boolean populateMetaFields = Boolean.parseBoolean(properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(),
         Boolean.toString(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())));
     boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
@@ -61,7 +62,8 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
     // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
     tryOverrideParquetWriteLegacyFormatProperty(newProps, schema);
     // 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
-    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, newProps);
+    HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path,
+        dbName, tblName, newProps);
     return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
         getConfiguration(), newProps, populateMetaFields, arePartitionRecordsSorted);
   }
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
index 9a5366b12f..529b5bb49e 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
@@ -217,7 +217,7 @@ object Spark31AlterTableCommand extends Logging {
 
     val jsc = new JavaSparkContext(sparkSession.sparkContext)
     val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
-      path, table.identifier.table, parametersWithWriteDefaults(table.storage.properties).asJava)
+      path, table.identifier.database.getOrElse("default_db"), table.identifier.table, parametersWithWriteDefaults(table.storage.properties).asJava)
 
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()