Posted to commits@hudi.apache.org by xu...@apache.org on 2022/01/02 10:42:37 UTC

[hudi] branch master updated: [HUDI-3136] Fix merge/insert/show partitions error on Spark3.2 (#4490)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1622b52  [HUDI-3136] Fix merge/insert/show partitions error on Spark3.2 (#4490)
1622b52 is described below

commit 1622b52c9cf90a96b3b4f8db3228253db4a93e6e
Author: Yann Byron <bi...@gmail.com>
AuthorDate: Sun Jan 2 18:42:10 2022 +0800

    [HUDI-3136] Fix merge/insert/show partitions error on Spark3.2 (#4490)
---
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala       |  8 ++++++--
 .../hudi/command/InsertIntoHoodieTableCommand.scala    |  4 ++--
 .../sql/hudi/command/TruncateHoodieTableCommand.scala  | 18 +++++++++++++++---
 .../org/apache/spark/sql/hudi/TestShowPartitions.scala |  2 +-
 4 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 92f73d5..31af719 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -254,7 +254,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
         case action: MergeAction =>
           // SPARK-34962:  use UpdateStarAction as the explicit representation of * in UpdateAction.
           // So match and convert this in the Spark 3.2 env.
-          UpdateAction(action.condition, Seq.empty)
+          val (resolvedCondition, resolvedAssignments) =
+            resolveConditionAssignments(action.condition, Seq.empty)
+          UpdateAction(resolvedCondition, resolvedAssignments)
       }
       // Resolve the notMatchedActions
       val resolvedNotMatchedActions = notMatchedActions.map {
@@ -265,7 +267,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
         case action: MergeAction =>
           // SPARK-34962:  use InsertStarAction as the explicit representation of * in InsertAction.
           // So match and convert this in the Spark 3.2 env.
-          InsertAction(action.condition, Seq.empty)
+          val (resolvedCondition, resolvedAssignments) =
+            resolveConditionAssignments(action.condition, Seq.empty)
+          InsertAction(resolvedCondition, resolvedAssignments)
       }
       // Return the resolved MergeIntoTable
       MergeIntoTable(target, resolvedSource, resolvedMergeCondition,
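
For context, the hedged sketch below (assuming a SparkSession in scope as spark, as in spark-shell) shows the kind of statement that exercises these star-action branches: on Spark 3.2, UPDATE SET * and INSERT * are parsed as UpdateStarAction and InsertStarAction (SPARK-34962), and the rule above now resolves their condition and expands the assignments instead of emitting empty ones. The table and column names are illustrative only, not from this commit.

    // Hypothetical tables; only the shape of the MERGE matters here.
    spark.sql(
      """
        |MERGE INTO hudi_target t
        |USING source_batch s
        |ON t.id = s.id
        |WHEN MATCHED THEN UPDATE SET *
        |WHEN NOT MATCHED THEN INSERT *
        |""".stripMargin)
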
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 61bc577..37d30c8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
@@ -254,7 +254,7 @@ object InsertIntoHoodieTableCommand extends Logging {
       // on reading.
       classOf[ValidateDuplicateKeyPayload].getCanonicalName
     } else {
-      classOf[DefaultHoodieRecordPayload].getCanonicalName
+      classOf[OverwriteWithLatestAvroPayload].getCanonicalName
     }
     logInfo(s"insert statement uses write operation type: $operation, payloadClass: $payloadClassName")
 
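
As a hedged illustration of the payload change above (table and columns are made up): an INSERT INTO that does not go through the ValidateDuplicateKeyPayload branch now takes the else-branch shown here, so the write uses OverwriteWithLatestAvroPayload instead of DefaultHoodieRecordPayload and, roughly, the most recently written record wins for a given key.

    // Hypothetical table; this insert falls through to the else-branch and therefore
    // to the OverwriteWithLatestAvroPayload payload class.
    spark.sql("INSERT INTO hudi_tbl SELECT 1 AS id, 'a1' AS name, 10.0 AS price, 1000 AS ts")
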
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
index 2090185..12ec224 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.execution.command.TruncateTableCommand
 
+import scala.util.control.NonFatal
+
 /**
  * Command for truncating a hudi table.
  */
@@ -36,10 +38,16 @@ class TruncateHoodieTableCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
     val properties = hoodieCatalogTable.tableConfig.getProps
-    val tablePath = hoodieCatalogTable.tableLocation
 
-    // Delete all data in the table directory
-    super.run(sparkSession)
+    try {
+      // Delete all data in the table directory
+      super.run(sparkSession)
+    } catch {
+      // TruncateTableCommand deletes the related directories first and then refreshes the table.
+      // That refresh fails because the hudi meta directory (.hoodie) has already been deleted in the first step.
+      // So ignore this failure here and refresh the table later.
+      case NonFatal(_) =>
+    }
 
     // If we have not specified the partition, truncate will delete all the data in the table path,
     // including the hoodie.properties. In this case we should re-initialize the table.
@@ -50,6 +58,10 @@ class TruncateHoodieTableCommand(
         .fromProperties(properties)
         .initTable(hadoopConf, hoodieCatalogTable.tableLocation)
     }
+
+    // After deleting the data, refresh the table to make sure we don't keep around a stale
+    // file relation in the metastore cache and cached table data in the cache manager.
+    sparkSession.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString)
     Seq.empty[Row]
   }
 }
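
The net effect of this change, sketched below with a made-up table name: Spark's TruncateTableCommand removes the table directories (including .hoodie) and then fails on its own refresh. That non-fatal failure is swallowed, the table is re-initialized from the preserved properties when no partition was specified, and the catalog is refreshed explicitly at the end so later reads do not hit stale cached file listings.

    // Hypothetical table name; after this call the data directories are cleared,
    // .hoodie has been re-created from the preserved table properties, and the
    // catalog and cache entries for the table have been refreshed.
    spark.sql("TRUNCATE TABLE hudi_tbl")
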
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
index 19d8d03..868bfc4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
@@ -154,7 +154,7 @@ class TestShowPartitions extends TestHoodieSqlBase {
       Seq("year=2021/month=02/day=default"),
       Seq("year=2021/month=02/day=01")
     )
-    checkAnswer(s"show partitions $tableName partition(day=01)")(
+    checkAnswer(s"show partitions $tableName partition(day='01')")(
       Seq("year=2021/month=02/day=01"),
       Seq("year=2021/month=default/day=01"),
       Seq("year=2021/month=01/day=01"),