Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/03 14:45:26 UTC

[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #5178: [HUDI-3780] improve drop partitions

xiarixiaoyao commented on a change in pull request #5178:
URL: https://github.com/apache/hudi/pull/5178#discussion_r841229331



##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -18,6 +18,26 @@
 
 package org.apache.hudi.metadata;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
 import org.apache.avro.AvroTypeException;

Review comment:
       Why change the import order?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
##########
@@ -193,4 +198,100 @@ trait ProvidesHoodieConfig extends Logging {
     }
   }
 
+  def buildHoodieDropPartitionsConfig(
+                                 sparkSession: SparkSession,
+                                 hoodieCatalogTable: HoodieCatalogTable,
+                                 partitionsToDrop: String): Map[String, String] = {
+    val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
+    val enableHive = isEnableHive(sparkSession)
+    val catalogProperties = hoodieCatalogTable.catalogProperties
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val options: Map[String, String] = catalogProperties ++ tableConfig.getProps.asScala.toMap ++ sparkSession.sqlContext.conf.getAllConfs
+    val hiveSyncConfig = buildHiveSyncConfig(options, hoodieCatalogTable)
+
+    withSparkConf(sparkSession, options) {
+      Map(
+        "path" -> hoodieCatalogTable.tableLocation,
+        TBL_NAME.key -> hoodieCatalogTable.tableName,
+        TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
+        OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
+        PARTITIONS_TO_DELETE.key -> partitionsToDrop,
+        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+        PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
+        PARTITIONPATH_FIELD.key -> partitionFields,
+        HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
+        HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
+        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
+        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass
+      )
+        .filter { case (_, v) => v != null }
+    }
+  }
+
+  def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
+                                   sparkSession: SparkSession): Map[String, String] = {
+    val path = hoodieCatalogTable.tableLocation
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val tableSchema = hoodieCatalogTable.tableSchema
+    val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
+    val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
+
+    assert(hoodieCatalogTable.primaryKeys.nonEmpty,
+      s"There are no primary key defined in table ${hoodieCatalogTable.table.identifier}, cannot execute delete operator")
+    withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
+      Map(
+        "path" -> path,
+        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+        TBL_NAME.key -> tableConfig.getTableName,
+        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+        SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+        OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+        HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",

Review comment:
       Why 200? It would be better to make it configurable.
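
       The hard-coded 200 is worth a quick sketch. Assuming the user would pass the
       value through the table's catalog properties (an assumption for illustration,
       not the PR author's design), the builder could fall back to 200 only when
       nothing was provided:
   ```scala
   // Illustrative sketch: respect a user-provided value for
   // HoodieWriteConfig.DELETE_PARALLELISM_VALUE and default to 200 otherwise.
   val deleteParallelism: String = hoodieCatalogTable.catalogProperties
     .getOrElse(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200")

   // ...and in the Map built above:
   //   HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> deleteParallelism,
   ```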

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -1331,4 +1329,22 @@ private static boolean canCompare(Schema schema) {
     inflightAndCompletedPartitions.addAll(getCompletedMetadataPartitions(tableConfig));
     return inflightAndCompletedPartitions;
   }
+
+  /**
+   * Get Last commit's Metadata.
+   */
+  public static Option<HoodieCommitMetadata> getLatestCommitMetadata(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      if (timeline.lastInstant().isPresent()) {
+        HoodieInstant instant = timeline.lastInstant().get();
+        byte[] data = timeline.getInstantDetails(instant).get();
+        return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
+      } else {
+        return Option.empty();
+      }
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get commit metadata", e);
+    }
+  }

Review comment:
       TableSchemaResolver already has the same method; why define it again?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
##########
@@ -98,9 +103,10 @@ trait ProvidesHoodieConfig extends Logging {
     val path = hoodieCatalogTable.tableLocation
     val tableType = hoodieCatalogTable.tableTypeName
     val tableConfig = hoodieCatalogTable.tableConfig
-    val tableSchema = hoodieCatalogTable.tableSchema
 
-    val options = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ extraOptions
+    val options: Map[String, String] = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ sparkSession.sqlContext.conf.getAllConfs ++ extraOptions
+    val hiveSyncConfig = buildHiveSyncConfig(options, hoodieCatalogTable)

Review comment:
       It may be good to reuse code here; this option merging and HiveSyncConfig setup is now defined in two places.
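
       One hedged way to remove the duplication (the helper name mergedOptions is
       illustrative, not existing code) is to factor the merge into a single private
       method and have the insert, drop-partition and delete builders call it:
   ```scala
   // Illustrative helper, not part of the PR: merge catalog properties, table
   // config and the current Spark SQL confs in one place.
   private def mergedOptions(sparkSession: SparkSession,
                             hoodieCatalogTable: HoodieCatalogTable,
                             extraOptions: Map[String, String] = Map.empty): Map[String, String] = {
     hoodieCatalogTable.catalogProperties ++
       hoodieCatalogTable.tableConfig.getProps.asScala.toMap ++
       sparkSession.sqlContext.conf.getAllConfs ++
       extraOptions
   }

   // Each call site then reduces to:
   //   val options = mergedOptions(sparkSession, hoodieCatalogTable, extraOptions)
   //   val hiveSyncConfig = buildHiveSyncConfig(options, hoodieCatalogTable)
   ```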

##########
File path: hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
##########
@@ -236,6 +230,11 @@ public void updatePartitionsToTable(final String tableName, final List<String> c
     throw new UnsupportedOperationException("No support for updatePartitionsToTable yet.");
   }
 
+  @Override
+  public void dropPartitions(String tableName, List<String> partitionsToDrop) {
+    throw new UnsupportedOperationException("No support for dropPartitions yet.");

Review comment:
       Please do not delete the original code comment:
   ```
   // bigQuery discovers the new partitions automatically, so do nothing.
   ```

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
##########
@@ -193,4 +198,100 @@ trait ProvidesHoodieConfig extends Logging {
     }
   }
 
+  def buildHoodieDropPartitionsConfig(
+                                 sparkSession: SparkSession,
+                                 hoodieCatalogTable: HoodieCatalogTable,
+                                 partitionsToDrop: String): Map[String, String] = {
+    val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
+    val enableHive = isEnableHive(sparkSession)
+    val catalogProperties = hoodieCatalogTable.catalogProperties
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val options: Map[String, String] = catalogProperties ++ tableConfig.getProps.asScala.toMap ++ sparkSession.sqlContext.conf.getAllConfs
+    val hiveSyncConfig = buildHiveSyncConfig(options, hoodieCatalogTable)
+
+    withSparkConf(sparkSession, options) {
+      Map(
+        "path" -> hoodieCatalogTable.tableLocation,
+        TBL_NAME.key -> hoodieCatalogTable.tableName,
+        TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
+        OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
+        PARTITIONS_TO_DELETE.key -> partitionsToDrop,
+        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+        PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
+        PARTITIONPATH_FIELD.key -> partitionFields,
+        HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
+        HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
+        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
+        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass
+      )
+        .filter { case (_, v) => v != null }
+    }
+  }
+
+  def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
+                                   sparkSession: SparkSession): Map[String, String] = {
+    val path = hoodieCatalogTable.tableLocation
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val tableSchema = hoodieCatalogTable.tableSchema
+    val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
+    val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
+
+    assert(hoodieCatalogTable.primaryKeys.nonEmpty,
+      s"There are no primary key defined in table ${hoodieCatalogTable.table.identifier}, cannot execute delete operator")

Review comment:
       Should this say "delete operation" instead of "delete operator"?
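
       For reference, the suggested wording applied to the quoted assert
       (message text only, no behaviour change):
   ```scala
   assert(hoodieCatalogTable.primaryKeys.nonEmpty,
     s"There are no primary keys defined in table ${hoodieCatalogTable.table.identifier}, " +
       "cannot execute the delete operation")
   ```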

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
##########
@@ -17,21 +17,25 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
-import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
-import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor}
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.sql.InsertMode
+import org.apache.hudi.sync.common.HoodieSyncConfig
+import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isEnableHive, withSparkConf}
 import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
+import org.apache.spark.sql.types.StructType
 
+import java.util
 import scala.collection.JavaConverters.propertiesAsScalaMapConverter

Review comment:
       Java imports and Scala imports should be separated.
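
       For example, the tail of the import list could keep the Java and Scala
       groups in separate blocks (the ordering shown is only a sketch of that
       convention, not a prescribed layout):
   ```scala
   import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
   import org.apache.spark.sql.types.StructType

   import java.util

   import scala.collection.JavaConverters.propertiesAsScalaMapConverter
   ```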

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
##########
@@ -193,4 +198,100 @@ trait ProvidesHoodieConfig extends Logging {
     }
   }
 
+  def buildHoodieDropPartitionsConfig(
+                                 sparkSession: SparkSession,
+                                 hoodieCatalogTable: HoodieCatalogTable,
+                                 partitionsToDrop: String): Map[String, String] = {
+    val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
+    val enableHive = isEnableHive(sparkSession)
+    val catalogProperties = hoodieCatalogTable.catalogProperties
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val options: Map[String, String] = catalogProperties ++ tableConfig.getProps.asScala.toMap ++ sparkSession.sqlContext.conf.getAllConfs
+    val hiveSyncConfig = buildHiveSyncConfig(options, hoodieCatalogTable)
+
+    withSparkConf(sparkSession, options) {
+      Map(
+        "path" -> hoodieCatalogTable.tableLocation,
+        TBL_NAME.key -> hoodieCatalogTable.tableName,
+        TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
+        OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
+        PARTITIONS_TO_DELETE.key -> partitionsToDrop,
+        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+        PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
+        PARTITIONPATH_FIELD.key -> partitionFields,
+        HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
+        HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
+        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
+        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass
+      )
+        .filter { case (_, v) => v != null }
+    }
+  }
+
+  def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
+                                   sparkSession: SparkSession): Map[String, String] = {
+    val path = hoodieCatalogTable.tableLocation
+    val tableConfig = hoodieCatalogTable.tableConfig
+    val tableSchema = hoodieCatalogTable.tableSchema
+    val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
+    val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))

Review comment:
       Use .toLowerCase(Locale.ROOT) here so the case conversion is not locale-dependent.
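
       A minimal sketch of the suggested change; the only addition is the
       java.util.Locale import:
   ```scala
   import java.util.Locale

   // Locale-insensitive lowercasing, so partition-column matching does not
   // depend on the JVM default locale (e.g. Turkish dotted/dotless i).
   val partitionColumns = tableConfig.getPartitionFieldProp
     .split(",")
     .map(_.toLowerCase(Locale.ROOT))
   ```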




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org