Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/14 10:59:24 UTC

[GitHub] [hudi] zhangyue19921010 commented on a change in pull request #4459: [HUDI-3116]Add a new HoodieDropPartitionsTool to let users drop table partitions through a standalone job.

zhangyue19921010 commented on a change in pull request #4459:
URL: https://github.com/apache/hudi/pull/4459#discussion_r784751419



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
##########
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.HoodieWriteResult;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.hive.HoodieHiveClient;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieSparkTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A tool, run via spark-submit, to drop Hudi table partitions.
+ * <p>
+ * You can dry run this tool with the following command:
+ * ```
+ * spark-submit \
+ * --class org.apache.hudi.utilities.HoodieDropPartitionsTool \
+ * --packages org.apache.spark:spark-avro_2.11:2.4.4 \
+ * --master local[*] \
+ * --driver-memory 1g \
+ * --executor-memory 1g \
+ * $HUDI_DIR/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
+ * --base-path basePath \
+ * --table-name tableName \
+ * --mode dry_run \
+ * --partitions partition1,partition2
+ * ```
+ *
+ * <p>
+ * You can specify the running mode of the tool through `--mode`.
+ * There are four modes of the {@link HoodieDropPartitionsTool}:
+ * - DELETE_PARTITIONS_LAZY ("delete_partitions_lazy"): This tool will mask/tombstone these partitions and the corresponding data files, and let the cleaner delete these files later.
+ * - You can also set --sync-hive-meta to sync the dropped partitions to Hive.
+ * <p>
+ * Example command:
+ * ```
+ * spark-submit \
+ * --class org.apache.hudi.utilities.HoodieDropPartitionsTool \
+ * --packages org.apache.spark:spark-avro_2.11:2.4.4 \
+ * --master local[*] \
+ * --driver-memory 1g \
+ * --executor-memory 1g \
+ * $HUDI_DIR/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
+ * --base-path basePath \
+ * --table-name tableName \
+ * --mode delete_partitions_lazy \
+ * --partitions partition1,partition2
+ * ```
+ *
+ * <p>
+ * - DELETE_PARTITIONS_EAGER ("delete_partitions_eager"): This tool will mask/tombstone these partitions and the corresponding data files, and delete these data files immediately.
+ * - You can also set --sync-hive-meta to sync the dropped partitions to Hive.
+ * <p>
+ * Example command:
+ * ```
+ * spark-submit \
+ * --class org.apache.hudi.utilities.HoodieDropPartitionsTool \
+ * --packages org.apache.spark:spark-avro_2.11:2.4.4 \
+ * --master local[*] \
+ * --driver-memory 1g \
+ * --executor-memory 1g \
+ * $HUDI_DIR/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
+ * --base-path basePath \
+ * --table-name tableName \
+ * --mode delete_partitions_eager \
+ * --partitions partition1,partition2
+ * ```
+ *
+ * <p>
+ * - DELETE_PARTITIONS_QUIET ("delete_partitions_quiet"): This tool will delete all the data files without creating a replace instant.
+ * - You can also set --sync-hive-meta to sync the dropped partitions to Hive. In quiet mode,
+ *   hoodie will delete the corresponding hive partitions without updating the last commit time in the table's TBLPROPERTIES.
+ * <p>
+ * Example command:
+ * ```
+ * spark-submit \
+ * --class org.apache.hudi.utilities.HoodieDropPartitionsTool \
+ * --packages org.apache.spark:spark-avro_2.11:2.4.4 \
+ * --master local[*] \
+ * --driver-memory 1g \
+ * --executor-memory 1g \
+ * $HUDI_DIR/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
+ * --base-path basePath \
+ * --table-name tableName \
+ * --mode delete_partitions_quiet \
+ * --partitions partition1,partition2
+ * ```
+ *
+ * <p>
+ * - DRY_RUN ("dry_run"): look up and print the table partitions and the corresponding data files which would be deleted.
+ * <p>
+ * Example command:
+ * ```
+ * spark-submit \
+ * --class org.apache.hudi.utilities.HoodieDropPartitionsTool \
+ * --packages org.apache.spark:spark-avro_2.11:2.4.4 \
+ * --master local[*] \
+ * --driver-memory 1g \
+ * --executor-memory 1g \
+ * $HUDI_DIR/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.11.0-SNAPSHOT.jar \
+ * --base-path basePath \
+ * --table-name tableName \
+ * --mode dry_run \
+ * --partitions partition1,partition2
+ * ```
+ *
+ * You can also use --help to see more available configs.
+ */
+public class HoodieDropPartitionsTool implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieDropPartitionsTool.class);
+  // Spark context
+  private final transient JavaSparkContext jsc;
+  // config
+  private final Config cfg;
+  // Properties with source, hoodie client, key generator etc.
+  private TypedProperties props;
+
+  private final HoodieTableMetaClient metaClient;
+
+  public HoodieDropPartitionsTool(JavaSparkContext jsc, Config cfg) {
+    this.jsc = jsc;
+    this.cfg = cfg;
+
+    this.props = cfg.propsFilePath == null
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+    this.metaClient = HoodieTableMetaClient.builder()
+        .setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath)
+        .setLoadActiveTimelineOnLoad(true)
+        .build();
+  }
+
+  /**
+   * Reads config from the file system.
+   *
+   * @param jsc {@link JavaSparkContext} instance.
+   * @param cfg {@link Config} instance.
+   * @return the {@link TypedProperties} instance.
+   */
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public enum Mode {
+    // Mask/Tombstone these partitions and the corresponding data files, and let the cleaner delete these files later.
+    DELETE_PARTITIONS_LAZY,
+    // Mask/Tombstone these partitions and the corresponding data files, and delete these data files immediately.
+    DELETE_PARTITIONS_EAGER,
+    // Delete all the data files without creating a replace instant. Also, when --sync-hive-meta is set to sync the dropped partitions to hive in quiet mode,
+    // hoodie will delete the corresponding hive partitions without updating the last commit time in the table's TBLPROPERTIES.
+    DELETE_PARTITIONS_QUIET,
+    // Dry run by looking for the table partitions and corresponding data files which will be deleted.
+    DRY_RUN
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--base-path", "-sp"}, description = "Base path for the table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--mode", "-m"}, description = "Set job mode: "
+        + "Set \"delete_partitions_lazy\" means mask/tombstone these partitions and corresponding data files table partitions and let cleaner delete these files later;"
+        + "Set \"delete_partitions_eager\" means delete data files and corresponding directory directly through file system but don't change hoodie meta files;"
+        + "Set \"dry_run\" means only looking for the table partitions will be deleted and corresponding data files.", required = true)
+    public String runningMode = null;
+    @Parameter(names = {"--table-name", "-tn"}, description = "Table name", required = true)
+    public String tableName = null;
+    @Parameter(names = {"--partitions", "-p"}, description = "Comma separated list of partitions to delete.", required = true)
+    public String partitions = null;
+    @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert/upsert/delete", required = false)
+    public int parallelism = 1500;
+    @Parameter(names = {"--instant-time", "-it"}, description = "instant time for delete table partitions operation.", required = false)
+    public String instantTime = null;
+    @Parameter(names = {"--clean-up-empty-directory"}, description = "Delete all the empty data directory.", required = false)
+    public boolean cleanUpEmptyDirectory = false;
+    @Parameter(names = {"--sync-hive-meta", "-sync"}, description = "Sync information to HMS.", required = false)
+    public boolean syncToHive = false;
+    @Parameter(names = {"--hive-database", "-db"}, description = "Database to sync to.", required = false)
+    public String hiveDataBase = null;
+    @Parameter(names = {"--hive-table-name"}, description = "Table to sync to.", required = false)
+    public String hiveTableName = null;
+    @Parameter(names = {"--hive-user-name", "-user"}, description = "hive user name to use.", required = false)
+    public String hiveUserName = "hive";
+    @Parameter(names = {"--hive-pass-word", "-pass"}, description = "hive password to use.", required = false)
+    public String hivePassWord = "hive";
+    @Parameter(names = {"--hive-jdbc-url", "-jdbc"}, description = "hive url to use.", required = false)
+    public String hiveURL = "jdbc:hive2://localhost:10000";
+    @Parameter(names = {"--hive-partition-field"}, description = "Comma separated list of field in the hive table to use for determining hive partition columns.", required = false)
+    public String hivePartitionsField = "";
+    @Parameter(names = {"--hive-sync-use-jdbc"}, description = "Use JDBC when hive synchronization.", required = false)
+    public boolean hiveUseJdbc = true;
+    @Parameter(names = {"--hive-metastore-uris"}, description = "hive meta store uris to use.", required = false)
+    public String hiveHMSUris = null;
+    @Parameter(names = {"--hive-sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.", required = false)
+    public String hiveSyncMode = "hms";
+    @Parameter(names = {"--hive-sync-ignore-exception"}, description = "Ignore hive sync exception.", required = false)
+    public boolean hiveSyncIgnoreException = false;
+    @Parameter(names = {"--hive-partition-value-extractor-class"}, description = "Class which implements PartitionValueExtractor to extract the partition values,"
+        + " default 'SlashEncodedDayPartitionValueExtractor'.", required = false)
+    public String partitionValueExtractorClass = "org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor";
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory to use", required = false)
+    public String sparkMemory = "1g";
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for deleting table partitions")
+    public String propsFilePath = null;
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+  }
+
+  private String getConfigDetails() {
+    final StringBuilder sb = new StringBuilder("HoodieDropPartitionsToolConfig {\n");
+    sb.append("   --base-path ").append(cfg.basePath).append(", \n");
+    sb.append("   --mode ").append(cfg.runningMode).append(", \n");
+    sb.append("   --table-name ").append(cfg.tableName).append(", \n");
+    sb.append("   --partitions ").append(cfg.partitions).append(", \n");
+    sb.append("   --parallelism ").append(cfg.parallelism).append(", \n");
+    sb.append("   --instantTime ").append(cfg.instantTime).append(", \n");
+    sb.append("   --clean-up-empty-directory ").append(cfg.cleanUpEmptyDirectory).append(", \n");
+    sb.append("   --sync-hive-meta ").append(cfg.syncToHive).append(", \n");
+    sb.append("   --hive-database ").append(cfg.hiveDataBase).append(", \n");
+    sb.append("   --hive-table-name ").append(cfg.hiveTableName).append(", \n");
+    sb.append("   --hive-user-name ").append("Masked").append(", \n");
+    sb.append("   --hive-pass-word ").append("Masked").append(", \n");
+    sb.append("   --hive-jdbc-url ").append(cfg.hiveURL).append(", \n");
+    sb.append("   --hive-partition-field ").append(cfg.hivePartitionsField).append(", \n");
+    sb.append("   --hive-sync-use-jdbc ").append(cfg.hiveUseJdbc).append(", \n");
+    sb.append("   --hive-metastore-uris ").append(cfg.hiveHMSUris).append(", \n");
+    sb.append("   --hive-sync-mode ").append(cfg.hiveSyncMode).append(", \n");
+    sb.append("   --hive-sync-ignore-exception ").append(cfg.hiveSyncIgnoreException).append(", \n");
+    sb.append("   --hive-partition-value-extractor-class ").append(cfg.partitionValueExtractorClass).append(", \n");
+    sb.append("   --spark-master ").append(cfg.sparkMaster).append(", \n");
+    sb.append("   --spark-memory ").append(cfg.sparkMemory).append(", \n");
+    sb.append("   --props ").append(cfg.propsFilePath).append(", \n");
+    sb.append("   --hoodie-conf ").append(cfg.configs);
+    sb.append("\n}");
+    return sb.toString();
+  }
+
+  public static void main(String[] args) {
+    final Config cfg = new Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    SparkConf sparkConf = UtilHelpers.buildSparkConf("Hoodie-Drop-Table-Partitions", cfg.sparkMaster);
+    sparkConf.set("spark.executor.memory", cfg.sparkMemory);
+    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+    HoodieDropPartitionsTool tool = new HoodieDropPartitionsTool(jsc, cfg);
+    try {
+      tool.run();
+    } catch (Throwable throwable) {
+      LOG.error("Fail to run deleting table partitions for " + tool.getConfigDetails(), throwable);
+    } finally {
+      jsc.stop();
+    }
+  }
+
+  public void run() {
+    try {
+      if (StringUtils.isNullOrEmpty(cfg.instantTime)) {
+        cfg.instantTime = HoodieActiveTimeline.createNewInstantTime();
+      }
+      LOG.info(getConfigDetails());
+
+      Mode mode = Mode.valueOf(cfg.runningMode.toUpperCase());
+      switch (mode) {
+        case DELETE_PARTITIONS_LAZY:
+          LOG.info(" ****** The Hoodie Drop Partitions Tool is in delete_partition_lazy mode ******");
+          doDeleteTablePartitionsLazy();
+          break;
+        case DELETE_PARTITIONS_EAGER:
+          LOG.info(" ****** The Hoodie Drop Partitions Tool is in delete_partition_eager mode ******");
+          doDeleteTablePartitionsEager();
+          break;
+        case DELETE_PARTITIONS_QUIET:
+          LOG.info(" ****** The Hoodie Drop Partitions Tool is in delete_partition_quiet mode ******");
+          doDeleteTablePartitionsQuiet();
+          break;
+        case DRY_RUN:
+          LOG.info(" ****** The Hoodie Drop Partitions Tool is in dry-run mode ******");
+          dryRun();
+          break;
+        default:
+          LOG.info("Unsupported running mode [" + cfg.runningMode + "], quit the job directly");
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Unable to delete table partitions in " + cfg.basePath, e);
+    }
+  }
+
+  public void doDeleteTablePartitionsLazy() {
+    doDeleteTablePartitions();
+    syncToHiveIfNecessary(false);
+  }
+
+  public void doDeleteTablePartitionsEager() {
+    doDeleteTablePartitions();
+    deleteDataFiles();

Review comment:
       Makes sense! Just thinking: how about designing doDeleteTablePartitionsEager like this (rough sketch below)?
   1. doDeleteTablePartitions, which will call client.deletePartitions to
           a. create a replace commit,
           b. sync the metadata table,
           c. schedule and execute an inline clean action immediately.
   2. delete empty dirs.
   3. sync to hive.
   
   The advantages of doing this are that
   1. everything stays in the box via HoodieWriteClient,
   2. metadata table syncing is taken care of,
   3. there is no need to change the current writeClient APIs.
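   
   A minimal sketch of what that eager path could look like (just to illustrate the flow; `buildWriteClient()` and `deleteEmptyPartitionDirectories()` are hypothetical stand-ins for however the tool ends up constructing its SparkRDDWriteClient and cleaning up empty directories):
   
   ```java
   public void doDeleteTablePartitionsEager() {
     List<String> partitionsToDelete = Arrays.asList(cfg.partitions.split(","));
     // Hypothetical helper that builds a SparkRDDWriteClient from jsc/cfg/props.
     SparkRDDWriteClient<HoodieRecordPayload> client = buildWriteClient();
     try {
       // 1. Drop the partitions through the write client: this writes a replace
       //    commit and keeps the metadata table in sync inside the client.
       client.startCommitWithTime(cfg.instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
       HoodieWriteResult result = client.deletePartitions(partitionsToDelete, cfg.instantTime);
       // (write statuses in `result` could be checked for errors here)
       // 2. Schedule and run an inline clean right away so the replaced file
       //    groups are removed immediately instead of by a later cleaner run.
       client.clean();
     } finally {
       client.close();
     }
     // 3. Remove now-empty partition directories (hypothetical helper).
     deleteEmptyPartitionDirectories(partitionsToDelete);
     // 4. Sync the dropped partitions to hive if requested.
     syncToHiveIfNecessary(false);
   }
   ```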



