You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/07/05 02:24:25 UTC
[hudi] branch master updated: [HUDI-3116]Add a new HoodieDropPartitionsTool to let users drop table partitions through a standalone job. (#4459)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 45fdcf68a1 [HUDI-3116]Add a new HoodieDropPartitionsTool to let users drop table partitions through a standalone job. (#4459)
45fdcf68a1 is described below
commit 45fdcf68a1ce7e780f472b4af67edb4946a4fbc3
Author: YueZhang <69...@users.noreply.github.com>
AuthorDate: Tue Jul 5 10:24:18 2022 +0800
[HUDI-3116]Add a new HoodieDropPartitionsTool to let users drop table partitions through a standalone job. (#4459)
Co-authored-by: yuezhang <yu...@freewheel.tv>
---
.../hudi/utilities/HoodieDropPartitionsTool.java | 396 +++++++++++++++++++++
1 file changed, 396 insertions(+)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
new file mode 100644
index 0000000000..50fdf36c81
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieSparkTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import scala.Tuple2;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * A tool with spark-submit to drop Hudi table partitions.
+ *
+ * <p>
+ * You can dry run this tool with the following command to look and print for the table partitions and corresponding data files which will be deleted.
+ * ```
+ * spark-submit \
+ * --class org.apache.hudi.utilities.HoodieDropPartitionsTool \
+ * --packages org.apache.spark:spark-avro_2.11:2.4.4 \
+ * --master local[*]
+ * --driver-memory 1g \
+ * --executor-memory 1g \
+ * $HUDI_DIR/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
+ * --base-path basePath \
+ * --table-name tableName \
+ * --mode dry_run \
+ * --partitions partition1,partition2
+ * ```
+ *
+ * <p>
+ *
+ * You can delete the table partitions with '--mode delete'
+ *
+ * - DELETE ("delete"): This tool will mask/tombstone these partitions and corresponding data files and let cleaner delete these files later.
+ * - Also you can set --sync-hive-meta to sync current drop partition into hive
+ * <p>
+ * Example command:
+ * ```
+ * spark-submit \
+ * --class org.apache.hudi.utilities.HoodieDropPartitionsTool \
+ * --packages org.apache.spark:spark-avro_2.11:2.4.4 \
+ * --master local[*]
+ * --driver-memory 1g \
+ * --executor-memory 1g \
+ * $HUDI_DIR/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
+ * --base-path basePath \
+ * --table-name tableName \
+ * --mode delete \
+ * --partitions partition1,partition2
+ * ```
+ *
+ * Also you can use --help to find more configs to use.
+ */
+public class HoodieDropPartitionsTool implements Serializable {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieDropPartitionsTool.class);
+ // Spark context
+ private final transient JavaSparkContext jsc;
+ // config
+ private final Config cfg;
+ // Properties with source, hoodie client, key generator etc.
+ private TypedProperties props;
+
+ private final HoodieTableMetaClient metaClient;
+
+ public HoodieDropPartitionsTool(JavaSparkContext jsc, Config cfg) {
+ this.jsc = jsc;
+ this.cfg = cfg;
+
+ this.props = cfg.propsFilePath == null
+ ? UtilHelpers.buildProperties(cfg.configs)
+ : readConfigFromFileSystem(jsc, cfg);
+ this.metaClient = HoodieTableMetaClient.builder()
+ .setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath)
+ .setLoadActiveTimelineOnLoad(true)
+ .build();
+ }
+
+ /**
+ * Reads config from the file system.
+ *
+ * @param jsc {@link JavaSparkContext} instance.
+ * @param cfg {@link Config} instance.
+ * @return the {@link TypedProperties} instance.
+ */
+ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+ .getProps(true);
+ }
+
+ public enum Mode {
+ // Mask/Tombstone these partitions and corresponding data files and let cleaner delete these files later.
+ DELETE,
+ // Dry run by looking for the table partitions and corresponding data files which will be deleted.
+ DRY_RUN
+ }
+
+ public static class Config implements Serializable {
+ @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+ public String basePath = null;
+ @Parameter(names = {"--mode", "-m"}, description = "Set job mode: "
+ + "Set \"delete\" means mask/tombstone these partitions and corresponding data files table partitions and let cleaner delete these files later;"
+ + "Set \"dry_run\" means only looking for the table partitions will be deleted and corresponding data files.", required = true)
+ public String runningMode = null;
+ @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+ public String tableName = null;
+ @Parameter(names = {"--partitions", "-p"}, description = "Comma separated list of partitions to delete.", required = true)
+ public String partitions = null;
+ @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert/upsert/delete", required = false)
+ public int parallelism = 1500;
+ @Parameter(names = {"--instant-time", "-it"}, description = "instant time for delete table partitions operation.", required = false)
+ public String instantTime = null;
+ @Parameter(names = {"--sync-hive-meta", "-sync"}, description = "Sync information to HMS.", required = false)
+ public boolean syncToHive = false;
+ @Parameter(names = {"--hive-database", "-db"}, description = "Database to sync to.", required = false)
+ public String hiveDataBase = null;
+ @Parameter(names = {"--hive-table-name"}, description = "Table to sync to.", required = false)
+ public String hiveTableName = null;
+ @Parameter(names = {"--hive-user-name", "-user"}, description = "hive user name to use.", required = false)
+ public String hiveUserName = "hive";
+ @Parameter(names = {"--hive-pass-word", "-pass"}, description = "hive password to use.", required = false)
+ public String hivePassWord = "hive";
+ @Parameter(names = {"--hive-jdbc-url", "-jdbc"}, description = "hive url to use.", required = false)
+ public String hiveURL = "jdbc:hive2://localhost:10000";
+ @Parameter(names = {"--hive-partition-field"}, description = "Comma separated list of field in the hive table to use for determining hive partition columns.", required = false)
+ public String hivePartitionsField = "";
+ @Parameter(names = {"--hive-sync-use-jdbc"}, description = "Use JDBC when hive synchronization.", required = false)
+ public boolean hiveUseJdbc = true;
+ @Parameter(names = {"--hive-metastore-uris"}, description = "hive meta store uris to use.", required = false)
+ public String hiveHMSUris = null;
+ @Parameter(names = {"--hive-sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.", required = false)
+ public String hiveSyncMode = "hms";
+ @Parameter(names = {"--hive-sync-ignore-exception"}, description = "Ignore hive sync exception.", required = false)
+ public boolean hiveSyncIgnoreException = false;
+ @Parameter(names = {"--hive-partition-value-extractor-class"}, description = "Class which implements PartitionValueExtractor to extract the partition values,"
+ + " default 'SlashEncodedDayPartitionValueExtractor'.", required = false)
+ public String partitionValueExtractorClass = "org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor";
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
+ public String sparkMaster = null;
+ @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = false)
+ public String sparkMemory = "1g";
+ @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+ + "hoodie client for deleting table partitions")
+ public String propsFilePath = null;
+ @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+ + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+ splitter = IdentitySplitter.class)
+ public List<String> configs = new ArrayList<>();
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ @Override
+ public String toString() {
+ return "HoodieDropPartitionsToolConfig {\n"
+ + " --base-path " + basePath + ", \n"
+ + " --mode " + runningMode + ", \n"
+ + " --table-name " + tableName + ", \n"
+ + " --partitions " + partitions + ", \n"
+ + " --parallelism " + parallelism + ", \n"
+ + " --instantTime " + instantTime + ", \n"
+ + " --sync-hive-meta " + syncToHive + ", \n"
+ + " --hive-database " + hiveDataBase + ", \n"
+ + " --hive-table-name " + hiveTableName + ", \n"
+ + " --hive-user-name " + "Masked" + ", \n"
+ + " --hive-pass-word " + "Masked" + ", \n"
+ + " --hive-jdbc-url " + hiveURL + ", \n"
+ + " --hive-partition-field " + hivePartitionsField + ", \n"
+ + " --hive-sync-use-jdbc " + hiveUseJdbc + ", \n"
+ + " --hive-metastore-uris " + hiveHMSUris + ", \n"
+ + " --hive-sync-ignore-exception " + hiveSyncIgnoreException + ", \n"
+ + " --hive-partition-value-extractor-class " + partitionValueExtractorClass + ", \n"
+ + " --spark-master " + sparkMaster + ", \n"
+ + " --spark-memory " + sparkMemory + ", \n"
+ + " --props " + propsFilePath + ", \n"
+ + " --hoodie-conf " + configs
+ + "\n}";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Config config = (Config) o;
+ return basePath.equals(config.basePath)
+ && Objects.equals(runningMode, config.runningMode)
+ && Objects.equals(tableName, config.tableName)
+ && Objects.equals(partitions, config.partitions)
+ && Objects.equals(instantTime, config.instantTime)
+ && Objects.equals(syncToHive, config.syncToHive)
+ && Objects.equals(hiveDataBase, config.hiveDataBase)
+ && Objects.equals(hiveTableName, config.hiveTableName)
+ && Objects.equals(hiveUserName, config.hiveUserName)
+ && Objects.equals(hivePassWord, config.hivePassWord)
+ && Objects.equals(hiveURL, config.hiveURL)
+ && Objects.equals(hivePartitionsField, config.hivePartitionsField)
+ && Objects.equals(hiveUseJdbc, config.hiveUseJdbc)
+ && Objects.equals(hiveHMSUris, config.hiveHMSUris)
+ && Objects.equals(partitionValueExtractorClass, config.partitionValueExtractorClass)
+ && Objects.equals(sparkMaster, config.sparkMaster)
+ && Objects.equals(sparkMemory, config.sparkMemory)
+ && Objects.equals(propsFilePath, config.propsFilePath)
+ && Objects.equals(configs, config.configs)
+ && Objects.equals(hiveSyncIgnoreException, config.hiveSyncIgnoreException);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(basePath, runningMode, tableName, partitions, instantTime,
+ syncToHive, hiveDataBase, hiveTableName, hiveUserName, hivePassWord, hiveURL,
+ hivePartitionsField, hiveUseJdbc, hiveHMSUris, partitionValueExtractorClass,
+ sparkMaster, sparkMemory, propsFilePath, configs, hiveSyncIgnoreException, help);
+ }
+ }
+
+ public static void main(String[] args) {
+ final Config cfg = new Config();
+ JCommander cmd = new JCommander(cfg, null, args);
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+ SparkConf sparkConf = UtilHelpers.buildSparkConf("Hoodie-Drop-Table-Partitions", cfg.sparkMaster);
+ sparkConf.set("spark.executor.memory", cfg.sparkMemory);
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+ HoodieDropPartitionsTool tool = new HoodieDropPartitionsTool(jsc, cfg);
+ try {
+ tool.run();
+ } catch (Throwable throwable) {
+ LOG.error("Fail to run deleting table partitions for " + cfg.toString(), throwable);
+ } finally {
+ jsc.stop();
+ }
+ }
+
+ public void run() {
+ try {
+ if (StringUtils.isNullOrEmpty(cfg.instantTime)) {
+ cfg.instantTime = HoodieActiveTimeline.createNewInstantTime();
+ }
+ LOG.info(cfg.toString());
+
+ Mode mode = Mode.valueOf(cfg.runningMode.toUpperCase());
+ switch (mode) {
+ case DELETE:
+ LOG.info(" ****** The Hoodie Drop Partitions Tool is in delete mode ****** ");
+ doDeleteTablePartitions();
+ syncToHiveIfNecessary();
+ break;
+ case DRY_RUN:
+ LOG.info(" ****** The Hoodie Drop Partitions Tool is in dry-run mode ****** ");
+ dryRun();
+ break;
+ default:
+ LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
+ }
+ } catch (Exception e) {
+ throw new HoodieException("Unable to delete table partitions in " + cfg.basePath, e);
+ }
+ }
+
+ public void dryRun() {
+ try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.empty(), props)) {
+ HoodieSparkTable<HoodieRecordPayload> table = HoodieSparkTable.create(client.getConfig(), client.getEngineContext());
+ List<String> parts = Arrays.asList(cfg.partitions.split(","));
+ Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(parts, parts.size()).distinct()
+ .mapToPair(partitionPath -> new Tuple2<>(partitionPath, table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList())))
+ .collectAsMap();
+ printDeleteFilesInfo(partitionToReplaceFileIds);
+ }
+ }
+
+ private void syncToHiveIfNecessary() {
+ if (cfg.syncToHive) {
+ HiveSyncConfig hiveSyncConfig = buildHiveSyncProps();
+ syncHive(hiveSyncConfig);
+ }
+ }
+
+ private void doDeleteTablePartitions() {
+
+ // need to do commit in SparkDeletePartitionCommitActionExecutor#execute
+ this.props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+ try (SparkRDDWriteClient<HoodieRecordPayload> client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.empty(), props)) {
+ List<String> partitionsToDelete = Arrays.asList(cfg.partitions.split(","));
+ client.startCommitWithTime(cfg.instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+ client.deletePartitions(partitionsToDelete, cfg.instantTime);
+ }
+ }
+
+ private HiveSyncConfig buildHiveSyncProps() {
+ verifyHiveConfigs();
+ TypedProperties props = new TypedProperties();
+ props.put(DataSourceWriteOptions.HIVE_DATABASE().key(), cfg.hiveDataBase);
+ props.put(DataSourceWriteOptions.HIVE_TABLE().key(), cfg.hiveTableName);
+ props.put(DataSourceWriteOptions.HIVE_USER().key(), cfg.hiveUserName);
+ props.put(DataSourceWriteOptions.HIVE_PASS().key(), cfg.hivePassWord);
+ props.put(DataSourceWriteOptions.HIVE_URL().key(), cfg.hiveURL);
+ props.put(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), cfg.hivePartitionsField);
+ props.put(DataSourceWriteOptions.HIVE_USE_JDBC().key(), cfg.hiveUseJdbc);
+ props.put(DataSourceWriteOptions.HIVE_SYNC_MODE().key(), cfg.hiveSyncMode);
+ props.put(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(), cfg.hiveSyncIgnoreException);
+ props.put(DataSourceWriteOptions.HIVE_PASS().key(), cfg.hivePassWord);
+ props.put(DataSourceWriteOptions.PARTITIONS_TO_DELETE().key(), cfg.partitions);
+ props.put(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(), cfg.partitionValueExtractorClass);
+ props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), cfg.hivePartitionsField);
+
+ return DataSourceUtils.buildHiveSyncConfig(props, cfg.basePath, "PARQUET");
+ }
+
+ private void verifyHiveConfigs() {
+ ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(cfg.hiveDataBase), "Hive database name couldn't be null or empty when enable sync meta, please set --hive-database/-db.");
+ ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(cfg.hiveTableName), "Hive table name couldn't be null or empty when enable sync meta, please set --hive-table-name/-tn.");
+ }
+
+ private void syncHive(HiveSyncConfig hiveSyncConfig) {
+ LOG.info("Syncing target hoodie table with hive table("
+ + hiveSyncConfig.tableName
+ + "). Hive metastore URL :"
+ + hiveSyncConfig.jdbcUrl
+ + ", basePath :" + cfg.basePath);
+ LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString());
+ FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
+ HiveConf hiveConf = new HiveConf();
+ if (!StringUtils.isNullOrEmpty(cfg.hiveHMSUris)) {
+ hiveConf.set("hive.metastore.uris", cfg.hiveHMSUris);
+ }
+ hiveConf.addResource(fs.getConf());
+ LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
+ HiveSyncTool hiveSyncTool = new HiveSyncTool(hiveSyncConfig, hiveConf, fs);
+ hiveSyncTool.syncHoodieTable();
+ }
+
+ /**
+ * Prints the delete data files info.
+ *
+ * @param partitionToReplaceFileIds
+ */
+ private void printDeleteFilesInfo(Map<String, List<String>> partitionToReplaceFileIds) {
+ LOG.info("Data files and partitions to delete : ");
+ for (Map.Entry<String, List<String>> entry : partitionToReplaceFileIds.entrySet()) {
+ LOG.info(String.format("Partitions : %s, corresponding data file IDs : %s", entry.getKey(), entry.getValue()));
+ }
+ }
+}