Posted to commits@hudi.apache.org by le...@apache.org on 2021/06/24 11:09:52 UTC

[hudi] branch master updated: [HUDI-2053] Insert Static Partition With DateType Return Incorrect Partition Value (#3133)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 84dd3ca  [HUDI-2053]  Insert Static Partition With DateType Return Incorrect Partition Value (#3133)
84dd3ca is described below

commit 84dd3ca18b62921a60c3970a0f26614d8bafc89d
Author: pengzhiwei <pe...@icloud.com>
AuthorDate: Thu Jun 24 19:09:37 2021 +0800

    [HUDI-2053]  Insert Static Partition With DateType Return Incorrect Partition Value (#3133)
---
 .../org/apache/spark/sql/hudi/HoodieSqlUtils.scala |  7 ++-
 .../command/InsertIntoHoodieTableCommand.scala     |  7 ++-
 .../apache/spark/sql/hudi/TestHoodieSqlBase.scala  |  9 +++
 .../apache/spark/sql/hudi/TestInsertTable.scala    | 64 ++++++++++++++++++++++
 4 files changed, 84 insertions(+), 3 deletions(-)
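
For context: with a DateType (or other non-string) partition column, a static
partition insert such as "insert into h0 partition(dt = '2021-05-20') ..."
previously wrote an incorrect value for dt, because the partition value string
was forced into the target type when the literal was constructed instead of
being cast. A minimal reproduction sketch (table name and location are
illustrative, not taken from this commit):

    // Sketch only: the statement that triggered the bug on a DateType
    // partition column; "h0" and "/tmp/h0" are hypothetical.
    spark.sql(
      """create table h0 (
        |  id int,
        |  name string,
        |  price double,
        |  dt date
        |) using hudi
        | partitioned by (dt)
        | location '/tmp/h0'
      """.stripMargin)
    // Before this fix the partition value written for dt could be wrong;
    // after it, dt is the expected 2021-05-20.
    spark.sql("insert into h0 partition(dt = '2021-05-20') select 1, 'a1', 10")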

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index a1c38bb..27846e1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hudi
 import scala.collection.JavaConverters._
 import java.net.URI
 import java.util.Locale
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.SparkAdapterSupport
@@ -30,7 +29,7 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.expressions.{And, Cast, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SQLConf
@@ -106,6 +105,10 @@ object HoodieSqlUtils extends SparkAdapterSupport {
     }
   }
 
+  def removeMetaFields(attrs: Seq[Attribute]): Seq[Attribute] = {
+    attrs.filterNot(attr => isMetaField(attr.name))
+  }
+
   /**
    * Get the table location.
    * @param tableId
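
The new removeMetaFields overload strips Hudi's internal meta columns (for
example _hoodie_commit_time) from an attribute list, so that the column
counting added below only sees user-visible columns. A small illustrative
sketch (attribute names chosen for the example, assuming the HoodieSqlUtils
context):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.StringType

    // _hoodie_commit_time is a Hudi meta column and is filtered out;
    // the user column "id" survives.
    val attrs = Seq(
      AttributeReference("_hoodie_commit_time", StringType)(),
      AttributeReference("id", StringType)())
    removeMetaFields(attrs).map(_.name)   // Seq("id")
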
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 2ad9a68..c7fac53 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -128,6 +128,11 @@ object InsertIntoHoodieTableCommand {
       s"Required partition columns is: ${targetPartitionSchema.json}, Current static partitions " +
         s"is: ${staticPartitionValues.mkString("," + "")}")
 
+    assert(staticPartitionValues.size + query.output.size == table.schema.size,
+      s"Required select columns count: ${removeMetaFields(table.schema).size}, " +
+        s"Current select columns(including static partition column) count: " +
+        s"${staticPartitionValues.size + removeMetaFields(query.output).size},columns: " +
+        s"(${(removeMetaFields(query.output).map(_.name) ++ staticPartitionValues.keys).mkString(",")})")
     val queryDataFields = if (staticPartitionValues.isEmpty) { // insert dynamic partition
       query.output.dropRight(targetPartitionSchema.fields.length)
     } else { // insert static partition
@@ -156,7 +161,7 @@ object InsertIntoHoodieTableCommand {
       targetPartitionSchema.fields.map(f => {
         val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
         s"Missing static partition value for: ${f.name}")
-        val castAttr = Literal.create(staticPartitionValue, f.dataType)
+        val castAttr = castIfNeeded(Literal.create(staticPartitionValue), f.dataType, conf)
         Alias(castAttr, f.name)()
       })
     }
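
The one-line change above is the core of the fix: staticPartitionValue is a
String, and Literal.create(staticPartitionValue, f.dataType) built a literal
of the target type directly from that string, which yielded a wrong value for
DateType. The fix builds a string literal first and casts it to the partition
column's type. In plain Catalyst terms the idea is roughly this (a sketch;
the commit uses Hudi's castIfNeeded helper, which also threads through the
SQLConf):

    import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
    import org.apache.spark.sql.types.DateType

    // Sketch: a StringType literal for the user-supplied value, converted
    // with an explicit, type-aware cast rather than at construction time.
    val raw = Literal.create("2021-05-20")
    val dt  = Cast(raw, DateType)

The new assert in the same hunk enforces that the select-list size plus the
static partition values matches the table schema; its message format is what
the "Test Insert Exception" case below checks verbatim.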
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
index 610c71f..067e49a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
@@ -78,4 +78,13 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
   protected def checkAnswer(sql: String)(expects: Seq[Any]*): Unit = {
     assertResult(expects.map(row => Row(row: _*)).toArray)(spark.sql(sql).collect())
   }
+
+  protected def checkException(sql: String)(errorMsg: String): Unit = {
+    try {
+      spark.sql(sql)
+    } catch {
+      case e: Throwable =>
+        assertResult(errorMsg)(e.getMessage)
+    }
+  }
 }
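
One caveat about the checkException helper added above: if spark.sql(sql)
completes without throwing, the helper returns normally and the test passes
silently. A stricter variant (a sketch, not part of this commit) could use
ScalaTest's intercept, which fails the test when no exception is raised:

    // Sketch only: intercept comes from ScalaTest Assertions (mixed in by
    // FunSuite) and fails the test if `sql` does not throw at all.
    protected def checkExceptionStrict(sql: String)(errorMsg: String): Unit = {
      val e = intercept[Throwable] { spark.sql(sql) }
      assertResult(errorMsg)(e.getMessage)
    }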
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 98d095b..945ccf5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -220,4 +220,68 @@ class TestInsertTable extends TestHoodieSqlBase {
       )
     }
   }
+
+  test("Test Different Type of Partition Column") {
+    withTempDir { tmp =>
+      val typeAndValue = Seq(
+        ("string", "'1000'"),
+        ("int", 1000),
+        ("bigint", 10000),
+        ("timestamp", "'2021-05-20 00:00:00'"),
+        ("date", "'2021-05-20'")
+      )
+      typeAndValue.foreach { case (partitionType, partitionValue) =>
+        val tableName = generateTableName
+        // Create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  dt $partitionType
+             |) using hudi
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}/$tableName'
+       """.stripMargin)
+
+        spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10")
+        spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue")
+        checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")(
+          Seq(1, "a1", 10, removeQuotes(partitionValue).toString),
+          Seq(2, "a2", 10, removeQuotes(partitionValue).toString)
+        )
+      }
+    }
+  }
+
+  test("Test Insert Exception") {
+    val tableName = generateTableName
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  name string,
+         |  price double,
+         |  dt string
+         |) using hudi
+         | partitioned by (dt)
+       """.stripMargin)
+    checkException(s"insert into $tableName partition(dt = '2021-06-20')" +
+      s" select 1, 'a1', 10, '2021-06-20'") (
+      "assertion failed: Required select columns count: 4, Current select columns(including static partition column)" +
+        " count: 5,columns: (1,a1,10,2021-06-20,dt)"
+    )
+    checkException(s"insert into $tableName select 1, 'a1', 10")(
+      "assertion failed: Required select columns count: 4, Current select columns(including static partition column)" +
+        " count: 3,columns: (1,a1,10)"
+    )
+  }
+
+  private def removeQuotes(value: Any): Any = {
+    value match {
+      case s: String => s.stripPrefix("'").stripSuffix("'")
+      case _=> value
+    }
+  }
 }