You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/03 14:45:26 UTC
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #5178: [HUDI-3780] improve drop partitions
xiarixiaoyao commented on a change in pull request #5178:
URL: https://github.com/apache/hudi/pull/5178#discussion_r841229331
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -18,6 +18,26 @@
package org.apache.hudi.metadata;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.annotation.Nonnull;
import org.apache.avro.AvroTypeException;
Review comment:
Why was the import order changed?
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
##########
@@ -193,4 +198,100 @@ trait ProvidesHoodieConfig extends Logging {
}
}
+ def buildHoodieDropPartitionsConfig(
+ sparkSession: SparkSession,
+ hoodieCatalogTable: HoodieCatalogTable,
+ partitionsToDrop: String): Map[String, String] = {
+ val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
+ val enableHive = isEnableHive(sparkSession)
+ val catalogProperties = hoodieCatalogTable.catalogProperties
+ val tableConfig = hoodieCatalogTable.tableConfig
+ val options: Map[String, String] = catalogProperties ++ tableConfig.getProps.asScala.toMap ++ sparkSession.sqlContext.conf.getAllConfs
+ val hiveSyncConfig = buildHiveSyncConfig(options, hoodieCatalogTable)
+
+ withSparkConf(sparkSession, options) {
+ Map(
+ "path" -> hoodieCatalogTable.tableLocation,
+ TBL_NAME.key -> hoodieCatalogTable.tableName,
+ TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
+ OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
+ PARTITIONS_TO_DELETE.key -> partitionsToDrop,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
+ PARTITIONPATH_FIELD.key -> partitionFields,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
+ HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+ HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
+ HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
+ HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
+ HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+ HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass
+ )
+ .filter { case (_, v) => v != null }
+ }
+ }
+
+ def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
+ sparkSession: SparkSession): Map[String, String] = {
+ val path = hoodieCatalogTable.tableLocation
+ val tableConfig = hoodieCatalogTable.tableConfig
+ val tableSchema = hoodieCatalogTable.tableSchema
+ val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
+ val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
+
+ assert(hoodieCatalogTable.primaryKeys.nonEmpty,
+ s"There are no primary key defined in table ${hoodieCatalogTable.table.identifier}, cannot execute delete operator")
+ withSparkConf(sparkSession, hoodieCatalogTable.catalogProperties) {
+ Map(
+ "path" -> path,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ TBL_NAME.key -> tableConfig.getTableName,
+ HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+ KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+ SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+ OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+ PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+ HiveSyncConfig.HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
+ HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
+ HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> "200",
Review comment:
Why 200? It would be better to make this configurable.
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -1331,4 +1329,22 @@ private static boolean canCompare(Schema schema) {
inflightAndCompletedPartitions.addAll(getCompletedMetadataPartitions(tableConfig));
return inflightAndCompletedPartitions;
}
+
+ /**
+ * Get Last commit's Metadata.
+ */
+ public static Option<HoodieCommitMetadata> getLatestCommitMetadata(HoodieTableMetaClient metaClient) {
+ try {
+ HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ if (timeline.lastInstant().isPresent()) {
+ HoodieInstant instant = timeline.lastInstant().get();
+ byte[] data = timeline.getInstantDetails(instant).get();
+ return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
+ } else {
+ return Option.empty();
+ }
+ } catch (Exception e) {
+ throw new HoodieException("Failed to get commit metadata", e);
+ }
+ }
Review comment:
TableSchemaResolver already has the same method; why define it again?
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
##########
@@ -98,9 +103,10 @@ trait ProvidesHoodieConfig extends Logging {
val path = hoodieCatalogTable.tableLocation
val tableType = hoodieCatalogTable.tableTypeName
val tableConfig = hoodieCatalogTable.tableConfig
- val tableSchema = hoodieCatalogTable.tableSchema
- val options = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ extraOptions
+ val options: Map[String, String] = hoodieCatalogTable.catalogProperties ++ tableConfig.getProps.asScala.toMap ++ sparkSession.sqlContext.conf.getAllConfs ++ extraOptions
+ val hiveSyncConfig = buildHiveSyncConfig(options, hoodieCatalogTable)
Review comment:
Could this code be reused? I see it is defined twice.
##########
File path: hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
##########
@@ -236,6 +230,11 @@ public void updatePartitionsToTable(final String tableName, final List<String> c
throw new UnsupportedOperationException("No support for updatePartitionsToTable yet.");
}
+ @Override
+ public void dropPartitions(String tableName, List<String> partitionsToDrop) {
+ throw new UnsupportedOperationException("No support for dropPartitions yet.");
Review comment:
Please do not delete the original code comment:
```
// bigQuery discovers the new partitions automatically, so do nothing.
```
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
##########
@@ -193,4 +198,100 @@ trait ProvidesHoodieConfig extends Logging {
}
}
+ def buildHoodieDropPartitionsConfig(
+ sparkSession: SparkSession,
+ hoodieCatalogTable: HoodieCatalogTable,
+ partitionsToDrop: String): Map[String, String] = {
+ val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
+ val enableHive = isEnableHive(sparkSession)
+ val catalogProperties = hoodieCatalogTable.catalogProperties
+ val tableConfig = hoodieCatalogTable.tableConfig
+ val options: Map[String, String] = catalogProperties ++ tableConfig.getProps.asScala.toMap ++ sparkSession.sqlContext.conf.getAllConfs
+ val hiveSyncConfig = buildHiveSyncConfig(options, hoodieCatalogTable)
+
+ withSparkConf(sparkSession, options) {
+ Map(
+ "path" -> hoodieCatalogTable.tableLocation,
+ TBL_NAME.key -> hoodieCatalogTable.tableName,
+ TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
+ OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
+ PARTITIONS_TO_DELETE.key -> partitionsToDrop,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
+ PARTITIONPATH_FIELD.key -> partitionFields,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
+ HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+ HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
+ HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
+ HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
+ HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+ HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass
+ )
+ .filter { case (_, v) => v != null }
+ }
+ }
+
+ def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
+ sparkSession: SparkSession): Map[String, String] = {
+ val path = hoodieCatalogTable.tableLocation
+ val tableConfig = hoodieCatalogTable.tableConfig
+ val tableSchema = hoodieCatalogTable.tableSchema
+ val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
+ val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
+
+ assert(hoodieCatalogTable.primaryKeys.nonEmpty,
+ s"There are no primary key defined in table ${hoodieCatalogTable.table.identifier}, cannot execute delete operator")
Review comment:
Should this say "delete operation" instead of "delete operator"?
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
##########
@@ -17,21 +17,25 @@
package org.apache.spark.sql.hudi
-import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
-import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
-import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.hive.ddl.HiveSyncMode
+import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor}
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.sql.InsertMode
+import org.apache.hudi.sync.common.HoodieSyncConfig
+import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
+import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isEnableHive, withSparkConf}
import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
+import org.apache.spark.sql.types.StructType
+import java.util
import scala.collection.JavaConverters.propertiesAsScalaMapConverter
Review comment:
Java imports and Scala imports should be separated.
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
##########
@@ -193,4 +198,100 @@ trait ProvidesHoodieConfig extends Logging {
}
}
+ def buildHoodieDropPartitionsConfig(
+ sparkSession: SparkSession,
+ hoodieCatalogTable: HoodieCatalogTable,
+ partitionsToDrop: String): Map[String, String] = {
+ val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
+ val enableHive = isEnableHive(sparkSession)
+ val catalogProperties = hoodieCatalogTable.catalogProperties
+ val tableConfig = hoodieCatalogTable.tableConfig
+ val options: Map[String, String] = catalogProperties ++ tableConfig.getProps.asScala.toMap ++ sparkSession.sqlContext.conf.getAllConfs
+ val hiveSyncConfig = buildHiveSyncConfig(options, hoodieCatalogTable)
+
+ withSparkConf(sparkSession, options) {
+ Map(
+ "path" -> hoodieCatalogTable.tableLocation,
+ TBL_NAME.key -> hoodieCatalogTable.tableName,
+ TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
+ OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
+ PARTITIONS_TO_DELETE.key -> partitionsToDrop,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
+ PARTITIONPATH_FIELD.key -> partitionFields,
+ HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
+ HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
+ HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
+ HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
+ HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
+ HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+ HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+ HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass
+ )
+ .filter { case (_, v) => v != null }
+ }
+ }
+
+ def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
+ sparkSession: SparkSession): Map[String, String] = {
+ val path = hoodieCatalogTable.tableLocation
+ val tableConfig = hoodieCatalogTable.tableConfig
+ val tableSchema = hoodieCatalogTable.tableSchema
+ val partitionColumns = tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
+ val partitionSchema = StructType(tableSchema.filter(f => partitionColumns.contains(f.name)))
Review comment:
Use `.toLowerCase(Locale.ROOT)` here so the lower-casing does not depend on the JVM's default locale.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org