You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/07/05 06:37:05 UTC

[hudi] 01/01: Revert "[HUDI-3116]Add a new HoodieDropPartitionsTool to let users drop table partitions through a standalone job. (#4459)"

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch revert-4459-hoodieDropPartitionsTool
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e0ec86a82e8735e72173e7f4a61bafa20f59948e
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Tue Jul 5 14:36:59 2022 +0800

    Revert "[HUDI-3116]Add a new HoodieDropPartitionsTool to let users drop table partitions through a standalone job. (#4459)"
    
    This reverts commit 45fdcf68a1ce7e780f472b4af67edb4946a4fbc3.
---
 .../hudi/utilities/HoodieDropPartitionsTool.java   | 396 ---------------------
 1 file changed, 396 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
deleted file mode 100644
index 50fdf36c81..0000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
+++ /dev/null
@@ -1,396 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hudi.DataSourceUtils;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hive.HiveSyncConfig;
-import org.apache.hudi.hive.HiveSyncTool;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-import org.apache.hudi.table.HoodieSparkTable;
-
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-
-import scala.Tuple2;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * A tool with spark-submit to drop Hudi table partitions.
- *
- * <p>
- * You can dry run this tool with the following command to look and print for the table partitions and corresponding data files which will be deleted.
- * ```
- * spark-submit \
- * --class org.apache.hudi.utilities.HoodieDropPartitionsTool \
- * --packages org.apache.spark:spark-avro_2.11:2.4.4 \
- * --master local[*]
- * --driver-memory 1g \
- * --executor-memory 1g \
- * $HUDI_DIR/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
- * --base-path basePath \
- * --table-name tableName \
- * --mode dry_run \
- * --partitions partition1,partition2
- * ```
- *
- * <p>
- *
- * You can delete the table partitions with '--mode delete'
- *
- * - DELETE ("delete"): This tool will mask/tombstone these partitions and corresponding data files and let cleaner delete these files later.
- * - Also you can set --sync-hive-meta to sync current drop partition into hive
- * <p>
- * Example command:
- * ```
- * spark-submit \
- * --class org.apache.hudi.utilities.HoodieDropPartitionsTool \
- * --packages org.apache.spark:spark-avro_2.11:2.4.4 \
- * --master local[*]
- * --driver-memory 1g \
- * --executor-memory 1g \
- * $HUDI_DIR/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
- * --base-path basePath \
- * --table-name tableName \
- * --mode delete \
- * --partitions partition1,partition2
- * ```
- *
- * Also you can use --help to find more configs to use.
- */
-public class HoodieDropPartitionsTool implements Serializable {
-
-  private static final Logger LOG = LogManager.getLogger(HoodieDropPartitionsTool.class);
-  // Spark context
-  private final transient JavaSparkContext jsc;
-  // config
-  private final Config cfg;
-  // Properties with source, hoodie client, key generator etc.
-  private TypedProperties props;
-
-  private final HoodieTableMetaClient metaClient;
-
-  public HoodieDropPartitionsTool(JavaSparkContext jsc, Config cfg) {
-    this.jsc = jsc;
-    this.cfg = cfg;
-
-    this.props = cfg.propsFilePath == null
-        ? UtilHelpers.buildProperties(cfg.configs)
-        : readConfigFromFileSystem(jsc, cfg);
-    this.metaClient = HoodieTableMetaClient.builder()
-        .setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath)
-        .setLoadActiveTimelineOnLoad(true)
-        .build();
-  }
-
-  /**
-   * Reads config from the file system.
-   *
-   * @param jsc {@link JavaSparkContext} instance.
-   * @param cfg {@link Config} instance.
-   * @return the {@link TypedProperties} instance.
-   */
-  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
-    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
-        .getProps(true);
-  }
-
-  public enum Mode {
-    // Mask/Tombstone these partitions and corresponding data files and let cleaner delete these files later.
-    DELETE,
-    // Dry run by looking for the table partitions and corresponding data files which will be deleted.
-    DRY_RUN
-  }
-
-  public static class Config implements Serializable {
-    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
-    public String basePath = null;
-    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: "
-        + "Set \"delete\" means mask/tombstone these partitions and corresponding data files table partitions and let cleaner delete these files later;"
-        + "Set \"dry_run\" means only looking for the table partitions will be deleted and corresponding data files.", required = true)
-    public String runningMode = null;
-    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
-    public String tableName = null;
-    @Parameter(names = {"--partitions", "-p"}, description = "Comma separated list of partitions to delete.", required = true)
-    public String partitions = null;
-    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert/upsert/delete", required = false)
-    public int parallelism = 1500;
-    @Parameter(names = {"--instant-time", "-it"}, description = "instant time for delete table partitions operation.", required = false)
-    public String instantTime = null;
-    @Parameter(names = {"--sync-hive-meta", "-sync"}, description = "Sync information to HMS.", required = false)
-    public boolean syncToHive = false;
-    @Parameter(names = {"--hive-database", "-db"}, description = "Database to sync to.", required = false)
-    public String hiveDataBase = null;
-    @Parameter(names = {"--hive-table-name"}, description = "Table to sync to.", required = false)
-    public String hiveTableName = null;
-    @Parameter(names = {"--hive-user-name", "-user"}, description = "hive user name to use.", required = false)
-    public String hiveUserName = "hive";
-    @Parameter(names = {"--hive-pass-word", "-pass"}, description = "hive password to use.", required = false)
-    public String hivePassWord = "hive";
-    @Parameter(names = {"--hive-jdbc-url", "-jdbc"}, description = "hive url to use.", required = false)
-    public String hiveURL = "jdbc:hive2://localhost:10000";
-    @Parameter(names = {"--hive-partition-field"}, description = "Comma separated list of field in the hive table to use for determining hive partition columns.", required = false)
-    public String hivePartitionsField = "";
-    @Parameter(names = {"--hive-sync-use-jdbc"}, description = "Use JDBC when hive synchronization.", required = false)
-    public boolean hiveUseJdbc = true;
-    @Parameter(names = {"--hive-metastore-uris"}, description = "hive meta store uris to use.", required = false)
-    public String hiveHMSUris = null;
-    @Parameter(names = {"--hive-sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.", required = false)
-    public String hiveSyncMode = "hms";
-    @Parameter(names = {"--hive-sync-ignore-exception"}, description = "Ignore hive sync exception.", required = false)
-    public boolean hiveSyncIgnoreException = false;
-    @Parameter(names = {"--hive-partition-value-extractor-class"}, description = "Class which implements PartitionValueExtractor to extract the partition values,"
-        + " default 'SlashEncodedDayPartitionValueExtractor'.", required = false)
-    public String partitionValueExtractorClass = "org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor";
-    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
-    public String sparkMaster = null;
-    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = false)
-    public String sparkMemory = "1g";
-    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
-        + "hoodie client for deleting table partitions")
-    public String propsFilePath = null;
-    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
-        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
-        splitter = IdentitySplitter.class)
-    public List<String> configs = new ArrayList<>();
-    @Parameter(names = {"--help", "-h"}, help = true)
-    public Boolean help = false;
-
-    @Override
-    public String toString() {
-      return "HoodieDropPartitionsToolConfig {\n"
-          + "   --base-path " + basePath + ", \n"
-          + "   --mode " + runningMode + ", \n"
-          + "   --table-name " + tableName + ", \n"
-          + "   --partitions " + partitions + ", \n"
-          + "   --parallelism " + parallelism + ", \n"
-          + "   --instantTime " + instantTime + ", \n"
-          + "   --sync-hive-meta " + syncToHive + ", \n"
-          + "   --hive-database " + hiveDataBase + ", \n"
-          + "   --hive-table-name " + hiveTableName + ", \n"
-          + "   --hive-user-name " + "Masked" + ", \n"
-          + "   --hive-pass-word " + "Masked" + ", \n"
-          + "   --hive-jdbc-url " + hiveURL + ", \n"
-          + "   --hive-partition-field " + hivePartitionsField + ", \n"
-          + "   --hive-sync-use-jdbc " + hiveUseJdbc + ", \n"
-          + "   --hive-metastore-uris " + hiveHMSUris + ", \n"
-          + "   --hive-sync-ignore-exception " + hiveSyncIgnoreException + ", \n"
-          + "   --hive-partition-value-extractor-class " + partitionValueExtractorClass + ", \n"
-          + "   --spark-master " + sparkMaster + ", \n"
-          + "   --spark-memory " + sparkMemory + ", \n"
-          + "   --props " + propsFilePath + ", \n"
-          + "   --hoodie-conf " + configs
-          + "\n}";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      Config config = (Config) o;
-      return basePath.equals(config.basePath)
-          && Objects.equals(runningMode, config.runningMode)
-          && Objects.equals(tableName, config.tableName)
-          && Objects.equals(partitions, config.partitions)
-          && Objects.equals(instantTime, config.instantTime)
-          && Objects.equals(syncToHive, config.syncToHive)
-          && Objects.equals(hiveDataBase, config.hiveDataBase)
-          && Objects.equals(hiveTableName, config.hiveTableName)
-          && Objects.equals(hiveUserName, config.hiveUserName)
-          && Objects.equals(hivePassWord, config.hivePassWord)
-          && Objects.equals(hiveURL, config.hiveURL)
-          && Objects.equals(hivePartitionsField, config.hivePartitionsField)
-          && Objects.equals(hiveUseJdbc, config.hiveUseJdbc)
-          && Objects.equals(hiveHMSUris, config.hiveHMSUris)
-          && Objects.equals(partitionValueExtractorClass, config.partitionValueExtractorClass)
-          && Objects.equals(sparkMaster, config.sparkMaster)
-          && Objects.equals(sparkMemory, config.sparkMemory)
-          && Objects.equals(propsFilePath, config.propsFilePath)
-          && Objects.equals(configs, config.configs)
-          && Objects.equals(hiveSyncIgnoreException, config.hiveSyncIgnoreException);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(basePath, runningMode, tableName, partitions, instantTime,
-          syncToHive, hiveDataBase, hiveTableName, hiveUserName, hivePassWord, hiveURL,
-          hivePartitionsField, hiveUseJdbc, hiveHMSUris, partitionValueExtractorClass,
-          sparkMaster, sparkMemory, propsFilePath, configs, hiveSyncIgnoreException, help);
-    }
-  }
-
-  public static void main(String[] args) {
-    final Config cfg = new Config();
-    JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
-      cmd.usage();
-      System.exit(1);
-    }
-    SparkConf sparkConf = UtilHelpers.buildSparkConf("Hoodie-Drop-Table-Partitions", cfg.sparkMaster);
-    sparkConf.set("spark.executor.memory", cfg.sparkMemory);
-    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-    HoodieDropPartitionsTool tool = new HoodieDropPartitionsTool(jsc, cfg);
-    try {
-      tool.run();
-    } catch (Throwable throwable) {
-      LOG.error("Fail to run deleting table partitions for " + cfg.toString(), throwable);
-    } finally {
-      jsc.stop();
-    }
-  }
-
-  public void run() {
-    try {
-      if (StringUtils.isNullOrEmpty(cfg.instantTime)) {
-        cfg.instantTime = HoodieActiveTimeline.createNewInstantTime();
-      }
-      LOG.info(cfg.toString());
-
-      Mode mode = Mode.valueOf(cfg.runningMode.toUpperCase());
-      switch (mode) {
-        case DELETE:
-          LOG.info(" ****** The Hoodie Drop Partitions Tool is in delete mode ****** ");
-          doDeleteTablePartitions();
-          syncToHiveIfNecessary();
-          break;
-        case DRY_RUN:
-          LOG.info(" ****** The Hoodie Drop Partitions Tool is in dry-run mode ****** ");
-          dryRun();
-          break;
-        default:
-          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
-      }
-    } catch (Exception e) {
-      throw new HoodieException("Unable to delete table partitions in " + cfg.basePath, e);
-    }
-  }
-
-  public void dryRun() {
-    try (SparkRDDWriteClient<HoodieRecordPayload> client =  UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.empty(), props)) {
-      HoodieSparkTable<HoodieRecordPayload> table = HoodieSparkTable.create(client.getConfig(), client.getEngineContext());
-      List<String> parts = Arrays.asList(cfg.partitions.split(","));
-      Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(parts, parts.size()).distinct()
-          .mapToPair(partitionPath -> new Tuple2<>(partitionPath, table.getSliceView().getLatestFileSlices(partitionPath).map(fg -> fg.getFileId()).distinct().collect(Collectors.toList())))
-          .collectAsMap();
-      printDeleteFilesInfo(partitionToReplaceFileIds);
-    }
-  }
-
-  private void syncToHiveIfNecessary() {
-    if (cfg.syncToHive) {
-      HiveSyncConfig hiveSyncConfig = buildHiveSyncProps();
-      syncHive(hiveSyncConfig);
-    }
-  }
-
-  private void doDeleteTablePartitions() {
-
-    // need to do commit in SparkDeletePartitionCommitActionExecutor#execute
-    this.props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
-    try (SparkRDDWriteClient<HoodieRecordPayload> client =  UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.empty(), props)) {
-      List<String> partitionsToDelete = Arrays.asList(cfg.partitions.split(","));
-      client.startCommitWithTime(cfg.instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
-      client.deletePartitions(partitionsToDelete, cfg.instantTime);
-    }
-  }
-
-  private HiveSyncConfig buildHiveSyncProps() {
-    verifyHiveConfigs();
-    TypedProperties props = new TypedProperties();
-    props.put(DataSourceWriteOptions.HIVE_DATABASE().key(), cfg.hiveDataBase);
-    props.put(DataSourceWriteOptions.HIVE_TABLE().key(), cfg.hiveTableName);
-    props.put(DataSourceWriteOptions.HIVE_USER().key(), cfg.hiveUserName);
-    props.put(DataSourceWriteOptions.HIVE_PASS().key(), cfg.hivePassWord);
-    props.put(DataSourceWriteOptions.HIVE_URL().key(), cfg.hiveURL);
-    props.put(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), cfg.hivePartitionsField);
-    props.put(DataSourceWriteOptions.HIVE_USE_JDBC().key(), cfg.hiveUseJdbc);
-    props.put(DataSourceWriteOptions.HIVE_SYNC_MODE().key(), cfg.hiveSyncMode);
-    props.put(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(), cfg.hiveSyncIgnoreException);
-    props.put(DataSourceWriteOptions.HIVE_PASS().key(), cfg.hivePassWord);
-    props.put(DataSourceWriteOptions.PARTITIONS_TO_DELETE().key(), cfg.partitions);
-    props.put(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(), cfg.partitionValueExtractorClass);
-    props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), cfg.hivePartitionsField);
-
-    return DataSourceUtils.buildHiveSyncConfig(props, cfg.basePath, "PARQUET");
-  }
-
-  private void verifyHiveConfigs() {
-    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(cfg.hiveDataBase), "Hive database name couldn't be null or empty when enable sync meta, please set --hive-database/-db.");
-    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(cfg.hiveTableName), "Hive table name couldn't be null or empty when enable sync meta, please set --hive-table-name/-tn.");
-  }
-
-  private void syncHive(HiveSyncConfig hiveSyncConfig) {
-    LOG.info("Syncing target hoodie table with hive table("
-        + hiveSyncConfig.tableName
-        + "). Hive metastore URL :"
-        + hiveSyncConfig.jdbcUrl
-        + ", basePath :" + cfg.basePath);
-    LOG.info("Hive Sync Conf => " + hiveSyncConfig.toString());
-    FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
-    HiveConf hiveConf = new HiveConf();
-    if (!StringUtils.isNullOrEmpty(cfg.hiveHMSUris)) {
-      hiveConf.set("hive.metastore.uris", cfg.hiveHMSUris);
-    }
-    hiveConf.addResource(fs.getConf());
-    LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
-    HiveSyncTool hiveSyncTool = new HiveSyncTool(hiveSyncConfig, hiveConf, fs);
-    hiveSyncTool.syncHoodieTable();
-  }
-
-  /**
-   * Prints the delete data files info.
-   *
-   * @param partitionToReplaceFileIds
-   */
-  private void printDeleteFilesInfo(Map<String, List<String>> partitionToReplaceFileIds) {
-    LOG.info("Data files and partitions to delete : ");
-    for (Map.Entry<String, List<String>> entry  : partitionToReplaceFileIds.entrySet()) {
-      LOG.info(String.format("Partitions : %s, corresponding data file IDs : %s", entry.getKey(), entry.getValue()));
-    }
-  }
-}