You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by le...@apache.org on 2022/05/30 11:09:33 UTC

[hudi] branch master updated: [HUDI-4149] Drop-Table fails when underlying table directory is broken (#5672)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 918c4f4e0b [HUDI-4149] Drop-Table fails when underlying table directory is broken (#5672)
918c4f4e0b is described below

commit 918c4f4e0ba5dd30fc733cc3e208650e2154dd5c
Author: Jin Xing <ji...@gmail.com>
AuthorDate: Mon May 30 19:09:26 2022 +0800

    [HUDI-4149] Drop-Table fails when underlying table directory is broken (#5672)
---
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |  73 ++++++++------
 .../sql/hudi/command/DropHoodieTableCommand.scala  |  46 +++++----
 .../src/test/resources/sql-statements.sql          |   2 +
 .../org/apache/spark/sql/hudi/TestDropTable.scala  | 111 +++++++++++++++++++++
 4 files changed, 182 insertions(+), 50 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 76cea362a3..3dbb358fbb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -39,9 +39,12 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 /**
- * A wrapper of hoodie CatalogTable instance and hoodie Table.
+ * Table definition for SQL functionalities. Depending on the way of data generation,
+ * meta of Hudi table can be from Spark catalog or meta directory on filesystem.
+ * [[HoodieCatalogTable]] takes both meta sources into consideration when handling
+ * EXTERNAL and MANAGED tables.
  */
-class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) extends Logging {
+class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) extends Logging {
 
   assert(table.provider.map(_.toLowerCase(Locale.ROOT)).orNull == "hudi", "It's not a Hudi table")
 
@@ -117,23 +120,9 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
   lazy val baseFileFormat: String = metaClient.getTableConfig.getBaseFileFormat.name()
 
   /**
-   * The schema of table.
-   * Make StructField nullable and fill the comments in.
+   * Table schema
    */
-  lazy val tableSchema: StructType = {
-    val resolver = spark.sessionState.conf.resolver
-    val originSchema = getTableSqlSchema(metaClient, includeMetadataFields = true).getOrElse(table.schema)
-    val fields = originSchema.fields.map { f =>
-      val nullableField: StructField = f.copy(nullable = true)
-      val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
-      if (catalogField.isDefined) {
-        catalogField.get.getComment().map(nullableField.withComment).getOrElse(nullableField)
-      } else {
-        nullableField
-      }
-    }
-    StructType(fields)
-  }
+  lazy val tableSchema: StructType = table.schema
 
   /**
    * The schema without hoodie meta fields
@@ -168,12 +157,14 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
   def isPartitionedTable: Boolean = table.partitionColumnNames.nonEmpty
 
   /**
-   * init hoodie table for create table (as select)
+   * Initializes table meta on filesystem when applying CREATE TABLE clause.
    */
   def initHoodieTable(): Unit = {
     logInfo(s"Init hoodie.properties for ${table.identifier.unquotedString}")
     val (finalSchema, tableConfigs) = parseSchemaAndConfigs()
 
+    table = table.copy(schema = finalSchema)
+
     // Save all the table config to the hoodie.properties.
     val properties = new Properties()
     properties.putAll(tableConfigs.asJava)
@@ -199,7 +190,10 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
   }
 
   /**
-   * @return schema, table parameters in which all parameters aren't sql-styled.
+   * Derives the SQL schema and configurations for a Hudi table:
+   * 1. Columns in the schema fall under two categories -- the data columns described in
+   * CREATE TABLE clause and meta columns enumerated in [[HoodieRecord#HOODIE_META_COLUMNS]];
+   * 2. Configurations derived come from config file, PROPERTIES and OPTIONS in CREATE TABLE clause.
    */
   private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = {
     val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
@@ -216,24 +210,25 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
         val catalogTableProps = HoodieOptionConfig.mappingSqlOptionToTableConfig(catalogProperties)
         validateTableConfig(spark, catalogTableProps, convertMapToHoodieConfig(existingTableConfig))
 
-        val options = extraTableConfig(spark, hoodieTableExists, currentTableConfig) ++
+        val options = extraTableConfig(hoodieTableExists, currentTableConfig) ++
           HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions) ++ currentTableConfig
 
-        ValidationUtils.checkArgument(tableSchema.nonEmpty || table.schema.nonEmpty,
-          s"Missing schema for Create Table: $catalogTableName")
-        val schema = if (tableSchema.nonEmpty) {
-          tableSchema
-        } else {
+        val schemaFromMetaOpt = loadTableSchemaByMetaClient()
+        val schema = if (schemaFromMetaOpt.nonEmpty) {
+          schemaFromMetaOpt.get
+        } else if (table.schema.nonEmpty) {
           addMetaFields(table.schema)
+        } else {
+          throw new AnalysisException(
+            s"Missing schema fields when applying CREATE TABLE clause for ${catalogTableName}")
         }
-
         (schema, options)
 
       case (_, false) =>
         ValidationUtils.checkArgument(table.schema.nonEmpty,
           s"Missing schema for Create Table: $catalogTableName")
         val schema = table.schema
-        val options = extraTableConfig(spark, isTableExists = false, globalTableConfigs) ++
+        val options = extraTableConfig(tableExists = false, globalTableConfigs) ++
           HoodieOptionConfig.mappingSqlOptionToTableConfig(sqlOptions)
         (addMetaFields(schema), options)
 
@@ -253,10 +248,10 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
     (finalSchema, tableConfigs)
   }
 
-  private def extraTableConfig(sparkSession: SparkSession, isTableExists: Boolean,
+  private def extraTableConfig(tableExists: Boolean,
       originTableConfig: Map[String, String] = Map.empty): Map[String, String] = {
     val extraConfig = mutable.Map.empty[String, String]
-    if (isTableExists) {
+    if (tableExists) {
       val allPartitionPaths = getPartitionPaths
       if (originTableConfig.contains(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key)) {
         extraConfig(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key) =
@@ -287,6 +282,24 @@ class HoodieCatalogTable(val spark: SparkSession, val table: CatalogTable) exten
     extraConfig.toMap
   }
 
+  private def loadTableSchemaByMetaClient(): Option[StructType] = {
+    val resolver = spark.sessionState.conf.resolver
+    getTableSqlSchema(metaClient, includeMetadataFields = true).map(originSchema => {
+      // Load table schema from meta on filesystem, and fill in 'comment'
+      // information from Spark catalog.
+      val fields = originSchema.fields.map { f =>
+        val nullableField: StructField = f.copy(nullable = true)
+        val catalogField = findColumnByName(table.schema, nullableField.name, resolver)
+        if (catalogField.isDefined) {
+          catalogField.get.getComment().map(nullableField.withComment).getOrElse(nullableField)
+        } else {
+          nullableField
+        }
+      }
+      StructType(fields)
+    })
+  }
+
   // This code is forked from org.apache.spark.sql.hive.HiveExternalCatalog#verifyDataSchema
   private def verifyDataSchema(tableIdentifier: TableIdentifier, tableType: CatalogTableType,
       dataSchema: Seq[StructField]): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
index c24d0fd992..a0252861db 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DropHoodieTableCommand.scala
@@ -23,39 +23,44 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.sync.common.util.ConfigUtils
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 
-import scala.util.control.NonFatal
-
+/**
+ * Physical plan node for dropping a table.
+ */
 case class DropHoodieTableCommand(
     tableIdentifier: TableIdentifier,
     ifExists: Boolean,
     isView: Boolean,
-    purge: Boolean)
-extends HoodieLeafRunnableCommand {
+    purge: Boolean) extends HoodieLeafRunnableCommand {
 
-  val MOR_SNAPSHOT_TABLE_SUFFIX = "_rt"
-  val MOR_READ_OPTIMIZED_TABLE_SUFFIX = "_ro"
+  private val MOR_SNAPSHOT_TABLE_SUFFIX = "_rt"
+  private val MOR_READ_OPTIMIZED_TABLE_SUFFIX = "_ro"
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val fullTableName = s"${tableIdentifier.database}.${tableIdentifier.table}"
-    logInfo(s"start execute drop table command for $fullTableName")
-    sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
-
-    try {
-      // drop catalog table for this hoodie table
-      dropTableInCatalog(sparkSession, tableIdentifier, ifExists, purge)
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to drop catalog table in metastore: ${e.getMessage}")
+    logInfo(s"Start executing 'DROP TABLE' on ${tableIdentifier.unquotedString}" +
+      s" (ifExists=${ifExists}, purge=${purge}).")
+    if (!sparkSession.catalog.tableExists(tableIdentifier.unquotedString)) {
+      sparkSession.catalog.refreshTable(tableIdentifier.unquotedString)
     }
+    val qualifiedTableName = QualifiedTableName(
+      tableIdentifier.database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
+      tableIdentifier.table)
+    sparkSession.sessionState.catalog.invalidateCachedTable(qualifiedTableName)
+
+    dropTableInCatalog(sparkSession, tableIdentifier, ifExists, purge)
 
-    logInfo(s"Finish execute drop table command for $fullTableName")
+    logInfo(s"Finished executing 'DROP TABLE' on ${tableIdentifier.unquotedString}.")
     Seq.empty[Row]
   }
 
-  def dropTableInCatalog(sparkSession: SparkSession,
+  /**
+   * Drops the table in Spark catalog. Note that RO & RT tables could coexist with a MOR table.
+   * If `purge` is enabled, the RO & RT tables and the corresponding data directory on the
+   * filesystem will all be removed.
+   */
+  private def dropTableInCatalog(sparkSession: SparkSession,
           tableIdentifier: TableIdentifier,
           ifExists: Boolean,
           purge: Boolean): Unit = {
@@ -67,7 +72,8 @@ extends HoodieLeafRunnableCommand {
     val catalog = sparkSession.sessionState.catalog
 
     // Drop table in the catalog
-    if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
+    if (hoodieCatalogTable.hoodieTableExists &&
+        HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
       val (rtTableOpt, roTableOpt) = getTableRTAndRO(catalog, hoodieCatalogTable)
       rtTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
       roTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
index 3e92d31e3a..449ba2e2e6 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/sql-statements.sql
@@ -34,6 +34,7 @@ set hoodie.delete.shuffle.parallelism = 1;
 # CTAS
 
 create table h0 using hudi options(type = '${tableType}', primaryKey = 'id')
+location '${tmpDir}/h0'
 as select 1 as id, 'a1' as name, 10 as price;
 +----------+
 | ok       |
@@ -46,6 +47,7 @@ select id, name, price from h0;
 
 create table h0_p using hudi partitioned by(dt)
 options(type = '${tableType}', primaryKey = 'id')
+location '${tmpDir}/h0_p'
 as select cast('2021-05-07 00:00:00' as timestamp) as dt,
  1 as id, 'a1' as name, 10 as price;
 +----------+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
index 174835cbac..1beb78e27e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDropTable.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hadoop.fs.{LocalFileSystem, Path}
+import org.apache.hudi.common.fs.FSUtils
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 
@@ -230,6 +232,115 @@ class TestDropTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Drop an EXTERNAL table which path is lost.") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      val filesystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration);
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |id int,
+           |ts int,
+           |value string
+           |)using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           |""".stripMargin)
+
+      assert(filesystem.exists(new Path(tablePath)), s"Table path doesn't exists (${tablePath}).")
+
+      filesystem.delete(new Path(tablePath), true)
+      spark.sql(s"drop table ${tableName}")
+      checkAnswer("show tables")()
+    }
+  }
+
+  test("Drop an MOR table and related RT & RO when path is lost.") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      val filesystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration);
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |id int,
+           |ts int,
+           |value string
+           |)using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  type = 'mor'
+           | )
+           |""".stripMargin)
+      assert(filesystem.exists(new Path(tablePath)), s"Table path doesn't exist (${tablePath}).")
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_ro using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+        Map("hoodie.query.as.ro.table" -> "true"))
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_rt using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+        Map("hoodie.query.as.ro.table" -> "false"))
+
+      filesystem.delete(new Path(tablePath), true)
+      spark.sql(s"drop table ${tableName}")
+      spark.sql(s"drop table ${tableName}_ro")
+      spark.sql(s"drop table ${tableName}_rt")
+      checkAnswer("show tables")()
+    }
+  }
+
+
+  test("Drop an MANAGED table which path is lost.") {
+    val tableName = generateTableName
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |id int,
+         |ts int,
+         |value string
+         |)using hudi
+         | tblproperties (
+         |  primaryKey = 'id',
+         |  preCombineField = 'ts'
+         | )
+         |""".stripMargin)
+
+    val tablePath = new Path(
+      spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).location)
+
+    val filesystem = FSUtils.getFs(tablePath, spark.sparkContext.hadoopConfiguration);
+    assert(filesystem.exists(tablePath), s"Table path doesn't exists ($tablePath).")
+
+    filesystem.delete(tablePath, true)
+    spark.sql(s"drop table ${tableName}")
+    checkAnswer("show tables")()
+  }
+
   private def alterSerdeProperties(sessionCatalog: SessionCatalog, tableIdt: TableIdentifier,
     newProperties: Map[String, String]): Unit = {
     val catalogTable = spark.sessionState.catalog.getTableMetadata(tableIdt)