Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:12 UTC
[hudi] 02/45: [MINOR] Add Zhiyan metrics reporter
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 23412b2bee1e05231f100f922fe3785c39f2891b
Author: simonssu <si...@tencent.com>
AuthorDate: Wed May 25 21:08:45 2022 +0800
[MINOR] Add Zhiyan metrics reporter
---
dev/tencent-release.sh | 4 +-
hudi-client/hudi-client-common/pom.xml | 7 +
.../apache/hudi/async/AsyncPostEventService.java | 93 +++++++++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 84 ++++++++++
.../hudi/config/metrics/HoodieMetricsConfig.java | 23 ++-
.../config/metrics/HoodieMetricsZhiyanConfig.java | 143 +++++++++++++++++
.../org/apache/hudi/metrics/HoodieMetrics.java | 120 ++++++++++++++-
.../main/java/org/apache/hudi/metrics/Metrics.java | 1 +
.../hudi/metrics/MetricsReporterFactory.java | 4 +
.../apache/hudi/metrics/MetricsReporterType.java | 2 +-
.../hudi/metrics/zhiyan/ZhiyanHttpClient.java | 129 ++++++++++++++++
.../hudi/metrics/zhiyan/ZhiyanMetricsReporter.java | 66 ++++++++
.../apache/hudi/metrics/zhiyan/ZhiyanReporter.java | 170 +++++++++++++++++++++
.../java/org/apache/hudi/tdbank/TDBankClient.java | 103 +++++++++++++
.../java/org/apache/hudi/tdbank/TdbankConfig.java | 82 ++++++++++
.../hudi/tdbank/TdbankHoodieMetricsEvent.java | 110 +++++++++++++
.../apache/hudi/client/HoodieFlinkWriteClient.java | 6 +
.../hudi/common/table/HoodieTableConfig.java | 6 +-
.../apache/hudi/configuration/FlinkOptions.java | 40 +++--
.../apache/hudi/streamer/FlinkStreamerConfig.java | 4 +
.../apache/hudi/streamer/HoodieFlinkStreamer.java | 4 +-
.../org/apache/hudi/table/HoodieTableFactory.java | 2 +
.../main/java/org/apache/hudi/DataSourceUtils.java | 12 +-
.../scala/org/apache/hudi/HoodieCLIUtils.scala | 2 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 12 +-
.../AlterHoodieTableAddColumnsCommand.scala | 1 +
.../hudi/command/MergeIntoHoodieTableCommand.scala | 3 +-
.../java/org/apache/hudi/TestDataSourceUtils.java | 2 +-
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 5 +-
.../org/apache/hudi/internal/DefaultSource.java | 1 +
.../apache/hudi/spark3/internal/DefaultSource.java | 4 +-
.../hudi/command/Spark31AlterTableCommand.scala | 2 +-
32 files changed, 1200 insertions(+), 47 deletions(-)
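With the configs introduced below, enabling the new reporter from a writer amounts to setting the standard metrics options plus the Zhiyan keys. A minimal sketch only, assuming the stock HoodieWriteConfig builder methods (withPath/forTable/withProps), which are not touched by this commit; paths and names are placeholders:

  import java.util.Properties;
  import org.apache.hudi.config.HoodieWriteConfig;

  // Property keys come from HoodieMetricsConfig / HoodieMetricsZhiyanConfig below.
  Properties props = new Properties();
  props.setProperty("hoodie.metrics.on", "true");
  props.setProperty("hoodie.metrics.reporter.type", "ZHIYAN");
  props.setProperty("hoodie.metrics.zhiyan.job.name", "example-ingest-job"); // placeholder
  HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
      .withPath("/tmp/hoodie/example_table")   // placeholder base path
      .forTable("example_table")               // placeholder table name
      .withProps(props)
      .build();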
diff --git a/dev/tencent-release.sh b/dev/tencent-release.sh
index 944f497070..b788d62dc7 100644
--- a/dev/tencent-release.sh
+++ b/dev/tencent-release.sh
@@ -116,9 +116,9 @@ function deploy_spark(){
FLINK_VERSION=$3
if [ ${release_repo} = "Y" ]; then
- COMMON_OPTIONS="-Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -Dflink${FLINK_VERSION} -DskipTests -s dev/settings.xml -DretryFailedDeploymentCount=30"
+ COMMON_OPTIONS="-Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -Dflink${FLINK_VERSION} -DskipTests -s dev/settings.xml -DretryFailedDeploymentCount=30 -T 2.5C"
else
- COMMON_OPTIONS="-Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -Dflink${FLINK_VERSION} -DskipTests -s dev/settings.xml -DretryFailedDeploymentCount=30"
+ COMMON_OPTIONS="-Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -Dflink${FLINK_VERSION} -DskipTests -s dev/settings.xml -DretryFailedDeploymentCount=30 -T 2.5C"
fi
# INSTALL_OPTIONS="-U -Drat.skip=true -Djacoco.skip=true -Dscala-${SCALA_VERSION} -Dspark${SPARK_VERSION} -DskipTests -s dev/settings.xml -T 2.5C"
diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml
index 735b62957d..81bf645427 100644
--- a/hudi-client/hudi-client-common/pom.xml
+++ b/hudi-client/hudi-client-common/pom.xml
@@ -72,6 +72,13 @@
<version>0.2.2</version>
</dependency>
+ <!-- Tdbank -->
+ <dependency>
+ <groupId>com.tencent.tdbank</groupId>
+ <artifactId>TDBusSDK</artifactId>
+ <version>1.2.9</version>
+ </dependency>
+
<!-- Dropwizard Metrics -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncPostEventService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncPostEventService.java
new file mode 100644
index 0000000000..84cf82c913
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/async/AsyncPostEventService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.tdbank.TDBankClient;
+import org.apache.hudi.tdbank.TdbankHoodieMetricsEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Async service that posts metrics events to a remote service.
+ */
+public class AsyncPostEventService extends HoodieAsyncService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncPostEventService.class);
+
+ private final transient ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final LinkedBlockingQueue<TdbankHoodieMetricsEvent> queue;
+ private final TDBankClient client;
+
+ public AsyncPostEventService(HoodieWriteConfig config, LinkedBlockingQueue<TdbankHoodieMetricsEvent> queue) {
+ this.client = new TDBankClient(config.getTdbankTdmAddr(),
+ config.getTdbankTdmPort(), config.getTdbankBid());
+ this.queue = queue;
+ }
+
+ @Override
+ protected Pair<CompletableFuture, ExecutorService> startService() {
+ LOG.info("Start async post event service...");
+ return Pair.of(CompletableFuture.supplyAsync(() -> {
+ sendEvent();
+ return true;
+ }, executor), executor);
+ }
+
+ private void sendEvent() {
+ try {
+ while (!isShutdownRequested()) {
+ TdbankHoodieMetricsEvent event = queue.poll(10, TimeUnit.SECONDS);
+ if (event != null) {
+ client.sendMessage(event);
+ }
+ }
+ LOG.info("Post event service shutdown properly.");
+ } catch (Exception e) {
+ LOG.error("Error while sending event to tdbank", e);
+ }
+ }
+
+ // TODO simplify the code shared across the async package.
+ public static void waitForCompletion(AsyncArchiveService asyncArchiveService) {
+ if (asyncArchiveService != null) {
+ LOG.info("Waiting for async archive service to finish");
+ try {
+ asyncArchiveService.waitForShutdown();
+ } catch (Exception e) {
+ throw new HoodieException("Error waiting for async archive service to finish", e);
+ }
+ }
+ }
+
+ public static void forceShutdown(AsyncArchiveService asyncArchiveService) {
+ if (asyncArchiveService != null) {
+ LOG.info("Shutting down async archive service...");
+ asyncArchiveService.shutdown(true);
+ }
+ }
+}
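The service drains a shared LinkedBlockingQueue and ships each event through TDBankClient. HoodieMetrics, further down in this commit, wires it up roughly as in this sketch (writeConfig and metricsEvent are placeholders, not defined here):

  import java.util.concurrent.LinkedBlockingQueue;
  import org.apache.hudi.async.AsyncPostEventService;
  import org.apache.hudi.tdbank.TdbankHoodieMetricsEvent;

  LinkedBlockingQueue<TdbankHoodieMetricsEvent> queue = new LinkedBlockingQueue<>();
  AsyncPostEventService postEventService = new AsyncPostEventService(writeConfig, queue);
  postEventService.start(null); // begins polling the queue and forwarding events to TDBank
  queue.add(metricsEvent);      // producers only need to enqueue events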
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 55979b481b..23bc0ee329 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -54,6 +54,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig;
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsZhiyanConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
@@ -72,6 +73,7 @@ import org.apache.hudi.table.storage.HoodieStorageLayout;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.hudi.tdbank.TdbankConfig;
import org.apache.orc.CompressionKind;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -86,6 +88,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
+import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -108,11 +111,21 @@ public class HoodieWriteConfig extends HoodieConfig {
// It is here so that both the client and deltastreamer use the same reference
public static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
+ public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
+ .key(HoodieTableConfig.DATABASE_NAME.key())
+ .noDefaultValue()
+ .withDocumentation("Database name used to identify tables belonging to different databases.");
+
public static final ConfigProperty<String> TBL_NAME = ConfigProperty
.key(HoodieTableConfig.HOODIE_TABLE_NAME_KEY)
.noDefaultValue()
.withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs.");
+ public static final ConfigProperty<String> HOODIE_JOB_ID = ConfigProperty
+ .key("hoodie.job.id")
+ .noDefaultValue()
+ .withDocumentation("Job id used to identify a hoodie job (e.g. a Spark job that writes data to a hoodie table).");
+
public static final ConfigProperty<String> PRECOMBINE_FIELD_NAME = ConfigProperty
.key("hoodie.datasource.write.precombine.field")
.defaultValue("ts")
@@ -962,6 +975,14 @@ public class HoodieWriteConfig extends HoodieConfig {
HoodieTableConfig.TYPE, HoodieTableConfig.TYPE.defaultValue().name()).toUpperCase());
}
+ public String getDatabaseName() {
+ return getString(DATABASE_NAME);
+ }
+
+ public String getHoodieJobId() {
+ return getString(HOODIE_JOB_ID);
+ }
+
public String getPreCombineField() {
return getString(PRECOMBINE_FIELD_NAME);
}
@@ -1820,6 +1841,42 @@ public class HoodieWriteConfig extends HoodieConfig {
HoodieMetricsDatadogConfig.METRIC_TAG_VALUES, ",").split("\\s*,\\s*")).collect(Collectors.toList());
}
+ public int getZhiyanApiTimeoutSeconds() {
+ return getInt(HoodieMetricsZhiyanConfig.API_TIMEOUT_IN_SECONDS);
+ }
+
+ public int getZhiyanReportPeriodSeconds() {
+ return getInt(HoodieMetricsZhiyanConfig.REPORT_PERIOD_SECONDS);
+ }
+
+ public String getZhiyanReportServiceURL() {
+ return getString(HoodieMetricsZhiyanConfig.REPORT_SERVICE_URL);
+ }
+
+ public String getZhiyanReportServicePath() {
+ return getString(HoodieMetricsZhiyanConfig.REPORT_SERVICE_PATH);
+ }
+
+ public String getZhiyanHoodieJobName() {
+ String zhiyanJobName = getString(HoodieMetricsZhiyanConfig.ZHIYAN_JOB_NAME);
+ if (getBoolean(HoodieMetricsZhiyanConfig.ZHIYAN_RANDOM_JOBNAME_SUFFIX)) {
+ if (!zhiyanJobName.isEmpty()) {
+ return zhiyanJobName + "." + UUID.randomUUID();
+ } else {
+ return engineType + "." + UUID.randomUUID();
+ }
+ }
+ return zhiyanJobName;
+ }
+
+ public String getZhiyanAppMask() {
+ return getString(HoodieMetricsZhiyanConfig.ZHIYAN_METRICS_HOODIE_APPMASK);
+ }
+
+ public String getZhiyanSeclvlEnvName() {
+ return getString(HoodieMetricsZhiyanConfig.ZHIYAN_METRICS_HOODIE_SECLVLENNAME);
+ }
+
public int getCloudWatchReportPeriodSeconds() {
return getInt(HoodieMetricsCloudWatchConfig.REPORT_PERIOD_SECONDS);
}
@@ -1872,6 +1929,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getStringOrDefault(HoodieMetricsConfig.METRICS_REPORTER_PREFIX);
}
+ public int getMetricEventQueueSize() {
+ return getIntOrDefault(HoodieMetricsConfig.METRICS_EVENT_QUEUE_SIZE);
+ }
+
/**
* memory configs.
*/
@@ -2135,6 +2196,21 @@ public class HoodieWriteConfig extends HoodieConfig {
return metastoreConfig.enableMetastore();
}
+ /**
+ * Tdbank configs
+ * */
+ public String getTdbankTdmAddr() {
+ return getString(TdbankConfig.TDBANK_TDM_ADDR);
+ }
+
+ public int getTdbankTdmPort() {
+ return getInt(TdbankConfig.TDBANK_TDM_PORT);
+ }
+
+ public String getTdbankBid() {
+ return getString(TdbankConfig.TDBANK_BID);
+ }
+
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -2159,6 +2235,7 @@ public class HoodieWriteConfig extends HoodieConfig {
private boolean isMetricsJmxConfigSet = false;
private boolean isMetricsGraphiteConfigSet = false;
private boolean isLayoutConfigSet = false;
+ private boolean isTdbankConfigSet = false;
public Builder withEngineType(EngineType engineType) {
this.engineType = engineType;
@@ -2216,6 +2293,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withDatabaseName(String dbName) {
+ writeConfig.setValue(DATABASE_NAME, dbName);
+ return this;
+ }
+
public Builder withPreCombineField(String preCombineField) {
writeConfig.setValue(PRECOMBINE_FIELD_NAME, preCombineField);
return this;
@@ -2583,6 +2665,8 @@ public class HoodieWriteConfig extends HoodieConfig {
HoodiePreCommitValidatorConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultOnCondition(!isLayoutConfigSet,
HoodieLayoutConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+ writeConfig.setDefaultOnCondition(!isTdbankConfigSet,
+ TdbankConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION_NUM, String.valueOf(TimelineLayoutVersion.CURR_VERSION));
// isLockProviderPropertySet must be fetched before setting defaults of HoodieLockConfig
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index 957b439051..787819be12 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -47,14 +47,14 @@ public class HoodieMetricsConfig extends HoodieConfig {
public static final ConfigProperty<Boolean> TURN_METRICS_ON = ConfigProperty
.key(METRIC_PREFIX + ".on")
- .defaultValue(false)
+ .defaultValue(true)
.sinceVersion("0.5.0")
.withDocumentation("Turn on/off metrics reporting. off by default.");
public static final ConfigProperty<MetricsReporterType> METRICS_REPORTER_TYPE_VALUE = ConfigProperty
.key(METRIC_PREFIX + ".reporter.type")
- .defaultValue(MetricsReporterType.GRAPHITE)
- .sinceVersion("0.5.0")
+ .defaultValue(MetricsReporterType.ZHIYAN)
+ .sinceVersion("0.11.0")
.withDocumentation("Type of metrics reporter.");
// User defined
@@ -69,10 +69,15 @@ public class HoodieMetricsConfig extends HoodieConfig {
.defaultValue("")
.sinceVersion("0.11.0")
.withInferFunction(cfg -> {
+ StringBuilder sb = new StringBuilder();
+ if (cfg.contains(HoodieTableConfig.DATABASE_NAME)) {
+ sb.append(cfg.getString(HoodieTableConfig.DATABASE_NAME));
+ sb.append(".");
+ }
if (cfg.contains(HoodieTableConfig.NAME)) {
- return Option.of(cfg.getString(HoodieTableConfig.NAME));
+ sb.append(cfg.getString(HoodieTableConfig.NAME));
}
- return Option.empty();
+ return sb.length() == 0 ? Option.empty() : Option.of(sb.toString());
})
.withDocumentation("The prefix given to the metrics names.");
@@ -94,6 +99,12 @@ public class HoodieMetricsConfig extends HoodieConfig {
})
.withDocumentation("Enable metrics for locking infra. Useful when operating in multiwriter mode");
+ public static final ConfigProperty<Integer> METRICS_EVENT_QUEUE_SIZE = ConfigProperty
+ .key(METRIC_PREFIX + ".event.queue.size")
+ .defaultValue(10_000_000)
+ .sinceVersion("0.11.0")
+ .withDocumentation("Size of the in-memory queue used to buffer metrics events before they are posted.");
+
/**
* @deprecated Use {@link #TURN_METRICS_ON} and its methods instead
*/
@@ -197,6 +208,8 @@ public class HoodieMetricsConfig extends HoodieConfig {
HoodieMetricsGraphiteConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.CLOUDWATCH,
HoodieMetricsCloudWatchConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
+ hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.ZHIYAN,
+ HoodieMetricsZhiyanConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
return hoodieMetricsConfig;
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsZhiyanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsZhiyanConfig.java
new file mode 100644
index 0000000000..d090b19b2f
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsZhiyanConfig.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config.metrics;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRIC_PREFIX;
+
+@ConfigClassProperty(name = "Metrics Configurations for Zhiyan",
+ groupName = ConfigGroups.Names.METRICS,
+ description = "Enables reporting on Hudi metrics using Zhiyan. "
+ + " Hudi publishes metrics on every commit, clean, rollback etc.")
+public class HoodieMetricsZhiyanConfig extends HoodieConfig {
+
+ public static final String ZHIYAN_PREFIX = METRIC_PREFIX + ".zhiyan";
+
+ public static final ConfigProperty<Integer> API_TIMEOUT_IN_SECONDS = ConfigProperty
+ .key(ZHIYAN_PREFIX + ".api.timeout.seconds")
+ .defaultValue(10)
+ .sinceVersion("0.10.0")
+ .withDocumentation("Zhiyan API timeout in seconds. Defaults to 10.");
+
+ public static final ConfigProperty<Integer> REPORT_PERIOD_SECONDS = ConfigProperty
+ .key(ZHIYAN_PREFIX + ".report.period.seconds")
+ .defaultValue(10)
+ .sinceVersion("0.10.0")
+ .withDocumentation("Zhiyan report period in seconds. Defaults to 10.");
+
+ public static final ConfigProperty<String> REPORT_SERVICE_URL = ConfigProperty
+ .key(ZHIYAN_PREFIX + ".report.service.url")
+ .defaultValue("http://zhiyan.monitor.access.inner.woa.com:8080")
+ .withDocumentation("Zhiyan Report service url.");
+
+ public static final ConfigProperty<String> REPORT_SERVICE_PATH = ConfigProperty
+ .key(ZHIYAN_PREFIX + ".report.service.path")
+ .defaultValue("/access_v1.http_service/HttpCurveReportRpc")
+ .withDocumentation("Zhiyan Report service path.");
+
+ public static final ConfigProperty<String> ZHIYAN_JOB_NAME = ConfigProperty
+ .key(ZHIYAN_PREFIX + ".job.name")
+ .defaultValue("")
+ .sinceVersion("0.10.0")
+ .withDocumentation("Name of the job using the Zhiyan metrics reporter.");
+
+ public static final ConfigProperty<Boolean> ZHIYAN_RANDOM_JOBNAME_SUFFIX = ConfigProperty
+ .key(ZHIYAN_PREFIX + ".random.job.name.suffix")
+ .defaultValue(true)
+ .sinceVersion("0.10.0")
+ .withDocumentation("Whether the Zhiyan job name needs a random suffix. Defaults to true.");
+
+ public static final ConfigProperty<String> ZHIYAN_METRICS_HOODIE_APPMASK = ConfigProperty
+ .key(ZHIYAN_PREFIX + ".hoodie.appmask")
+ .defaultValue("1701_36311_HUDI")
+ .sinceVersion("0.10.0")
+ .withDocumentation("Zhiyan appmask for hudi.");
+
+ public static final ConfigProperty<String> ZHIYAN_METRICS_HOODIE_SECLVLENNAME = ConfigProperty
+ .key(ZHIYAN_PREFIX + ".hoodie.seclvlenname")
+ .defaultValue("hudi_metrics")
+ .sinceVersion("0.10.0")
+ .withDocumentation("Zhiyan seclvlenname for hudi. Defaults to hudi_metrics.");
+
+ public static Builder newBuilder() {
+ return new HoodieMetricsZhiyanConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private final HoodieMetricsZhiyanConfig hoodieMetricsZhiyanConfig = new HoodieMetricsZhiyanConfig();
+
+ public HoodieMetricsZhiyanConfig.Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ this.hoodieMetricsZhiyanConfig.getProps().load(reader);
+ return this;
+ }
+ }
+
+ public HoodieMetricsZhiyanConfig.Builder fromProperties(Properties props) {
+ this.hoodieMetricsZhiyanConfig.getProps().putAll(props);
+ return this;
+ }
+
+ public HoodieMetricsZhiyanConfig.Builder withAppMask(String appMask) {
+ hoodieMetricsZhiyanConfig.setValue(ZHIYAN_METRICS_HOODIE_APPMASK, appMask);
+ return this;
+ }
+
+ public HoodieMetricsZhiyanConfig.Builder withSeclvlEnvName(String seclvlEnvName) {
+ hoodieMetricsZhiyanConfig.setValue(ZHIYAN_METRICS_HOODIE_SECLVLENNAME, seclvlEnvName);
+ return this;
+ }
+
+ public HoodieMetricsZhiyanConfig.Builder withReportServiceUrl(String url) {
+ hoodieMetricsZhiyanConfig.setValue(REPORT_SERVICE_URL, url);
+ return this;
+ }
+
+ public HoodieMetricsZhiyanConfig.Builder withApiTimeout(int apiTimeout) {
+ hoodieMetricsZhiyanConfig.setValue(API_TIMEOUT_IN_SECONDS, String.valueOf(apiTimeout));
+ return this;
+ }
+
+ public HoodieMetricsZhiyanConfig.Builder withJobName(String jobName) {
+ hoodieMetricsZhiyanConfig.setValue(ZHIYAN_JOB_NAME, jobName);
+ return this;
+ }
+
+ public HoodieMetricsZhiyanConfig.Builder withReportPeriodSeconds(int seconds) {
+ hoodieMetricsZhiyanConfig.setValue(REPORT_PERIOD_SECONDS, String.valueOf(seconds));
+ return this;
+ }
+
+ public HoodieMetricsZhiyanConfig build() {
+ hoodieMetricsZhiyanConfig.setDefaults(HoodieMetricsZhiyanConfig.class.getName());
+ return hoodieMetricsZhiyanConfig;
+ }
+ }
+
+}
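A quick sketch of the builder added above, with placeholder values (the remaining settings fall back to the defaults declared in the ConfigProperty definitions):

  HoodieMetricsZhiyanConfig zhiyanConfig = HoodieMetricsZhiyanConfig.newBuilder()
      .withJobName("example-ingest-job")   // placeholder job name
      .withApiTimeout(10)
      .withReportPeriodSeconds(10)
      .build();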
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 69ef7917b2..450f741586 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -18,6 +18,7 @@
package org.apache.hudi.metrics;
+import org.apache.hudi.async.AsyncPostEventService;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
@@ -26,9 +27,13 @@ import org.apache.hudi.config.HoodieWriteConfig;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
+import org.apache.hudi.tdbank.TdbankHoodieMetricsEvent;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import java.util.Locale;
+import java.util.concurrent.LinkedBlockingQueue;
+
/**
* Wrapper for metrics-related operations.
*/
@@ -49,6 +54,8 @@ public class HoodieMetrics {
private String conflictResolutionFailureCounterName = null;
private HoodieWriteConfig config;
private String tableName;
+ // A job id used to identify the job writing to each hoodie table.
+ private String hoodieJobId;
private Timer rollbackTimer = null;
private Timer cleanTimer = null;
private Timer commitTimer = null;
@@ -61,10 +68,31 @@ public class HoodieMetrics {
private Counter conflictResolutionSuccessCounter = null;
private Counter conflictResolutionFailureCounter = null;
+ public static final String TOTAL_PARTITIONS_WRITTEN = "totalPartitionsWritten";
+ public static final String TOTAL_FILES_INSERT = "totalFilesInsert";
+ public static final String TOTAL_FILES_UPDATE = "totalFilesUpdate";
+ public static final String TOTAL_RECORDS_WRITTEN = "totalRecordsWritten";
+ public static final String TOTAL_UPDATE_RECORDS_WRITTEN = "totalUpdateRecordsWritten";
+ public static final String TOTAL_INSERT_RECORDS_WRITTEN = "totalInsertRecordsWritten";
+ public static final String TOTAL_BYTES_WRITTEN = "totalBytesWritten";
+ public static final String TOTAL_SCAN_TIME = "totalScanTime";
+ public static final String TOTAL_CREATE_TIME = "totalCreateTime";
+ public static final String TOTAL_UPSERT_TIME = "totalUpsertTime";
+ public static final String TOTAL_COMPACTED_RECORDS_UPDATED = "totalCompactedRecordsUpdated";
+ public static final String TOTAL_LOGFILES_COMPACTED = "totalLogFilesCompacted";
+ public static final String TOTAL_LOGFILES_SIZE = "totalLogFilesSize";
+
+ // A queue to buffer metrics events.
+ private final LinkedBlockingQueue<TdbankHoodieMetricsEvent> queue = new LinkedBlockingQueue<>();
+
public HoodieMetrics(HoodieWriteConfig config) {
this.config = config;
this.tableName = config.getTableName();
+ this.hoodieJobId = config.getHoodieJobId();
if (config.isMetricsOn()) {
+ // start post event service.
+ AsyncPostEventService postEventService = new AsyncPostEventService(config, queue);
+ postEventService.start(null);
Metrics.init(config);
this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION);
this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION);
@@ -165,6 +193,25 @@ public class HoodieMetrics {
Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), 0);
Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), 0);
Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0);
+
+ TdbankHoodieMetricsEvent metricEvent = TdbankHoodieMetricsEvent.newBuilder()
+ .withDBName(config.getDatabaseName())
+ .withTableName(config.getTableName())
+ .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf(actionType.toUpperCase(Locale.ROOT)))
+ .addMetrics("totalPartitionsWritten", 0)
+ .addMetrics("totalFilesUpdate", 0)
+ .addMetrics("totalRecordsWritten", 0)
+ .addMetrics("totalUpdateRecordsWritten", 0)
+ .addMetrics("totalInsertRecordsWritten", 0)
+ .addMetrics("totalBytesWritten", 0)
+ .addMetrics("totalScanTime", 0)
+ .addMetrics("totalCreateTime", 0)
+ .addMetrics("totalUpsertTime", 0)
+ .addMetrics("totalCompactedRecordsUpdated", 0)
+ .addMetrics("totalLogFilesCompacted", 0)
+ .addMetrics("totalLogFilesSize", 0)
+ .build();
+ postEvent(metricEvent);
}
public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
@@ -197,23 +244,55 @@ public class HoodieMetrics {
Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted);
Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize);
+
+ TdbankHoodieMetricsEvent metricEvent = TdbankHoodieMetricsEvent.newBuilder()
+ .withDBName(config.getDatabaseName())
+ .withTableName(config.getTableName())
+ .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf(actionType.toUpperCase(Locale.ROOT)))
+ .addMetrics("totalPartitionsWritten", totalPartitionsWritten)
+ .addMetrics("totalFilesUpdate", totalFilesUpdate)
+ .addMetrics("totalFilesInsert", totalFilesInsert)
+ .addMetrics("totalRecordsWritten", totalRecordsWritten)
+ .addMetrics("totalUpdateRecordsWritten", totalUpdateRecordsWritten)
+ .addMetrics("totalInsertRecordsWritten", totalInsertRecordsWritten)
+ .addMetrics("totalBytesWritten", totalBytesWritten)
+ .addMetrics("totalScanTime", totalTimeTakenByScanner)
+ .addMetrics("totalCreateTime", totalTimeTakenForInsert)
+ .addMetrics("totalUpsertTime", totalTimeTakenForUpsert)
+ .addMetrics("totalCompactedRecordsUpdated", totalCompactedRecordsUpdated)
+ .addMetrics("totalLogFilesCompacted", totalLogFilesCompacted)
+ .addMetrics("totalLogFilesSize", totalLogFilesSize)
+ .build();
+ postEvent(metricEvent);
}
}
private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
String actionType) {
if (config.isMetricsOn()) {
+ TdbankHoodieMetricsEvent.Builder builder = TdbankHoodieMetricsEvent.newBuilder()
+ .withDBName(config.getDatabaseName())
+ .withTableName(config.getTableName())
+ .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf(actionType.toUpperCase(Locale.ROOT)));
Pair<Option<Long>, Option<Long>> eventTimePairMinMax = metadata.getMinAndMaxEventTime();
if (eventTimePairMinMax.getLeft().isPresent()) {
long commitLatencyInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getLeft().get();
Metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs);
+ builder = builder.addMetrics("commitLatencyInMs", commitLatencyInMs);
}
if (eventTimePairMinMax.getRight().isPresent()) {
long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get();
Metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
+ builder = builder.addMetrics("commitFreshnessInMs", commitFreshnessInMs);
}
Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
+
+ TdbankHoodieMetricsEvent event = builder
+ .addMetrics("commitTime", commitEpochTimeInMs)
+ .addMetrics("duration", durationInMs)
+ .build();
+ postEvent(event);
}
}
@@ -223,6 +302,14 @@ public class HoodieMetrics {
String.format("Sending rollback metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
Metrics.registerGauge(getMetricsName("rollback", "duration"), durationInMs);
Metrics.registerGauge(getMetricsName("rollback", "numFilesDeleted"), numFilesDeleted);
+ TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder()
+ .withDBName(config.getDatabaseName())
+ .withTableName(config.getTableName())
+ .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("rollback".toUpperCase(Locale.ROOT)))
+ .addMetrics("duration", durationInMs)
+ .addMetrics("numFilesDeleted", numFilesDeleted)
+ .build();
+ postEvent(event);
}
}
@@ -232,6 +319,14 @@ public class HoodieMetrics {
String.format("Sending clean metrics (duration=%d, numFilesDeleted=%d)", durationInMs, numFilesDeleted));
Metrics.registerGauge(getMetricsName("clean", "duration"), durationInMs);
Metrics.registerGauge(getMetricsName("clean", "numFilesDeleted"), numFilesDeleted);
+ TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder()
+ .withDBName(config.getDatabaseName())
+ .withTableName(config.getTableName())
+ .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("clean".toUpperCase(Locale.ROOT)))
+ .addMetrics("duration", durationInMs)
+ .addMetrics("numFilesDeleted", numFilesDeleted)
+ .build();
+ postEvent(event);
}
}
@@ -241,6 +336,14 @@ public class HoodieMetrics {
numFilesFinalized));
Metrics.registerGauge(getMetricsName("finalize", "duration"), durationInMs);
Metrics.registerGauge(getMetricsName("finalize", "numFilesFinalized"), numFilesFinalized);
+ TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder()
+ .withDBName(config.getDatabaseName())
+ .withTableName(config.getTableName())
+ .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("finalize".toUpperCase(Locale.ROOT)))
+ .addMetrics("duration", durationInMs)
+ .addMetrics("numFilesFinalized", numFilesFinalized)
+ .build();
+ postEvent(event);
}
}
@@ -248,11 +351,21 @@ public class HoodieMetrics {
if (config.isMetricsOn()) {
LOG.info(String.format("Sending index metrics (%s.duration, %d)", action, durationInMs));
Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), durationInMs);
+ TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder()
+ .withDBName(config.getDatabaseName())
+ .withTableName(config.getTableName())
+ .withTableType(TdbankHoodieMetricsEvent.EventType.valueOf("index".toUpperCase(Locale.ROOT)))
+ .addMetrics(String.format("%s.duration", action), durationInMs)
+ .build();
+ postEvent(event);
}
}
String getMetricsName(String action, String metric) {
- return config == null ? null : String.format("%s.%s.%s", config.getMetricReporterMetricsNamePrefix(), action, metric);
+ // if using zhiyan, then we don't report metrics prefix because we will use tags to identify each metrics
+ return config == null ? null :
+ config.getMetricsReporterType() == MetricsReporterType.ZHIYAN ? String.format("%s.%s", action, metric) :
+ String.format("%s.%s.%s", config.getMetricReporterMetricsNamePrefix(), action, metric);
}
/**
@@ -284,4 +397,9 @@ public class HoodieMetrics {
}
return counter;
}
+
+ private void postEvent(TdbankHoodieMetricsEvent event) {
+ LOG.info("Posting metrics event to queue, current queue size is " + queue.size());
+ queue.add(event);
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index 8f3e497481..10238a9c92 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -120,6 +120,7 @@ public class Metrics {
public static void registerGauge(String metricName, final long value) {
try {
+ LOG.info("Register Metric Name: " + metricName);
MetricRegistry registry = Metrics.getInstance().getRegistry();
HoodieGauge guage = (HoodieGauge) registry.gauge(metricName, () -> new HoodieGauge<>(value));
guage.setValue(value);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
index d81e337b28..b67ab63f23 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
@@ -29,6 +29,7 @@ import org.apache.hudi.metrics.prometheus.PrometheusReporter;
import org.apache.hudi.metrics.prometheus.PushGatewayMetricsReporter;
import com.codahale.metrics.MetricRegistry;
+import org.apache.hudi.metrics.zhiyan.ZhiyanMetricsReporter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -81,6 +82,9 @@ public class MetricsReporterFactory {
case CLOUDWATCH:
reporter = new CloudWatchMetricsReporter(config, registry);
break;
+ case ZHIYAN:
+ reporter = new ZhiyanMetricsReporter(config, registry);
+ break;
default:
LOG.error("Reporter type[" + type + "] is not supported.");
break;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
index 3c86001592..29a8097a50 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
@@ -22,5 +22,5 @@ package org.apache.hudi.metrics;
* Types of the reporter supported, hudi also supports user defined reporter.
*/
public enum MetricsReporterType {
- GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH
+ GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH, ZHIYAN
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanHttpClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanHttpClient.java
new file mode 100644
index 0000000000..b358ce182b
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanHttpClient.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.zhiyan;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.Consts;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpException;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.protocol.BasicHttpContext;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class ZhiyanHttpClient {
+
+ private static final Logger LOG = LogManager.getLogger(ZhiyanHttpClient.class);
+ private final CloseableHttpClient httpClient;
+ private final ObjectMapper mapper;
+ private final String serviceUrl;
+ private final String requestPath;
+
+ private static final String JSON_CONTENT_TYPE = "application/json";
+ private static final String CONTENT_TYPE = "Content-Type";
+
+ public ZhiyanHttpClient(String url, String path, int timeoutSeconds) {
+ httpClient = HttpClientBuilder.create()
+ .setDefaultRequestConfig(RequestConfig.custom()
+ .setConnectTimeout(timeoutSeconds * 1000)
+ .setConnectionRequestTimeout(timeoutSeconds * 1000)
+ .setSocketTimeout(timeoutSeconds * 1000).build())
+ .build();
+
+ serviceUrl = url;
+ requestPath = path;
+
+ mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+ mapper.configure(JsonParser.Feature.IGNORE_UNDEFINED, true);
+ mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+ }
+
+ public <T> String post(T input) throws Exception {
+ HttpPost postReq = new HttpPost(serviceUrl + requestPath);
+ postReq.setHeader(CONTENT_TYPE, JSON_CONTENT_TYPE);
+
+ try {
+ return requestWithEntity(postReq, input);
+ } catch (Exception e) {
+ LOG.warn(String.format("Failed to post to %s, caused by", serviceUrl + requestPath), e);
+ throw e;
+ } finally {
+ postReq.releaseConnection();
+ }
+ }
+
+ private <T> String requestWithEntity(HttpRequestBase request, T input) throws Exception {
+ if (input != null && request instanceof HttpEntityEnclosingRequestBase) {
+ HttpEntity entity = getEntity(input);
+ ((HttpEntityEnclosingRequestBase) request).setEntity(entity);
+ }
+
+ HttpContext httpContext = new BasicHttpContext();
+ try (CloseableHttpResponse response = httpClient.execute(request, httpContext)) {
+ int status = response.getStatusLine().getStatusCode();
+ if (status != HttpStatus.SC_OK && status != HttpStatus.SC_CREATED) {
+ throw new HttpException("Response code is " + status);
+ }
+ HttpEntity resultEntity = response.getEntity();
+ return EntityUtils.toString(resultEntity, Consts.UTF_8);
+ } catch (Exception ex) {
+ LOG.error("Error when request http.", ex);
+ throw ex;
+ }
+ }
+
+ private <T> HttpEntity getEntity(T input) throws JsonProcessingException {
+ HttpEntity entity;
+ if (input instanceof String) {
+ entity = new StringEntity((String) input, ContentType.APPLICATION_JSON);
+ } else if (input instanceof HttpEntity) {
+ return (HttpEntity) input;
+ } else {
+ try {
+ String json = mapper.writeValueAsString(input);
+ entity = new StringEntity(json, ContentType.APPLICATION_JSON);
+ } catch (JsonProcessingException e) {
+ LOG.error(String.format("Error when processing %s", input), e);
+ throw e;
+ }
+ }
+ return entity;
+ }
+
+}
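The client is a thin wrapper around an Apache HttpClient POST. A usage sketch with the default URL/path from HoodieMetricsZhiyanConfig (note that post(...) declares throws Exception):

  ZhiyanHttpClient client = new ZhiyanHttpClient(
      "http://zhiyan.monitor.access.inner.woa.com:8080",   // default report service url
      "/access_v1.http_service/HttpCurveReportRpc",         // default report service path
      10);                                                   // timeout in seconds
  String response = client.post("{\"app_mark\":\"1701_36311_HUDI\"}"); // any JSON string or serializable object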
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java
new file mode 100644
index 0000000000..323fe17106
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanMetricsReporter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.zhiyan;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.MetricsReporter;
+
+import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
+
+public class ZhiyanMetricsReporter extends MetricsReporter {
+
+ private final ZhiyanReporter reporter;
+ private final int reportPeriodSeconds;
+
+ public ZhiyanMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
+ this.reportPeriodSeconds = config.getZhiyanReportPeriodSeconds();
+ ZhiyanHttpClient client = new ZhiyanHttpClient(
+ config.getZhiyanReportServiceURL(),
+ config.getZhiyanReportServicePath(),
+ config.getZhiyanApiTimeoutSeconds());
+ this.reporter = new ZhiyanReporter(registry, MetricFilter.ALL, client,
+ config.getZhiyanHoodieJobName(),
+ config.getTableName(),
+ config.getZhiyanAppMask(),
+ config.getZhiyanSeclvlEnvName());
+ }
+
+ @Override
+ public void start() {
+ reporter.start(reportPeriodSeconds, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void report() {
+ reporter.report();
+ }
+
+ @Override
+ public Closeable getReporter() {
+ return reporter;
+ }
+
+ @Override
+ public void stop() {
+ reporter.stop();
+ }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanReporter.java
new file mode 100644
index 0000000000..4e5d416989
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/zhiyan/ZhiyanReporter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.zhiyan;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+
+public class ZhiyanReporter extends ScheduledReporter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZhiyanReporter.class);
+ private final ZhiyanHttpClient client;
+ private final String jobName;
+ private final String hoodieTableName;
+ private final String appMask;
+ private final String seclvlEnName;
+
+ public ZhiyanReporter(MetricRegistry registry,
+ MetricFilter filter,
+ ZhiyanHttpClient client,
+ String jobName,
+ String hoodieTableName,
+ String appMask,
+ String seclvlEnName) {
+ super(registry, "hudi-zhiyan-reporter", filter, TimeUnit.SECONDS, TimeUnit.SECONDS);
+ this.client = client;
+ this.jobName = jobName;
+ this.hoodieTableName = hoodieTableName;
+ this.appMask = appMask;
+ this.seclvlEnName = seclvlEnName;
+ }
+
+ @Override
+ public void report(SortedMap<String, Gauge> gauges,
+ SortedMap<String, Counter> counters,
+ SortedMap<String, Histogram> histograms,
+ SortedMap<String, Meter> meters,
+ SortedMap<String, Timer> timers) {
+ final PayloadBuilder builder = new PayloadBuilder()
+ .withAppMask(appMask)
+ .withJobName(jobName)
+ .withSeclvlEnName(seclvlEnName)
+ .withTableName(hoodieTableName);
+
+ long timestamp = System.currentTimeMillis();
+
+ gauges.forEach((metricName, gauge) -> {
+ builder.addGauge(metricName, timestamp, gauge.getValue().toString());
+ });
+
+ String payload = builder.build();
+
+ LOG.info("Payload is:" + payload);
+ try {
+ client.post(payload);
+ } catch (Exception e) {
+ LOG.error("Payload is " + payload);
+ LOG.error("Error when reporting data to Zhiyan", e);
+ }
+ }
+
+ static class PayloadBuilder {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ private final ObjectNode payload;
+
+ private final ArrayNode reportData;
+
+ private String appMark;
+ // Metric group.
+ private String seclvlEnName;
+
+ private String jobName;
+
+ private String tableName;
+
+ public PayloadBuilder() {
+ MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ MAPPER.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+ MAPPER.configure(JsonParser.Feature.IGNORE_UNDEFINED, true);
+ MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ this.payload = MAPPER.createObjectNode();
+ this.reportData = MAPPER.createArrayNode();
+ }
+
+ PayloadBuilder withAppMask(String appMark) {
+ this.appMark = appMark;
+ this.payload.put("app_mark", appMark);
+ return this;
+ }
+
+ PayloadBuilder withJobName(String jobName) {
+ this.jobName = jobName;
+ return this;
+ }
+
+ PayloadBuilder withTableName(String tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ PayloadBuilder withSeclvlEnName(String seclvlEnName) {
+ this.seclvlEnName = seclvlEnName;
+ this.payload.put("sec_lvl_en_name", seclvlEnName);
+ return this;
+ }
+
+ PayloadBuilder addGauge(String metric, long timestamp, String gaugeValue) {
+ ObjectNode tmpData = MAPPER.createObjectNode();
+ tmpData.put("metric", metric);
+ tmpData.put("value", Long.parseLong(gaugeValue));
+ // Tags represent dimensions in Zhiyan.
+ ObjectNode tags = tmpData.objectNode();
+ tags.put("jobName", jobName);
+ tags.put("tableName", tableName);
+ tmpData.set("tags", tags);
+ this.reportData.add(tmpData);
+ return this;
+ }
+
+ PayloadBuilder addHistogram() {
+ return this;
+ }
+
+ PayloadBuilder addCounter() {
+ return this;
+ }
+
+ PayloadBuilder addMeters() {
+ return this;
+ }
+
+ String build() {
+ payload.put("report_data", reportData.toString());
+ return payload.toString();
+ }
+ }
+}
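For reference, PayloadBuilder above serializes report_data as a string field (reportData.toString()), so the posted body looks roughly like the following; values are placeholders, and only the fields set in the builder appear:

  {
    "app_mark": "1701_36311_HUDI",
    "sec_lvl_en_name": "hudi_metrics",
    "report_data": "[{\"metric\":\"commit.totalRecordsWritten\",\"value\":1000,\"tags\":{\"jobName\":\"example-ingest-job\",\"tableName\":\"example_table\"}}]"
  }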
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TDBankClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TDBankClient.java
new file mode 100644
index 0000000000..85f7a9b0b9
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TDBankClient.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.tdbank;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.tencent.tdbank.busapi.BusClientConfig;
+import com.tencent.tdbank.busapi.DefaultMessageSender;
+import com.tencent.tdbank.busapi.MessageSender;
+import com.tencent.tdbank.busapi.SendResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+public class TDBankClient implements Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(TDBankClient.class);
+ private static final Long TDBANK_SENDER_TIMEOUT_MS =
+ Long.parseLong(System.getProperty("tdbank.sender.timeout-ms", "20000"));
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final String HUDI_EVENT_TID = "hudi_metric";
+
+ private final String bid;
+ private MessageSender sender;
+ private String tdmAddr;
+ private int tdmPort;
+ private volatile boolean hasInit = false;
+
+ private static final int RETRY_TIMES = 3;
+
+ public TDBankClient(String tdmAddr, int tdmPort, String bid) {
+ this.bid = bid;
+ this.tdmAddr = tdmAddr;
+ this.tdmPort = tdmPort;
+ }
+
+ /**
+ * send message to tdbank and return send result
+ */
+ public SendResult sendMessage(Object message) throws Exception {
+ init();
+ LOG.info("Send message to tdbank, bid: {}, tid: {}", bid, HUDI_EVENT_TID);
+ int retryTimes = 0;
+ while (retryTimes < RETRY_TIMES) {
+ try {
+ return sender.sendMessage(MAPPER.writeValueAsBytes(message),
+ bid, HUDI_EVENT_TID, 0, UUID.randomUUID().toString(), TDBANK_SENDER_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ retryTimes++;
+ LOG.error("Error when sending data to tdbank, retry " + retryTimes, e);
+ }
+ }
+ return SendResult.UNKOWN_ERROR;
+ }
+
+ @Override
+ public void close() throws IOException {
+ sender.close();
+ }
+
+ private void init() throws Exception {
+ if (!hasInit) {
+ synchronized (this) {
+ if (!hasInit) {
+ try {
+ LOG.info("Init tdbank-client with tdmAddress: {}, tdmPort: {}, bid: {}", tdmAddr, tdmPort, bid);
+ String localhost = InetAddress.getLocalHost().getHostAddress();
+ BusClientConfig clientConfig =
+ new BusClientConfig(localhost, true, tdmAddr, tdmPort, bid, "all");
+ LOG.info("Before sender generated.");
+ sender = new DefaultMessageSender(clientConfig);
+ LOG.info("Successfully init sender.");
+ } catch (Exception e) {
+ LOG.warn("Failed to initialize tdbank client.", e);
+ throw e;
+ }
+ hasInit = true;
+ }
+ }
+ }
+ }
+
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankConfig.java
new file mode 100644
index 0000000000..60a5e06a45
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankConfig.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.tdbank;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+import java.util.Properties;
+
+@Immutable
+@ConfigClassProperty(name = "Tdbank Configs",
+ groupName = ConfigGroups.Names.WRITE_CLIENT,
+ description = "Tdbank configs")
+public class TdbankConfig extends HoodieConfig {
+ public static final ConfigProperty<String> TDBANK_TDM_ADDR = ConfigProperty
+ .key("hoodie.tdbank.tdm.addr")
+ .defaultValue("tl-tdbank-tdmanager.tencent-distribute.com")
+ .withDocumentation("tdbank manager address.");
+
+ public static final ConfigProperty<Integer> TDBANK_TDM_PORT = ConfigProperty
+ .key("hoodie.tdbank.tdm.port")
+ .defaultValue(8099)
+ .withDocumentation("tdbank manager port.");
+
+ public static final ConfigProperty<String> TDBANK_BID = ConfigProperty
+ .key("hoodie.tdbank.tdbank.bid")
+ .defaultValue("b_teg_iceberg_event_tdbank_mq")
+ .withDocumentation("tdbank bid; temporarily reuses iceberg's bid.");
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private final TdbankConfig hoodieTdbankConfig = new TdbankConfig();
+
+ public Builder withTDMAddr(String tdmAddr) {
+ hoodieTdbankConfig.setValue(TDBANK_TDM_ADDR, tdmAddr);
+ return this;
+ }
+
+ public Builder fromProperties(Properties props) {
+ hoodieTdbankConfig.setAll(props);
+ return this;
+ }
+
+ public Builder withTDMPort(int tdmPort) {
+ hoodieTdbankConfig.setValue(TDBANK_TDM_PORT, String.valueOf(tdmPort));
+ return this;
+ }
+
+ public Builder withBID(String bid) {
+ hoodieTdbankConfig.setValue(TDBANK_BID, bid);
+ return this;
+ }
+
+ public TdbankConfig build() {
+ hoodieTdbankConfig.setDefaults(TdbankConfig.class.getName());
+ return hoodieTdbankConfig;
+ }
+ }
+
+}
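A brief usage sketch of the builder above; the values shown simply repeat the defaults declared in the config properties and are illustrative only.

    TdbankConfig tdbankConfig = TdbankConfig.newBuilder()
        .withTDMAddr("tl-tdbank-tdmanager.tencent-distribute.com")
        .withTDMPort(8099)
        .withBID("b_teg_iceberg_event_tdbank_mq")
        .build(); // build() fills in defaults for any keys left unset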
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankHoodieMetricsEvent.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankHoodieMetricsEvent.java
new file mode 100644
index 0000000000..0be78386a1
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/tdbank/TdbankHoodieMetricsEvent.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.tdbank;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class TdbankHoodieMetricsEvent implements Serializable {
+ private String dbName;
+ private String tableName;
+ private EventType type;
+ private Map<String, Object> metrics;
+
+ private TdbankHoodieMetricsEvent() {
+ this.metrics = new TreeMap<>();
+ }
+
+ public enum EventType {
+ INDEX, CLEAN, FINALIZE, ROLLBACK, COMPACTION, COMMIT, DELTACOMMIT, REPLACECOMMIT
+ }
+
+ public static TdbankHoodieMetricsEvent.Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private final TdbankHoodieMetricsEvent hoodieMetricsEvent = new TdbankHoodieMetricsEvent();
+
+ public Builder() {
+ }
+
+ public Builder withDBName(String dbName) {
+ hoodieMetricsEvent.setDbName(dbName);
+ return this;
+ }
+
+ public Builder withTableName(String tableName) {
+ hoodieMetricsEvent.setTableName(tableName);
+ return this;
+ }
+
+ public Builder withTableType(EventType type) {
+ hoodieMetricsEvent.setType(type);
+ return this;
+ }
+
+ public Builder addMetrics(String key, Object value) {
+ hoodieMetricsEvent.addMetrics(key, value);
+ return this;
+ }
+
+ public TdbankHoodieMetricsEvent build() {
+ return hoodieMetricsEvent;
+ }
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public void setType(EventType type) {
+ this.type = type;
+ }
+
+ public void addMetrics(String key, Object value) {
+ this.metrics.put(key, value);
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public EventType getType() {
+ return type;
+ }
+
+ public Map<String, Object> getMetrics() {
+ return metrics;
+ }
+
+ public Object getMetric(String key) {
+ return metrics.get(key);
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public void setDbName(String dbName) {
+ this.dbName = dbName;
+ }
+}
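A brief usage sketch of the event builder above; the database name, table name, and metric keys are illustrative placeholders, not values defined by this patch.

    TdbankHoodieMetricsEvent event = TdbankHoodieMetricsEvent.newBuilder()
        .withDBName("warehouse_db")
        .withTableName("events")
        .withTableType(TdbankHoodieMetricsEvent.EventType.COMMIT)
        .addMetrics("totalRecordsWritten", 1024L)
        .addMetrics("commitDurationMs", 4200L)
        .build();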
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 53a5799508..34198d456c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -122,6 +122,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
@Override
public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
+ if (commitActionType.equals(HoodieTimeline.COMMIT_ACTION)) {
+ writeTimer = metrics.getCommitCtx();
+ } else {
+ writeTimer = metrics.getDeltaCommitCtx();
+ }
return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
}
@@ -436,6 +441,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
// only used for metadata table, the compaction happens in single thread
HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime);
commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
+ compactionTimer = metrics.getCompactionCtx();
return compactionMetadata;
}
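The getCommitCtx()/getDeltaCommitCtx()/getCompactionCtx() calls above hand back a Dropwizard Timer.Context, which records the elapsed time when stopped. A minimal sketch of that pattern, assuming the standard com.codahale.metrics API rather than the Hudi wrapper:

    import com.codahale.metrics.Timer;

    Timer timer = new Timer();
    Timer.Context writeTimer = timer.time(); // start timing
    // ... perform the commit ...
    long elapsedNanos = writeTimer.stop();   // stop and record the duration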
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 89d01b53a6..85f970e7ec 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -90,8 +90,10 @@ public class HoodieTableConfig extends HoodieConfig {
public static final ConfigProperty<String> DATABASE_NAME = ConfigProperty
.key("hoodie.database.name")
.noDefaultValue()
- .withDocumentation("Database name that will be used for incremental query.If different databases have the same table name during incremental query, "
- + "we can set it to limit the table name under a specific database");
+ .withDocumentation("Database name to identify a table, currently will be used for "
+ + "1. incremental query.If different databases have the same table name during incremental query "
+ + "we can set it to limit the table name under a specific database"
+ + "2. identify a table");
public static final ConfigProperty<String> NAME = ConfigProperty
.key(HOODIE_TABLE_NAME_KEY)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index a9e10d3e55..4a298839fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -82,6 +82,11 @@ public class FlinkOptions extends HoodieConfig {
// ------------------------------------------------------------------------
// Common Options
// ------------------------------------------------------------------------
+ public static final ConfigOption<String> DATABASE_NAME = ConfigOptions
+ .key(HoodieWriteConfig.DATABASE_NAME.key())
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Database name to identify tables");
public static final ConfigOption<String> TABLE_NAME = ConfigOptions
.key(HoodieWriteConfig.TBL_NAME.key())
@@ -411,7 +416,7 @@ public class FlinkOptions extends HoodieConfig {
.key("write.bucket_assign.tasks")
.intType()
.noDefaultValue()
- .withDescription("Parallelism of tasks that do bucket assign, default same as the write task parallelism");
+ .withDescription("Parallelism of tasks that do bucket assign, default is the parallelism of the execution environment");
public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
.key("write.tasks")
@@ -522,8 +527,8 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<Integer> COMPACTION_TASKS = ConfigOptions
.key("compaction.tasks")
.intType()
- .noDefaultValue()
- .withDescription("Parallelism of tasks that do actual compaction, default same as the write task parallelism");
+ .defaultValue(4) // roughly WRITE_TASKS * COMPACTION_DELTA_COMMITS * 0.2 (assuming every 5 delta commits produce one compaction bucket)
+ .withDescription("Parallelism of tasks that do actual compaction, default is 4");
public static final String NUM_COMMITS = "num_commits";
public static final String TIME_ELAPSED = "time_elapsed";
@@ -580,7 +585,7 @@ public class FlinkOptions extends HoodieConfig {
.stringType()
.defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
.withDescription("Clean policy to manage the Hudi table. Available option: KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_BY_HOURS."
- + "Default is KEEP_LATEST_COMMITS.");
+ + "Default is KEEP_LATEST_COMMITS.");
public static final ConfigOption<Integer> CLEAN_RETAIN_COMMITS = ConfigOptions
.key("clean.retain_commits")
@@ -589,14 +594,6 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Number of commits to retain. So data will be retained for num_of_commits * time_between_commits (scheduled).\n"
+ "This also directly translates into how much you can incrementally pull on this table, default 30");
- public static final ConfigOption<Integer> CLEAN_RETAIN_HOURS = ConfigOptions
- .key("clean.retain_hours")
- .intType()
- .defaultValue(24)// default 24 hours
- .withDescription("Number of hours for which commits need to be retained. This config provides a more flexible option as"
- + "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
- + " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
-
public static final ConfigOption<Integer> CLEAN_RETAIN_FILE_VERSIONS = ConfigOptions
.key("clean.retain_file_versions")
.intType()
@@ -660,7 +657,7 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String> CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME = ConfigOptions
.key("clustering.plan.partition.filter.mode")
.stringType()
- .defaultValue(ClusteringPlanPartitionFilterMode.NONE.name())
+ .defaultValue("NONE")
.withDescription("Partition filter mode used in the creation of clustering plan. Available values are - "
+ "NONE: do not filter table partition and thus the clustering plan will include all partitions that have clustering candidate."
+ "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
@@ -668,16 +665,16 @@ public class FlinkOptions extends HoodieConfig {
+ "SELECTED_PARTITIONS: keep partitions that are in the specified range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
+ PARTITION_FILTER_END_PARTITION.key() + "'].");
- public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
+ public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES = ConfigOptions
.key("clustering.plan.strategy.target.file.max.bytes")
- .longType()
- .defaultValue(1024 * 1024 * 1024L) // default 1 GB
+ .intType()
+ .defaultValue(1024 * 1024 * 1024) // default 1 GB
.withDescription("Each group can produce 'N' (CLUSTERING_MAX_GROUP_SIZE/CLUSTERING_TARGET_FILE_SIZE) output file groups, default 1 GB");
- public static final ConfigOption<Long> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
+ public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT = ConfigOptions
.key("clustering.plan.strategy.small.file.limit")
- .longType()
- .defaultValue(600L) // default 600 MB
+ .intType()
+ .defaultValue(600) // default 600 MB
.withDescription("Files smaller than the size specified here are candidates for clustering, default 600 MB");
public static final ConfigOption<Integer> CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST = ConfigOptions
@@ -701,7 +698,6 @@ public class FlinkOptions extends HoodieConfig {
// ------------------------------------------------------------------------
// Hive Sync Options
// ------------------------------------------------------------------------
-
public static final ConfigOption<Boolean> HIVE_SYNC_ENABLED = ConfigOptions
.key("hive_sync.enable")
.booleanType()
@@ -729,8 +725,8 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String> HIVE_SYNC_MODE = ConfigOptions
.key("hive_sync.mode")
.stringType()
- .defaultValue(HiveSyncMode.HMS.name())
- .withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'hms'");
+ .defaultValue("jdbc")
+ .withDescription("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql, default 'jdbc'");
public static final ConfigOption<String> HIVE_SYNC_USERNAME = ConfigOptions
.key("hive_sync.username")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index f022b04ea1..b2f72aed7d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -74,6 +74,9 @@ public class FlinkStreamerConfig extends Configuration {
required = true)
public String targetBasePath;
+ @Parameter(names = {"--target-db"}, description = "Name of target database")
+ public String targetDatabaseName;
+
@Parameter(names = {"--target-table"}, description = "Name of the target table in Hive.", required = true)
public String targetTableName;
@@ -351,6 +354,7 @@ public class FlinkStreamerConfig extends Configuration {
conf.setString(FlinkOptions.PATH, config.targetBasePath);
conf.setString(FlinkOptions.TABLE_NAME, config.targetTableName);
+ conf.setString(FlinkOptions.DATABASE_NAME, config.targetDatabaseName);
// copy_on_write works same as COPY_ON_WRITE
conf.setString(FlinkOptions.TABLE_TYPE, config.tableType.toUpperCase());
conf.setBoolean(FlinkOptions.INSERT_CLUSTER, config.insertCluster);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index b153b2273c..b08eb570ce 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -107,6 +107,8 @@ public class HoodieFlinkStreamer {
Pipelines.clean(conf, pipeline);
}
- env.execute(cfg.targetTableName);
+ String jobName = cfg.targetDatabaseName.isEmpty() ? cfg.targetTableName :
+ cfg.targetDatabaseName + "." + cfg.targetTableName;
+ env.execute(jobName);
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 1cf66ea343..1718175240 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -169,6 +169,8 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
ObjectIdentifier tablePath,
CatalogTable table,
ResolvedSchema schema) {
+ // database name
+ conf.setString(FlinkOptions.DATABASE_NAME.key(), tablePath.getDatabaseName());
// table name
conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName());
// hoodie key about options
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index ee807f49da..0df3a0c8da 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -164,8 +164,13 @@ public class DataSourceUtils {
});
}
+ public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath, String tblName,
+ Map<String, String> parameters) {
+ return createHoodieConfig(schemaStr, basePath, "default_db", tblName, parameters);
+ }
+
public static HoodieWriteConfig createHoodieConfig(String schemaStr, String basePath,
- String tblName, Map<String, String> parameters) {
+ String dbName, String tblName, Map<String, String> parameters) {
boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key()));
boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE().key())
.equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
@@ -178,6 +183,7 @@ public class DataSourceUtils {
}
return builder.forTable(tblName)
+ .withDatabaseName(dbName)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(inlineCompact).build())
.withPayloadConfig(HoodiePayloadConfig.newBuilder()
@@ -189,8 +195,8 @@ public class DataSourceUtils {
}
public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
- String tblName, Map<String, String> parameters) {
- return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, tblName, parameters));
+ String dbName, String tblName, Map<String, String> parameters) {
+ return new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), createHoodieConfig(schemaStr, basePath, dbName, tblName, parameters));
}
public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
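A short sketch of the two createHoodieConfig overloads after this change; schemaStr and params stand in for a caller-provided Avro schema string and option map, and the path and names are illustrative.

    // New signature with an explicit database name:
    HoodieWriteConfig cfg = DataSourceUtils.createHoodieConfig(
        schemaStr, "/warehouse/base/path", "warehouse_db", "events", params);

    // Legacy signature, which now delegates with dbName = "default_db":
    HoodieWriteConfig legacyCfg = DataSourceUtils.createHoodieConfig(
        schemaStr, "/warehouse/base/path", "events", params);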
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
index 0d3edd592d..3b1caddb59 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
@@ -47,7 +47,7 @@ object HoodieCLIUtils {
val jsc = new JavaSparkContext(sparkSession.sparkContext)
DataSourceUtils.createHoodieClient(jsc, schemaStr, basePath,
- metaClient.getTableConfig.getTableName, finalParameters.asJava)
+ metaClient.getTableConfig.getDatabaseName, metaClient.getTableConfig.getTableName, finalParameters.asJava)
}
def extractPartitions(clusteringGroups: Seq[HoodieClusteringGroup]): String = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index b9ff4c0d1a..61cb7ef961 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -176,6 +176,7 @@ object HoodieSparkSqlWriter {
// scalastyle:off
if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) &&
operation == WriteOperationType.BULK_INSERT) {
+ parameters.put(HoodieWriteConfig.DATABASE_NAME.key(), databaseName)
val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
basePath, path, instantTime, partitionColumns)
return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
@@ -197,7 +198,7 @@ object HoodieSparkSqlWriter {
// Create a HoodieWriteClient & issue the delete.
val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
- null, path, tblName,
+ null, path, databaseName, tblName,
mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
@@ -228,7 +229,7 @@ object HoodieSparkSqlWriter {
}
// Create a HoodieWriteClient & issue the delete.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
- null, path, tblName,
+ null, path, databaseName, tblName,
mapAsJavaMap(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
.asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
// Issue delete partitions
@@ -310,7 +311,7 @@ object HoodieSparkSqlWriter {
// Create a HoodieWriteClient & issue the write.
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,
- tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
+ databaseName, tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
@@ -441,6 +442,7 @@ object HoodieSparkSqlWriter {
val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
+ val databaseName = hoodieConfig.getStringOrDefault(HoodieWriteConfig.DATABASE_NAME, "default")
val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
s"'${BASE_PATH.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
@@ -486,6 +488,7 @@ object HoodieSparkSqlWriter {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.valueOf(tableType))
.setTableName(tableName)
+ .setDatabaseName(databaseName)
.setRecordKeyFields(recordKeyFields)
.setArchiveLogFolder(archiveLogFolder)
.setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
@@ -506,7 +509,7 @@ object HoodieSparkSqlWriter {
val jsc = new JavaSparkContext(sqlContext.sparkContext)
val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
- schema, path, tableName, mapAsJavaMap(parameters)))
+ schema, path, databaseName, tableName, mapAsJavaMap(parameters)))
try {
writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
} finally {
@@ -555,6 +558,7 @@ object HoodieSparkSqlWriter {
}
val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
+ val dbName = parameters.getOrElse(HoodieWriteConfig.DATABASE_NAME.key(), "default")
- val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
+ val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, dbName, tblName, mapAsJavaMap(params))
val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index 1d65670f6d..69e120c2e3 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -106,6 +106,7 @@ object AlterHoodieTableAddColumnsCommand {
jsc,
schema.toString,
hoodieCatalogTable.tableLocation,
+ hoodieCatalogTable.table.identifier.database.getOrElse("default"),
hoodieCatalogTable.tableName,
HoodieWriterUtils.parametersWithWriteDefaults(hoodieCatalogTable.catalogProperties).asJava
)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index f0394ad379..b098ff3ea4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -21,9 +21,9 @@ import org.apache.avro.Schema
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
+import org.apache.hudi.config.HoodieWriteConfig.{DATABASE_NAME, TBL_NAME}
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, SparkAdapterSupport}
import org.apache.spark.sql.HoodieCatalystExpressionUtils.MatchCast
import org.apache.spark.sql._
@@ -530,6 +530,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
PRECOMBINE_FIELD.key -> preCombineField,
TBL_NAME.key -> hoodieCatalogTable.tableName,
+ DATABASE_NAME.key -> targetTableDb,
PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index 11f0fc9785..4d0d5aeef2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -237,7 +237,7 @@ public class TestDataSourceUtils {
DataSourceWriteOptions.PAYLOAD_CLASS_NAME().defaultValue());
params.put(pair.left, pair.right.toString());
HoodieWriteConfig hoodieConfig = DataSourceUtils
- .createHoodieConfig(avroSchemaString, config.getBasePath(), "test", params);
+ .createHoodieConfig(avroSchemaString, config.getBasePath(), "testdb", "test", params);
assertEquals(pair.right, hoodieConfig.isAsyncClusteringEnabled());
TypedProperties prop = new TypedProperties();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 4e4fe43ff9..114c3d0f37 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -62,6 +62,7 @@ class TestHoodieSparkSqlWriter {
var tempPath: java.nio.file.Path = _
var tempBootStrapPath: java.nio.file.Path = _
var hoodieFooTableName = "hoodie_foo_tbl"
+ val hoodieDefaultDBName = "default_db"
var tempBasePath: String = _
var commonTableModifier: Map[String, String] = Map()
case class StringLongTest(uuid: String, ts: Long)
@@ -490,6 +491,7 @@ class TestHoodieSparkSqlWriter {
@MethodSource(Array("testDatasourceInsert"))
def testDatasourceInsertForTableTypeBaseFileMetaFields(tableType: String, populateMetaFields: Boolean, baseFileFormat: String): Unit = {
val hoodieFooTableName = "hoodie_foo_tbl"
+ val hoodieDefaultDBName = "default_db"
val fooTableModifier = Map("path" -> tempBasePath,
HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
HoodieWriteConfig.BASE_FILE_FORMAT.key -> baseFileFormat,
@@ -510,7 +512,7 @@ class TestHoodieSparkSqlWriter {
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
initializeMetaClientForBootstrap(fooTableParams, tableType, addBootstrapPath = false, initBasePath = true)
val client = spy(DataSourceUtils.createHoodieClient(
- new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieFooTableName,
+ new JavaSparkContext(sc), modifiedSchema.toString, tempBasePath, hoodieDefaultDBName, hoodieFooTableName,
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, df, Option.empty, Option(client))
@@ -571,6 +573,7 @@ class TestHoodieSparkSqlWriter {
new JavaSparkContext(sc),
null,
tempBasePath,
+ hoodieDefaultDBName,
hoodieFooTableName,
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
index 3b3b8eafb8..5746cacb0b 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
@@ -65,6 +65,7 @@ public class DefaultSource extends BaseDefaultSource implements DataSourceV2,
String instantTime = options.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY).get();
String path = options.get("path").get();
String tblName = options.get(HoodieWriteConfig.TBL_NAME.key()).get();
+ String dbName = options.get(HoodieWriteConfig.DATABASE_NAME.key()).get();
boolean populateMetaFields = options.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue());
Map<String, String> properties = options.asMap();
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
index ab2f16703b..90ae1e7377 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
@@ -52,6 +52,7 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
String instantTime = properties.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY);
String path = properties.get("path");
String tblName = properties.get(HoodieWriteConfig.TBL_NAME.key());
+ String dbName = properties.get(HoodieWriteConfig.DATABASE_NAME.key());
boolean populateMetaFields = Boolean.parseBoolean(properties.getOrDefault(HoodieTableConfig.POPULATE_META_FIELDS.key(),
Boolean.toString(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())));
boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
@@ -61,7 +62,8 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
// Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
tryOverrideParquetWriteLegacyFormatProperty(newProps, schema);
// 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
- HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, newProps);
+ HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path,
+ dbName, tblName, newProps);
return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
getConfiguration(), newProps, populateMetaFields, arePartitionRecordsSorted);
}
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
index 9a5366b12f..529b5bb49e 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/hudi/command/Spark31AlterTableCommand.scala
@@ -217,7 +217,7 @@ object Spark31AlterTableCommand extends Logging {
val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
- path, table.identifier.table, parametersWithWriteDefaults(table.storage.properties).asJava)
+ path, table.identifier.database.getOrElse("default_db"), table.identifier.table, parametersWithWriteDefaults(table.storage.properties).asJava)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(hadoopConf).build()