You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/01/02 10:42:37 UTC
[hudi] branch master updated: [HUDI-3136] Fix merge/insert/show partitions error on Spark3.2 (#4490)
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1622b52 [HUDI-3136] Fix merge/insert/show partitions error on Spark3.2 (#4490)
1622b52 is described below
commit 1622b52c9cf90a96b3b4f8db3228253db4a93e6e
Author: Yann Byron <bi...@gmail.com>
AuthorDate: Sun Jan 2 18:42:10 2022 +0800
[HUDI-3136] Fix merge/insert/show partitions error on Spark3.2 (#4490)
---
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 8 ++++++--
.../hudi/command/InsertIntoHoodieTableCommand.scala | 4 ++--
.../sql/hudi/command/TruncateHoodieTableCommand.scala | 18 +++++++++++++++---
.../org/apache/spark/sql/hudi/TestShowPartitions.scala | 2 +-
4 files changed, 24 insertions(+), 8 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 92f73d5..31af719 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -254,7 +254,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
case action: MergeAction =>
// SPARK-34962: use UpdateStarAction as the explicit representation of * in UpdateAction.
// So match and covert this in Spark3.2 env.
- UpdateAction(action.condition, Seq.empty)
+ val (resolvedCondition, resolvedAssignments) =
+ resolveConditionAssignments(action.condition, Seq.empty)
+ UpdateAction(resolvedCondition, resolvedAssignments)
}
// Resolve the notMatchedActions
val resolvedNotMatchedActions = notMatchedActions.map {
@@ -265,7 +267,9 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
case action: MergeAction =>
// SPARK-34962: use InsertStarAction as the explicit representation of * in InsertAction.
// So match and covert this in Spark3.2 env.
- InsertAction(action.condition, Seq.empty)
+ val (resolvedCondition, resolvedAssignments) =
+ resolveConditionAssignments(action.condition, Seq.empty)
+ InsertAction(resolvedCondition, resolvedAssignments)
}
// Return the resolved MergeIntoTable
MergeIntoTable(target, resolvedSource, resolvedMergeCondition,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 61bc577..37d30c8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -21,7 +21,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
@@ -254,7 +254,7 @@ object InsertIntoHoodieTableCommand extends Logging {
// on reading.
classOf[ValidateDuplicateKeyPayload].getCanonicalName
} else {
- classOf[DefaultHoodieRecordPayload].getCanonicalName
+ classOf[OverwriteWithLatestAvroPayload].getCanonicalName
}
logInfo(s"insert statement use write operation type: $operation, payloadClass: $payloadClassName")
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
index 2090185..12ec224 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/TruncateHoodieTableCommand.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.TruncateTableCommand
+import scala.util.control.NonFatal
+
/**
* Command for truncate hudi table.
*/
@@ -36,10 +38,16 @@ class TruncateHoodieTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val hoodieCatalogTable = HoodieCatalogTable(sparkSession, tableIdentifier)
val properties = hoodieCatalogTable.tableConfig.getProps
- val tablePath = hoodieCatalogTable.tableLocation
- // Delete all data in the table directory
- super.run(sparkSession)
+ try {
+ // Delete all data in the table directory
+ super.run(sparkSession)
+ } catch {
+ // TruncateTableCommand will delete the related directories first, and then refresh the table.
+ // It will fail when refresh table, because the hudi meta directory(.hoodie) has been deleted at the first step.
+ // So here ignore this failure, and refresh table later.
+ case NonFatal(_) =>
+ }
// If we have not specified the partition, truncate will delete all the data in the table path
// include the hoodi.properties. In this case we should reInit the table.
@@ -50,6 +58,10 @@ class TruncateHoodieTableCommand(
.fromProperties(properties)
.initTable(hadoopConf, hoodieCatalogTable.tableLocation)
}
+
+ // After deleting the data, refresh the table to make sure we don't keep around a stale
+ // file relation in the metastore cache and cached table data in the cache manager.
+ sparkSession.catalog.refreshTable(hoodieCatalogTable.table.identifier.quotedString)
Seq.empty[Row]
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
index 19d8d03..868bfc4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
@@ -154,7 +154,7 @@ class TestShowPartitions extends TestHoodieSqlBase {
Seq("year=2021/month=02/day=default"),
Seq("year=2021/month=02/day=01")
)
- checkAnswer(s"show partitions $tableName partition(day=01)")(
+ checkAnswer(s"show partitions $tableName partition(day='01')")(
Seq("year=2021/month=02/day=01"),
Seq("year=2021/month=default/day=01"),
Seq("year=2021/month=01/day=01"),