Posted to commits@hudi.apache.org by bi...@apache.org on 2022/10/26 02:28:31 UTC
[hudi] branch master updated: [HUDI-5057] Fix msck repair hudi table (#6999)
This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ca4003b784 [HUDI-5057] Fix msck repair hudi table (#6999)
ca4003b784 is described below
commit ca4003b78445d5be8b305b6f5ead90a76bfe361b
Author: Zouxxyy <zo...@alibaba-inc.com>
AuthorDate: Wed Oct 26 10:28:26 2022 +0800
[HUDI-5057] Fix msck repair hudi table (#6999)
---
.../spark/sql/HoodieCatalystPlansUtils.scala | 10 ++
.../java/org/apache/hudi/common/fs/FSUtils.java | 5 +-
.../java/org/apache/hudi/source/FileIndex.java | 2 +-
.../spark/sql/hudi/HoodieSqlCommonUtils.scala | 16 +-
.../hudi/command/RepairHoodieTableCommand.scala | 170 +++++++++++++++++++++
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 8 +
.../apache/spark/sql/hudi/TestRepairTable.scala | 163 ++++++++++++++++++++
.../spark/sql/HoodieSpark2CatalystPlanUtils.scala | 16 +-
.../spark/sql/HoodieSpark31CatalystPlanUtils.scala | 16 ++
.../spark/sql/HoodieSpark32CatalystPlanUtils.scala | 16 ++
.../spark/sql/HoodieSpark33CatalystPlanUtils.scala | 15 ++
11 files changed, 431 insertions(+), 6 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index e7e529b125..efd0eacac7 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -90,4 +90,14 @@ trait HoodieCatalystPlansUtils {
def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan
+ /**
+ * Test if the logical plan is a Repair Table LogicalPlan.
+ */
+ def isRepairTable(plan: LogicalPlan): Boolean
+
+ /**
+ * Get the members (table identifier, enableAddPartitions, enableDropPartitions, command name) of the Repair Table LogicalPlan.
+ */
+ def getRepairTableChildren(plan: LogicalPlan):
+ Option[(TableIdentifier, Boolean, Boolean, String)]
}
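The two new trait methods are meant to be used together by the analysis layer: isRepairTable detects a repair-table plan and getRepairTableChildren unpacks it. A minimal sketch of such a caller, with hypothetical variable names `plan` and `sparkAdapter` (the real rewrite rule appears in HoodieAnalysis.scala further below):

    // Hedged sketch: detect a repair-table plan and unpack its fields.
    val utils = sparkAdapter.getCatalystPlanUtils
    if (utils.isRepairTable(plan)) {
      utils.getRepairTableChildren(plan) match {
        case Some((tableId, addParts, dropParts, cmd)) =>
          println(s"$cmd on ${tableId.quotedString}: add=$addParts, drop=$dropParts")
        case None => // plan shape not recognized by this Spark version
      }
    }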
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index c3d8e9f8be..eb5fbe9179 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -322,10 +322,9 @@ public class FSUtils {
public static Map<String, FileStatus[]> getFilesInPartitions(HoodieEngineContext engineContext,
HoodieMetadataConfig metadataConfig,
String basePathStr,
- String[] partitionPaths,
- String spillableMapPath) {
+ String[] partitionPaths) {
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePathStr,
- spillableMapPath, true)) {
+ FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true)) {
return tableMetadata.getAllFilesInPartitions(Arrays.asList(partitionPaths));
} catch (Exception ex) {
throw new HoodieException("Error get files in partitions: " + String.join(",", partitionPaths), ex);
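With the spillable-map path dropped from the signature, the spillable directory now defaults to FileSystemViewStorageConfig.SPILLABLE_DIR inside FSUtils itself. A minimal Scala sketch of the updated call, assuming an engine context, metadata config, base path, and partition paths are already in scope (hypothetical variable names):

    import org.apache.hadoop.fs.FileStatus
    import org.apache.hudi.common.fs.FSUtils

    // Hedged sketch: callers no longer pass a spillable map path.
    val filesByPartition: java.util.Map[String, Array[FileStatus]] =
      FSUtils.getFilesInPartitions(engineContext, metadataConfig, basePath, partitionPaths)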
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index 92396c1820..a382954fd2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -138,7 +138,7 @@ public class FileIndex {
}
String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
FileStatus[] allFileStatus = FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
- partitions, "/tmp/")
+ partitions)
.values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
Set<String> candidateFiles = candidateFilesInMetadataTable(allFileStatus);
if (candidateFiles == null) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 025a224373..aff65672c5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hudi
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig}
import org.apache.hudi.common.fs.FSUtils
@@ -78,6 +78,20 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala
}
+ def getFilesInPartitions(spark: SparkSession,
+ table: CatalogTable,
+ partitionPaths: Seq[String]): Map[String, Array[FileStatus]] = {
+ val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+ val metadataConfig = {
+ val properties = new Properties()
+ properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++
+ table.properties).asJava)
+ HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
+ }
+ FSUtils.getFilesInPartitions(sparkEngine, metadataConfig, getTableLocation(table, spark),
+ partitionPaths.toArray).asScala.toMap
+ }
+
/**
* This method is used to compatible with the old non-hive-styled partition table.
* By default we enable the "hoodie.datasource.write.hive_style_partitioning"
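The new helper folds the session configs, storage properties, and table properties into a HoodieMetadataConfig before listing files through FSUtils. A minimal sketch of a caller, assuming a SparkSession `spark`, a resolved CatalogTable `table`, and illustrative partition paths (all hypothetical names; RepairHoodieTableCommand below derives the real paths from HoodieCatalogTable.getPartitionPaths):

    // Hedged sketch: gather per-partition file statuses and total their sizes.
    val partitionPaths = Seq("dt=2022-10-06/hh=11", "dt=2022-10-06/hh=12")
    val filesByPartition = HoodieSqlCommonUtils.getFilesInPartitions(spark, table, partitionPaths)
    val totalBytes = filesByPartition.values.flatten.map(_.getLen).sum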
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/RepairHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/RepairHoodieTableCommand.scala
new file mode 100644
index 0000000000..cc25318c6a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/RepairHoodieTableCommand.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.hudi.common.table.HoodieTableConfig
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.command.PartitionStatistics
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.util.ThreadUtils
+
+import java.util.concurrent.TimeUnit.MILLISECONDS
+import scala.util.control.NonFatal
+
+/**
+ * Command for repairing a Hudi table's partitions.
+ * Use the methods in HoodieSqlCommonUtils to obtain partitions and stats
+ * instead of scanning the file system.
+ */
+case class RepairHoodieTableCommand(tableName: TableIdentifier,
+ enableAddPartitions: Boolean,
+ enableDropPartitions: Boolean,
+ cmd: String = "MSCK REPAIR TABLE") extends HoodieLeafRunnableCommand {
+
+ // These are the statistics that can be collected quickly without requiring a scan of the data;
+ // see https://github.com/apache/hive/blob/master/
+ // common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
+ val NUM_FILES = "numFiles"
+ val TOTAL_SIZE = "totalSize"
+ val DDL_TIME = "transient_lastDdlTime"
+
+ override def run(spark: SparkSession): Seq[Row] = {
+ val catalog = spark.sessionState.catalog
+ val table = catalog.getTableMetadata(tableName)
+ val tableIdentWithDB = table.identifier.quotedString
+ if (table.partitionColumnNames.isEmpty) {
+ throw new AnalysisException(
+ s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
+ }
+
+ if (table.storage.locationUri.isEmpty) {
+ throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
+ s"location provided: $tableIdentWithDB")
+ }
+
+ val root = new Path(table.location)
+ logInfo(s"Recover all the partitions in $root")
+
+ val hoodieCatalogTable = HoodieCatalogTable(spark, table.identifier)
+ val isHiveStyledPartitioning = hoodieCatalogTable.catalogProperties.
+ getOrElse(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key, "true").toBoolean
+
+ val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] = hoodieCatalogTable.
+ getPartitionPaths.map(partitionPath => {
+ var values = partitionPath.split('/')
+ if (isHiveStyledPartitioning) {
+ values = values.map(_.split('=')(1))
+ }
+ (table.partitionColumnNames.zip(values).toMap, new Path(root, partitionPath))
+ })
+
+ val droppedAmount = if (enableDropPartitions) {
+ dropPartitions(catalog, partitionSpecsAndLocs)
+ } else 0
+ val addedAmount = if (enableAddPartitions) {
+ val total = partitionSpecsAndLocs.length
+ val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
+ HoodieSqlCommonUtils.getFilesInPartitions(spark, table, partitionSpecsAndLocs
+ .map(_._2.toString))
+ .mapValues(statuses => PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
+ } else {
+ Map.empty[String, PartitionStatistics]
+ }
+ logInfo(s"Finished to gather the fast stats for all $total partitions.")
+ addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+ total
+ } else 0
+ // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
+ // This is always the case for Hive format tables, but is not true for Datasource tables created
+ // before Spark 2.1 unless they are converted via `msck repair table`.
+ spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
+ try {
+ spark.catalog.refreshTable(tableIdentWithDB)
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Cannot refresh the table '$tableIdentWithDB'. A query of the table " +
+ "might return wrong result if the table was cached. To avoid such issue, you should " +
+ "uncache the table manually via the UNCACHE TABLE command after table recovering will " +
+ "complete fully.", e)
+ }
+ logInfo(s"Recovered all partitions: added ($addedAmount), dropped ($droppedAmount).")
+ Seq.empty[Row]
+ }
+
+ private def addPartitions(spark: SparkSession,
+ table: CatalogTable,
+ partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)],
+ partitionStats: Map[String, PartitionStatistics]): Unit = {
+ val total = partitionSpecsAndLocs.length
+ var done = 0L
+ // The Hive metastore may not have enough memory to handle millions of partitions in a single
+ // RPC, so we split them into smaller batches. Since the Hive client is not thread safe, we
+ // cannot do this in parallel.
+ val batchSize = spark.sparkContext.conf.getInt("spark.sql.addPartitionInBatch.size", 100)
+ partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch =>
+ val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
+ val parts = batch.map { case (spec, location) =>
+ val params = partitionStats.get(location.toString).map {
+ case PartitionStatistics(numFiles, totalSize) =>
+ // These two fast stats prevent the Hive metastore from listing the files again.
+ Map(NUM_FILES -> numFiles.toString,
+ TOTAL_SIZE -> totalSize.toString,
+ // Work around a bug in HiveMetastore that tries to mutate read-only parameters.
+ // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+ DDL_TIME -> now.toString)
+ }.getOrElse(Map.empty)
+ // inherit table storage format (possibly except for location)
+ CatalogTablePartition(
+ spec,
+ table.storage.copy(locationUri = Some(location.toUri)),
+ params)
+ }
+ spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
+ done += parts.length
+ logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
+ }
+ }
+
+ // Drops the partitions that do not exist in partitionSpecsAndLocs
+ private def dropPartitions(catalog: SessionCatalog,
+ partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)]): Int = {
+ val dropPartSpecs = ThreadUtils.parmap(
+ catalog.listPartitions(tableName),
+ "RepairTableCommand: non-existing partitions",
+ maxThreads = 8) { partition =>
+ partition.storage.locationUri.flatMap { uri =>
+ if (partitionSpecsAndLocs.map(_._2).contains(new Path(uri))) None else Some(partition.spec)
+ }
+ }.flatten
+ catalog.dropPartitions(
+ tableName,
+ dropPartSpecs,
+ ignoreIfNotExists = true,
+ purge = false,
+ // Since we have already checked that partition directories do not exist, we can avoid
+ // additional calls to the file system at the catalog side by setting this flag.
+ retainData = true)
+ dropPartSpecs.length
+ }
+}
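With the post-analysis rule below routing the plan to this command, the feature can be exercised with the same SQL the new tests use. A minimal sketch, assuming a partitioned Hudi table named h0 already exists (the ADD/DROP/SYNC PARTITIONS variants require Spark 3.2+):

    // Hedged sketch of driving RepairHoodieTableCommand through SQL.
    spark.sql("msck repair table h0")                  // default: add missing partitions
    spark.sql("msck repair table h0 add partitions")   // Spark 3.2+: only add
    spark.sql("msck repair table h0 drop partitions")  // Spark 3.2+: only drop stale partitions
    spark.sql("msck repair table h0 sync partitions")  // Spark 3.2+: add and drop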
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index c5688965d7..5962ac867c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -610,6 +610,14 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
case TruncateTableCommand(tableName, partitionSpec)
if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
TruncateHoodieTableCommand(tableName, partitionSpec)
+ // Rewrite RepairTableCommand to RepairHoodieTableCommand
+ case r if sparkAdapter.getCatalystPlanUtils.isRepairTable(r) =>
+ val (tableName, enableAddPartitions, enableDropPartitions, cmd) = sparkAdapter.getCatalystPlanUtils.getRepairTableChildren(r).get
+ if (sparkAdapter.isHoodieTable(tableName, sparkSession)) {
+ RepairHoodieTableCommand(tableName, enableAddPartitions, enableDropPartitions, cmd)
+ } else {
+ r
+ }
case _ => plan
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala
new file mode 100644
index 0000000000..498121e1ab
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+
+import org.apache.spark.sql.SaveMode
+
+class TestRepairTable extends HoodieSparkSqlTestBase {
+
+ test("Test msck repair non-partitioned table") {
+ Seq("true", "false").foreach { hiveStylePartitionEnable =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | ts long,
+ | dt string,
+ | hh string
+ | ) using hudi
+ | location '$basePath'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.datasource.write.hive_style_partitioning = '$hiveStylePartitionEnable'
+ | )
+ """.stripMargin)
+
+ checkExceptionContain(s"msck repair table $tableName")(
+ s"Operation not allowed")
+ }
+ }
+ }
+
+ test("Test msck repair partitioned table") {
+ Seq("true", "false").foreach { hiveStylePartitionEnable =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | ts long,
+ | dt string,
+ | hh string
+ | ) using hudi
+ | partitioned by (dt, hh)
+ | location '$basePath'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.datasource.write.hive_style_partitioning = '$hiveStylePartitionEnable'
+ | )
+ """.stripMargin)
+ val table = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
+
+ import spark.implicits._
+ val df = Seq((1, "a1", 1000, "2022-10-06", "11"), (2, "a2", 1001, "2022-10-06", "12"))
+ .toDF("id", "name", "ts", "dt", "hh")
+ df.write.format("hudi")
+ .option(RECORDKEY_FIELD.key, "id")
+ .option(PRECOMBINE_FIELD.key, "ts")
+ .option(PARTITIONPATH_FIELD.key, "dt, hh")
+ .option(HIVE_STYLE_PARTITIONING_ENABLE.key, hiveStylePartitionEnable)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ assertResult(Seq())(spark.sessionState.catalog.listPartitionNames(table))
+ spark.sql(s"msck repair table $tableName")
+ assertResult(Seq("dt=2022-10-06/hh=11", "dt=2022-10-06/hh=12"))(
+ spark.sessionState.catalog.listPartitionNames(table))
+ }
+ }
+ }
+
+ test("Test msck repair partitioned table [add/drop/sync] partitions") {
+ if (HoodieSparkUtils.gteqSpark3_2) {
+ Seq("true", "false").foreach { hiveStylePartitionEnable =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ | create table $tableName (
+ | id int,
+ | name string,
+ | ts long,
+ | dt string
+ | ) using hudi
+ | partitioned by (dt)
+ | location '$basePath'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.datasource.write.hive_style_partitioning = '$hiveStylePartitionEnable'
+ | )
+ """.stripMargin)
+ val table = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
+
+ // test msck repair table add partitions
+ import spark.implicits._
+ val df1 = Seq((1, "a1", 1000, "2022-10-06")).toDF("id", "name", "ts", "dt")
+ df1.write.format("hudi")
+ .option(TBL_NAME.key(), tableName)
+ .option(RECORDKEY_FIELD.key, "id")
+ .option(PRECOMBINE_FIELD.key, "ts")
+ .option(PARTITIONPATH_FIELD.key, "dt")
+ .option(HIVE_STYLE_PARTITIONING_ENABLE.key, hiveStylePartitionEnable)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ assertResult(Seq())(spark.sessionState.catalog.listPartitionNames(table))
+ spark.sql(s"msck repair table $tableName add partitions")
+ assertResult(Seq("dt=2022-10-06"))(spark.sessionState.catalog.listPartitionNames(table))
+
+ // test msck repair table drop partitions
+ val df2 = Seq((2, "a2", 1001, "2022-10-07")).toDF("id", "name", "ts", "dt")
+ df2.write.format("hudi")
+ .option(TBL_NAME.key(), tableName)
+ .option(RECORDKEY_FIELD.key, "id")
+ .option(PRECOMBINE_FIELD.key, "ts")
+ .option(PARTITIONPATH_FIELD.key, "dt")
+ .option(HIVE_STYLE_PARTITIONING_ENABLE.key, hiveStylePartitionEnable)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertResult(Seq("dt=2022-10-06"))(spark.sessionState.catalog.listPartitionNames(table))
+ spark.sql(s"msck repair table $tableName drop partitions")
+ assertResult(Seq())(spark.sessionState.catalog.listPartitionNames(table))
+
+ // test msck repair table sync partitions
+ spark.sql(s"msck repair table $tableName sync partitions")
+ assertResult(Seq("dt=2022-10-07"))(spark.sessionState.catalog.listPartitionNames(table))
+ }
+ }
+ }
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 4156198153..2672e2c4cb 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedRelatio
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
-import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, ExplainCommand}
import org.apache.spark.sql.internal.SQLConf
object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
@@ -74,4 +74,18 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = {
throw new IllegalStateException(s"Should not call getRelationTimeTravel for spark2")
}
+
+ override def isRepairTable(plan: LogicalPlan): Boolean = {
+ plan.isInstanceOf[AlterTableRecoverPartitionsCommand]
+ }
+
+ override def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] = {
+ plan match {
+ // In Spark >= 3.2.x, AlterTableRecoverPartitionsCommand was renamed to RepairTableCommand and gained two new
+ // parameters, enableAddPartitions and enableDropPartitions. Setting them to true and false respectively
+ // restores AlterTableRecoverPartitionsCommand's behavior.
+ case c: AlterTableRecoverPartitionsCommand =>
+ Some((c.tableName, true, false, c.cmd))
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index 57864004df..a4016f18cc 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -18,8 +18,10 @@
package org.apache.spark.sql
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
import org.apache.spark.sql.types.StructType
object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -31,4 +33,18 @@ object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
}
override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema = ProjectionOverSchema(schema)
+
+ override def isRepairTable(plan: LogicalPlan): Boolean = {
+ plan.isInstanceOf[AlterTableRecoverPartitionsCommand]
+ }
+
+ override def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] = {
+ plan match {
+ // In Spark >= 3.2.x, AlterTableRecoverPartitionsCommand was renamed to RepairTableCommand and gained two new
+ // parameters, enableAddPartitions and enableDropPartitions. Setting them to true and false respectively
+ // restores AlterTableRecoverPartitionsCommand's behavior.
+ case c: AlterTableRecoverPartitionsCommand =>
+ Some((c.tableName, true, false, c.cmd))
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index 19025ce0d5..0548fd47a4 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -20,8 +20,11 @@ package org.apache.spark.sql
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.util.ValidationUtils.checkArgument
+
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TimeTravelRelation}
+import org.apache.spark.sql.execution.command.RepairTableCommand
import org.apache.spark.sql.types.StructType
object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -54,4 +57,17 @@ object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
p.asInstanceOf[ProjectionOverSchema]
}
+
+ override def isRepairTable(plan: LogicalPlan): Boolean = {
+ plan.isInstanceOf[RepairTableCommand]
+ }
+
+ override def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] = {
+ plan match {
+ case rtc: RepairTableCommand =>
+ Some((rtc.tableName, rtc.enableAddPartitions, rtc.enableDropPartitions, rtc.cmd))
+ case _ =>
+ None
+ }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index 4d4921bc03..c364254488 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -18,8 +18,10 @@
package org.apache.spark.sql
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TimeTravelRelation}
+import org.apache.spark.sql.execution.command.RepairTableCommand
import org.apache.spark.sql.types.StructType
object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -39,4 +41,17 @@ object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema =
ProjectionOverSchema(schema, output)
+
+ override def isRepairTable(plan: LogicalPlan): Boolean = {
+ plan.isInstanceOf[RepairTableCommand]
+ }
+
+ override def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] = {
+ plan match {
+ case rtc: RepairTableCommand =>
+ Some((rtc.tableName, rtc.enableAddPartitions, rtc.enableDropPartitions, rtc.cmd))
+ case _ =>
+ None
+ }
+ }
}