Posted to commits@hudi.apache.org by le...@apache.org on 2021/06/17 15:21:29 UTC

[hudi] branch master updated: [HUDI-2033] ClassCastException Throw When PreCombineField Is String Type (#3099)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b9e28e5  [HUDI-2033] ClassCastException Throw When PreCombineField Is String Type (#3099)
b9e28e5 is described below

commit b9e28e5292d9b2a4b665c26eeba660437a6a0a45
Author: pengzhiwei <pe...@icloud.com>
AuthorDate: Thu Jun 17 23:21:20 2021 +0800

    [HUDI-2033] ClassCastException Throw When PreCombineField Is String Type (#3099)
---
 .../hudi/command/payload/ExpressionPayload.scala   | 10 ++--
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala | 61 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 5 deletions(-)
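
For context on the failure this patch addresses: Spark's code-generated expression evaluator produces string values as org.apache.spark.unsafe.types.UTF8String, while values read back from Avro arrive as org.apache.avro.util.Utf8. Utf8 implements Comparable<Utf8> only, so a plausible source of the reported ClassCastException (our reading; the JIRA text is not quoted in this mail) is an ordering comparison between a Utf8 on one side and a plain java.lang.String on the other. A minimal sketch of that failure mode, illustrative only and not Hudi code:

    import org.apache.avro.util.Utf8

    // Utf8.compareTo is typed against Utf8; the erased bridge method
    // casts its argument, so comparing against a java.lang.String
    // blows up at runtime.
    val stored = new Utf8("1000").asInstanceOf[Comparable[Any]]
    val incoming: Any = "1000"   // plain java.lang.String
    stored.compareTo(incoming)   // throws ClassCastException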

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index 94df155..89da81c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.hudi.command.payload
 
 import java.util.{Base64, Properties}
 import java.util.concurrent.Callable
-
 import scala.collection.JavaConverters._
 import com.google.common.cache.CacheBuilder
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
+import org.apache.avro.util.Utf8
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
@@ -290,15 +290,15 @@ object ExpressionPayload {
 
   /**
    * As the "baseEvaluator" returns "UTF8String" for the string type, which cannot be processed by
-   * Avro, the StringConvertEvaluator will convert the "UTF8String" to "String".
+   * Avro, the StringConvertEvaluator will convert the "UTF8String" to "Utf8".
    */
   case class StringConvertEvaluator(baseEvaluator: IExpressionEvaluator) extends IExpressionEvaluator {
     /**
-     * Convert the UTF8String to String
+     * Convert the UTF8String to Utf8
      */
     override def eval(record: IndexedRecord): Array[AnyRef] = {
-      baseEvaluator.eval(record).map{
-        case s: UTF8String => s.toString
+      baseEvaluator.eval(record).map {
+        case s: UTF8String => new Utf8(s.toString)
         case o => o
       }
     }
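
Restated outside the diff, the fix is a post-processing map over the evaluator's output: every Catalyst UTF8String is rewrapped as Avro's Utf8 before it reaches the Avro record. A sketch mirroring the change above (the standalone method name is ours):

    import org.apache.avro.util.Utf8
    import org.apache.spark.unsafe.types.UTF8String

    // Rewrap Catalyst's UTF8String as Avro's Utf8 so string values
    // written into the Avro record share a single representation.
    def toAvroStrings(values: Array[AnyRef]): Array[AnyRef] =
      values.map {
        case s: UTF8String => new Utf8(s.toString)
        case o => o
      }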
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 6a2f79d..969d07b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -532,4 +532,65 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
       }
     }
   }
+
+  test("Test Different Type of PreCombineField") {
+    withTempDir { tmp =>
+      val typeAndValue = Seq(
+        ("string", "'1000'"),
+        ("int", 1000),
+        ("bigint", 10000),
+        ("timestamp", "'2021-05-20 00:00:00'"),
+        ("date", "'2021-05-20'")
+      )
+      typeAndValue.foreach { case (dataType, dataValue) =>
+        val tableName = generateTableName
+        // Create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  c $dataType
+             |) using hudi
+             | location '${tmp.getCanonicalPath}/$tableName'
+             | options (
+             |  primaryKey ='id',
+             |  preCombineField = 'c'
+             | )
+       """.stripMargin)
+
+        // First merge with an extra input field 'flag' (inserts a new record)
+        spark.sql(
+          s"""
+             | merge into $tableName
+             | using (
+             |  select 1 as id, 'a1' as name, 10 as price, $dataValue as c0, '1' as flag
+             | ) s0
+             | on s0.id = $tableName.id
+             | when matched and flag = '1' then update set
+             | id = s0.id, name = s0.name, price = s0.price, c = s0.c0
+             | when not matched and flag = '1' then insert *
+       """.stripMargin)
+        checkAnswer(s"select id, name, price from $tableName")(
+          Seq(1, "a1", 10.0)
+        )
+
+        spark.sql(
+          s"""
+             | merge into $tableName
+             | using (
+             |  select 1 as id, 'a1' as name, 10 as price, $dataValue as c
+             | ) s0
+             | on s0.id = $tableName.id
+             | when matched then update set
+             | id = s0.id, name = s0.name, price = s0.price + $tableName.price, c = s0.c
+             | when not matched then insert *
+       """.stripMargin)
+        checkAnswer(s"select id, name, price from $tableName")(
+          Seq(1, "a1", 20.0)
+        )
+      }
+    }
+  }
 }
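
Once both sides of the precombine comparison hold Utf8, the ordering used to pick the surviving record is well-typed. A small usage sketch with hypothetical values, modeled on ordering-value semantics rather than copied from Hudi source:

    import org.apache.avro.util.Utf8

    // With one representation on both sides, Comparable behaves:
    val existing = new Utf8("1000")
    val incoming = new Utf8("1001")
    val keepIncoming = incoming.compareTo(existing) >= 0   // true: "1001" orders after "1000"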