You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by le...@apache.org on 2021/06/24 11:09:52 UTC
[hudi] branch master updated: [HUDI-2053] Insert Static Partition With DateType Return Incorrect Partition Value (#3133)
This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 84dd3ca [HUDI-2053] Insert Static Partition With DateType Return Incorrect Partition Value (#3133)
84dd3ca is described below
commit 84dd3ca18b62921a60c3970a0f26614d8bafc89d
Author: pengzhiwei <pe...@icloud.com>
AuthorDate: Thu Jun 24 19:09:37 2021 +0800
[HUDI-2053] Insert Static Partition With DateType Return Incorrect Partition Value (#3133)
---
.../org/apache/spark/sql/hudi/HoodieSqlUtils.scala | 7 ++-
.../command/InsertIntoHoodieTableCommand.scala | 7 ++-
.../apache/spark/sql/hudi/TestHoodieSqlBase.scala | 9 +++
.../apache/spark/sql/hudi/TestInsertTable.scala | 64 ++++++++++++++++++++++
4 files changed, 84 insertions(+), 3 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index a1c38bb..27846e1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hudi
import scala.collection.JavaConverters._
import java.net.URI
import java.util.Locale
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.SparkAdapterSupport
@@ -30,7 +29,7 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.expressions.{And, Cast, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SQLConf
@@ -106,6 +105,10 @@ object HoodieSqlUtils extends SparkAdapterSupport {
}
}
+ /**
+  * Drops every attribute whose name is classified as a meta field by `isMetaField`,
+  * keeping the remaining attributes in their original order.
+  *
+  * @param attrs attributes to filter
+  * @return the attributes of `attrs` for which `isMetaField(name)` is false
+  */
+ def removeMetaFields(attrs: Seq[Attribute]): Seq[Attribute] =
+   attrs.filter(candidate => !isMetaField(candidate.name))
+
/**
* Get the table location.
* @param tableId
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 2ad9a68..c7fac53 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -128,6 +128,11 @@ object InsertIntoHoodieTableCommand {
s"Required partition columns is: ${targetPartitionSchema.json}, Current static partitions " +
s"is: ${staticPartitionValues.mkString("," + "")}")
+ assert(staticPartitionValues.size + query.output.size == table.schema.size,
+ s"Required select columns count: ${removeMetaFields(table.schema).size}, " +
+ s"Current select columns(including static partition column) count: " +
+ s"${staticPartitionValues.size + removeMetaFields(query.output).size},columns: " +
+ s"(${(removeMetaFields(query.output).map(_.name) ++ staticPartitionValues.keys).mkString(",")})")
val queryDataFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition
query.output.dropRight(targetPartitionSchema.fields.length)
} else { // insert static partition
@@ -156,7 +161,7 @@ object InsertIntoHoodieTableCommand {
targetPartitionSchema.fields.map(f => {
val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
s"Missing static partition value for: ${f.name}")
- val castAttr = Literal.create(staticPartitionValue, f.dataType)
+ val castAttr = castIfNeeded(Literal.create(staticPartitionValue), f.dataType, conf)
Alias(castAttr, f.name)()
})
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
index 610c71f..067e49a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
@@ -78,4 +78,13 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
/**
 * Runs `sql` and asserts that the collected rows equal `expects`.
 *
 * @param sql     the query to execute through `spark.sql`
 * @param expects the expected rows, each given as the sequence of its column values
 */
protected def checkAnswer(sql: String)(expects: Seq[Any]*): Unit = {
  // Build the expected rows before running the query, as the original expression did.
  val expectedRows = expects.map(values => Row(values: _*)).toArray
  val actualRows = spark.sql(sql).collect()
  assertResult(expectedRows)(actualRows)
}
+
+ /**
+  * Runs `sql` and asserts that it fails with exactly the message `errorMsg`.
+  *
+  * The test fails if the statement completes without throwing; previously that
+  * case passed silently and nothing was verified.
+  *
+  * @param sql      the statement expected to fail when passed to `spark.sql`
+  * @param errorMsg the exact message expected from the thrown exception
+  */
+ protected def checkException(sql: String)(errorMsg: String): Unit = {
+   // Capture the failure first and assert afterwards, so that an assertion
+   // failure raised by this helper can never be caught by its own handler.
+   // NonFatal still matches AssertionError but lets fatal JVM errors propagate.
+   val failure: Option[Throwable] =
+     try {
+       spark.sql(sql)
+       None
+     } catch {
+       case scala.util.control.NonFatal(e) => Some(e)
+     }
+   failure match {
+     case Some(e) => assertResult(errorMsg)(e.getMessage)
+     case None =>
+       fail(s"Expected an exception with message: $errorMsg, but the statement succeeded: $sql")
+   }
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 98d095b..945ccf5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -220,4 +220,68 @@ class TestInsertTable extends TestHoodieSqlBase {
)
}
}
+
+ // Inserts one row through a static partition clause and one through the select
+ // list for each partition column type, then checks that both rows read back
+ // with the same partition value when `dt` is cast to string.
+ test("Test Different Type of Partition Column") {
+   withTempDir { tmp =>
+     // Each case pairs a partition column type with the SQL literal written into it.
+     val partitionCases = Seq(
+       "string" -> "'1000'",
+       "int" -> 1000,
+       "bigint" -> 10000,
+       "timestamp" -> "'2021-05-20 00:00:00'",
+       "date" -> "'2021-05-20'"
+     )
+     for ((partitionType, partitionValue) <- partitionCases) {
+       val tableName = generateTableName
+       val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+       // A fresh table per case, partitioned by a `dt` column of the type under test.
+       spark.sql(
+         s"""
+            |create table $tableName (
+            | id int,
+            | name string,
+            | price double,
+            | dt $partitionType
+            |) using hudi
+            | partitioned by (dt)
+            | location '$tablePath'
+        """.stripMargin)
+
+       // Row 1 takes `dt` from the static partition spec, row 2 from the select list.
+       spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10")
+       spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue")
+
+       // The SQL literal without its surrounding single quotes is the expected string form.
+       val expectedDt = removeQuotes(partitionValue).toString
+       checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")(
+         Seq(1, "a1", 10, expectedDt),
+         Seq(2, "a2", 10, expectedDt)
+       )
+     }
+   }
+ }
+
+ // Checks the assertion message produced when the number of selected columns
+ // (plus static partition columns) does not match the table's column count.
+ test("Test Insert Exception") {
+   val tableName = generateTableName
+   spark.sql(
+     s"""
+        |create table $tableName (
+        | id int,
+        | name string,
+        | price double,
+        | dt string
+        |) using hudi
+        | partitioned by (dt)
+    """.stripMargin)
+
+   // Both expected messages share everything up to the reported column count.
+   val messagePrefix =
+     "assertion failed: Required select columns count: 4, " +
+       "Current select columns(including static partition column) count: "
+
+   // One column too many: `dt` is given statically and again in the select list.
+   val tooManyColumns =
+     s"insert into $tableName partition(dt = '2021-06-20') select 1, 'a1', 10, '2021-06-20'"
+   checkException(tooManyColumns)(
+     messagePrefix + "5,columns: (1,a1,10,2021-06-20,dt)"
+   )
+
+   // One column too few: no value is supplied for `dt` at all.
+   val tooFewColumns = s"insert into $tableName select 1, 'a1', 10"
+   checkException(tooFewColumns)(
+     messagePrefix + "3,columns: (1,a1,10)"
+   )
+ }
+
+ /**
+  * Strips one leading and one trailing single quote from a string value.
+  * Values that are not strings are returned unchanged.
+  *
+  * @param value a SQL literal as used in the test cases, e.g. `'1000'` or `1000`
+  * @return the string without its surrounding single quotes, or `value` itself
+  */
+ private def removeQuotes(value: Any): Any = value match {
+   case quoted: String => quoted.stripPrefix("'").stripSuffix("'")
+   case other => other
+ }
}