You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/21 17:48:46 UTC

[hudi] 15/16: [address comments and add more test]

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 3b8e782407ee036c859105ac8bf8518c73cd00f7
Author: xiarixiaoyao <me...@qq.com>
AuthorDate: Thu Apr 21 11:07:50 2022 +0800

    [address comments and add more test]
---
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  8 +--
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 66 +++++++++++++++-------
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 31 ++++------
 3 files changed, 62 insertions(+), 43 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index ab8a3d7033..5d5760961a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -102,7 +102,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
   protected final String writeToken;
   protected final TaskContextSupplier taskContextSupplier;
   // For full schema evolution
-  protected final boolean schemaOnReadEnable;
+  protected final boolean schemaOnReadEnabled;
 
   public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
                            String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
@@ -125,7 +125,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
         !hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
     this.taskContextSupplier = taskContextSupplier;
     this.writeToken = makeWriteToken();
-    schemaOnReadEnable = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
+    schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
   }
 
   /**
@@ -230,12 +230,12 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
    * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
    */
   protected GenericRecord rewriteRecord(GenericRecord record) {
-    return schemaOnReadEnable ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
+    return schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
         : HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
   }
 
   protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
-    return schemaOnReadEnable ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
+    return schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
         : HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 37d84a2895..47be7117a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -65,13 +65,14 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
 
@@ -748,11 +749,24 @@ public class HoodieAvroUtils {
    * @return newRecord for new Schema
    */
   public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols) {
-    Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols);
+    Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());
     return (GenericData.Record) newRecord;
   }
 
-  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
+  /**
+   * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new schema.
+   * support deep rewrite for nested record and adjust rename operation.
+   * This particular method does the following things :
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this transformed schema
+   *
+   * @param oldRecord oldRecord to be rewritten
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @param renameCols a map store all rename cols, (k, v)-> (colNameFromNewSchema, colNameFromOldSchema)
+   * @param fieldNames track the full name of visited field when we travel new schema.
+   * @return newRecord for new Schema
+   */
+  private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
     if (oldRecord == null) {
       return null;
     }
@@ -767,23 +781,23 @@ public class HoodieAvroUtils {
 
         for (int i = 0; i < fields.size(); i++) {
           Schema.Field field = fields.get(i);
+          String fieldName = field.name();
+          fieldNames.push(fieldName);
           if (oldSchema.getField(field.name()) != null) {
             Schema.Field oldField = oldSchema.getField(field.name());
-            helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols));
-          }
-          // deal with rename
-          if (!renameCols.isEmpty() && oldSchema.getField(field.name()) == null) {
-            String fieldName = field.name();
-            for (Map.Entry<String, String> entry : renameCols.entrySet()) {
-              List<String> nameParts = Arrays.asList(entry.getKey().split("\\."));
-              List<String> namePartsOld = Arrays.asList(entry.getValue().split("\\."));
-              if (nameParts.get(nameParts.size() - 1).equals(fieldName) && oldSchema.getField(namePartsOld.get(namePartsOld.size() - 1)) != null) {
-                // find rename
-                Schema.Field oldField = oldSchema.getField(namePartsOld.get(namePartsOld.size() - 1));
-                helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols));
-              }
+            helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+          } else {
+            String fieldFullName = createFullName(fieldNames);
+            String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
+            String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
+            // deal with rename
+            if (oldSchema.getField(field.name()) == null && oldSchema.getField(lastColNameFromOldSchema) != null) {
+              // find rename
+              Schema.Field oldField = oldSchema.getField(lastColNameFromOldSchema);
+              helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
             }
           }
+          fieldNames.pop();
         }
         GenericData.Record newRecord = new GenericData.Record(newSchema);
         for (int i = 0; i < fields.size(); i++) {
@@ -804,9 +818,11 @@ public class HoodieAvroUtils {
         }
         Collection array = (Collection)oldRecord;
         List<Object> newArray = new ArrayList();
+        fieldNames.push("element");
         for (Object element : array) {
-          newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols));
+          newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames));
         }
+        fieldNames.pop();
         return newArray;
       case MAP:
         if (!(oldRecord instanceof Map)) {
@@ -814,17 +830,29 @@ public class HoodieAvroUtils {
         }
         Map<Object, Object> map = (Map<Object, Object>) oldRecord;
         Map<Object, Object> newMap = new HashMap<>();
+        fieldNames.push("value");
         for (Map.Entry<Object, Object> entry : map.entrySet()) {
-          newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols));
+          newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames));
         }
+        fieldNames.pop();
         return newMap;
       case UNION:
-        return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols);
+        return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames);
       default:
         return rewritePrimaryType(oldRecord, oldSchema, newSchema);
     }
   }
 
+  private static String createFullName(Deque<String> fieldNames) {
+    String result = "";
+    if (!fieldNames.isEmpty()) {
+      List<String> parentNames = new ArrayList<>();
+      fieldNames.descendingIterator().forEachRemaining(parentNames::add);
+      result = parentNames.stream().collect(Collectors.joining("."));
+    }
+    return result;
+  }
+
   private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Schema newSchema) {
     Schema realOldSchema = oldSchema;
     if (realOldSchema.getType() == UNION) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index ae828ed9f7..5416363598 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -445,28 +445,19 @@ class TestSpark3DDL extends TestHoodieSqlBase {
             Seq(null),
             Seq(Map("t1" -> 10.0d))
           )
+          spark.sql(s"alter table ${tableName} rename column members to mem")
+          spark.sql(s"alter table ${tableName} rename column mem.value.n to nn")
+          spark.sql(s"alter table ${tableName} rename column userx to us")
+          spark.sql(s"alter table ${tableName} rename column us.age to age1")
+
+          spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))")
+          spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").show()
+          checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").collect())(
+            Seq(null, 29),
+            Seq(null, 291)
+          )
         }
       }
     }
   }
-
-  private def performClustering(writeDf: DataFrame, basePath: String, tableName: String, tableType: String): Unit = {
-    writeDf.write.format("org.apache.hudi")
-      .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
-      .option("hoodie.upsert.shuffle.parallelism", "1")
-      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
-      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "comb")
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "par")
-      .option(HoodieWriteConfig.TBL_NAME.key, tableName)
-      .option("hoodie.schema.on.read.enable", "true")
-      // option for clustering
-      .option("hoodie.clustering.inline", "true")
-      .option("hoodie.clustering.inline.max.commits", "1")
-      .option("hoodie.clustering.plan.strategy.small.file.limit", String.valueOf(2*1024*1024L))
-      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", String.valueOf(10*1024*1024L))
-      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(4 * 1024* 1024L))
-      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "col1, col2")
-      .mode(SaveMode.Append)
-      .save(basePath)
-  }
 }