You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by me...@apache.org on 2022/10/29 01:24:23 UTC

[hudi] branch master updated: [HUDI-5083]Fixed a bug when schema evolution (#7045)

This is an automated email from the ASF dual-hosted git repository.

mengtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b4563ddca5 [HUDI-5083]Fixed a bug when schema evolution (#7045)
b4563ddca5 is described below

commit b4563ddca566358be0897523decd29fe84ab332a
Author: 申胜利 <48...@users.noreply.github.com>
AuthorDate: Sat Oct 29 09:24:16 2022 +0800

    [HUDI-5083]Fixed a bug when schema evolution (#7045)
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  4 +--
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 38 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index a352e86b96..5288f7fa0c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -767,14 +767,14 @@ public class HoodieAvroUtils {
           Schema.Field field = fields.get(i);
           String fieldName = field.name();
           fieldNames.push(fieldName);
-          if (oldSchema.getField(field.name()) != null) {
+          if (oldSchema.getField(field.name()) != null && !renameCols.containsKey(field.name())) {
             Schema.Field oldField = oldSchema.getField(field.name());
             newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
           } else {
             String fieldFullName = createFullName(fieldNames);
             String fieldNameFromOldSchema = renameCols.getOrDefault(fieldFullName, "");
             // deal with rename
-            if (oldSchema.getField(field.name()) == null && oldSchema.getField(fieldNameFromOldSchema) != null) {
+            if (oldSchema.getField(fieldNameFromOldSchema) != null) {
               // find rename
               Schema.Field oldField = oldSchema.getField(fieldNameFromOldSchema);
               newRecord.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 65357b903b..9d955cb831 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -385,6 +385,44 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Alter Table multiple times") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+        if (HoodieSparkUtils.gteqSpark3_1) {
+          spark.sql("set hoodie.schema.on.read.enable=true")
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  col1 string,
+               |  col2 string,
+               |  ts long
+               |) using hudi
+               | location '$tablePath'
+               | options (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
+             """.stripMargin)
+          spark.sql(s"show create table ${tableName}").show(false)
+          spark.sql(s"insert into ${tableName} values (1, 'aaa', 'bbb', 1000)")
+
+          // Rename to a previously existing column name + insert
+          spark.sql(s"alter table ${tableName} drop column col1")
+          spark.sql(s"alter table ${tableName} rename column col2 to col1")
+
+          spark.sql(s"insert into ${tableName} values (2, 'aaa', 1000)")
+          checkAnswer(spark.sql(s"select col1 from ${tableName} order by id").collect())(
+            Seq("bbb"), Seq("aaa")
+          )
+        }
+      }
+    }
+  }
+
   test("Test Alter Table complex") {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>