Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/21 17:48:46 UTC
[hudi] 15/16: [address comments and add more test]
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 3b8e782407ee036c859105ac8bf8518c73cd00f7
Author: xiarixiaoyao <me...@qq.com>
AuthorDate: Thu Apr 21 11:07:50 2022 +0800
[address comments and add more test]
---
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 8 +--
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 66 +++++++++++++++-------
.../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 31 ++++------
3 files changed, 62 insertions(+), 43 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index ab8a3d7033..5d5760961a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -102,7 +102,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
protected final String writeToken;
protected final TaskContextSupplier taskContextSupplier;
// For full schema evolution
- protected final boolean schemaOnReadEnable;
+ protected final boolean schemaOnReadEnabled;
public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
@@ -125,7 +125,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
this.taskContextSupplier = taskContextSupplier;
this.writeToken = makeWriteToken();
- schemaOnReadEnable = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
+ schemaOnReadEnabled = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
}
/**
@@ -230,12 +230,12 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
* Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
*/
protected GenericRecord rewriteRecord(GenericRecord record) {
- return schemaOnReadEnable ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
+ return schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
: HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
}
protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
- return schemaOnReadEnable ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
+ return schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
: HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
}
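For context on the renamed flag: schema-on-read is treated as enabled whenever the write config carries a serialized internal schema, and the handle then routes every record through the evolution-aware rewrite. A rough sketch of that decision follows; writeConfig, record and writeSchemaWithMetaFields are assumed to be in scope rather than taken from this commit.

    // Sketch only: the flag mirrors the presence of an internal (evolved) schema in the
    // write config, e.g. after "hoodie.schema.on.read.enable" has been set on a table
    // whose schema has evolved.
    boolean schemaOnReadEnabled = !StringUtils.isNullOrEmpty(writeConfig.getInternalSchema());

    // With the flag on, writes take the schema-evolution-aware rewrite; otherwise the
    // plain rewrite against the write schema with meta fields is used.
    GenericRecord rewritten = schemaOnReadEnabled
        ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
        : HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);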
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 37d84a2895..47be7117a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -65,13 +65,14 @@ import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Deque;
+import java.util.LinkedList;
import java.util.TimeZone;
import java.util.stream.Collectors;
@@ -748,11 +749,24 @@ public class HoodieAvroUtils {
* @return newRecord for new Schema
*/
public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols) {
- Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols);
+ Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());
return (GenericData.Record) newRecord;
}
- private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
+ /**
+ * Given an Avro record with a given schema, rewrites it into the new schema, setting fields only from the new schema.
+ * Supports deep rewrites of nested records and handles column rename operations.
+ * This particular method does the following:
+ * a) Create a new empty GenericRecord with the new schema.
+ * b) For GenericRecord, copy over the data from the old schema to the new schema, or set default values, for all fields of this transformed schema.
+ *
+ * @param oldRecord oldRecord to be rewritten
+ * @param newSchema newSchema used to rewrite oldRecord
+ * @param renameCols a map storing all renamed columns, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
+ * @param fieldNames tracks the full names of the fields visited while traversing the new schema
+ * @return newRecord for new Schema
+ */
+ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) {
if (oldRecord == null) {
return null;
}
@@ -767,23 +781,23 @@ public class HoodieAvroUtils {
for (int i = 0; i < fields.size(); i++) {
Schema.Field field = fields.get(i);
+ String fieldName = field.name();
+ fieldNames.push(fieldName);
if (oldSchema.getField(field.name()) != null) {
Schema.Field oldField = oldSchema.getField(field.name());
- helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols));
- }
- // deal with rename
- if (!renameCols.isEmpty() && oldSchema.getField(field.name()) == null) {
- String fieldName = field.name();
- for (Map.Entry<String, String> entry : renameCols.entrySet()) {
- List<String> nameParts = Arrays.asList(entry.getKey().split("\\."));
- List<String> namePartsOld = Arrays.asList(entry.getValue().split("\\."));
- if (nameParts.get(nameParts.size() - 1).equals(fieldName) && oldSchema.getField(namePartsOld.get(namePartsOld.size() - 1)) != null) {
- // find rename
- Schema.Field oldField = oldSchema.getField(namePartsOld.get(namePartsOld.size() - 1));
- helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols));
- }
+ helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+ } else {
+ String fieldFullName = createFullName(fieldNames);
+ String[] colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.");
+ String lastColNameFromOldSchema = colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
+ // deal with rename
+ if (oldSchema.getField(field.name()) == null && oldSchema.getField(lastColNameFromOldSchema) != null) {
+ // find rename
+ Schema.Field oldField = oldSchema.getField(lastColNameFromOldSchema);
+ helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
}
}
+ fieldNames.pop();
}
GenericData.Record newRecord = new GenericData.Record(newSchema);
for (int i = 0; i < fields.size(); i++) {
@@ -804,9 +818,11 @@ public class HoodieAvroUtils {
}
Collection array = (Collection)oldRecord;
List<Object> newArray = new ArrayList();
+ fieldNames.push("element");
for (Object element : array) {
- newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols));
+ newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols, fieldNames));
}
+ fieldNames.pop();
return newArray;
case MAP:
if (!(oldRecord instanceof Map)) {
@@ -814,17 +830,29 @@ public class HoodieAvroUtils {
}
Map<Object, Object> map = (Map<Object, Object>) oldRecord;
Map<Object, Object> newMap = new HashMap<>();
+ fieldNames.push("value");
for (Map.Entry<Object, Object> entry : map.entrySet()) {
- newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols));
+ newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols, fieldNames));
}
+ fieldNames.pop();
return newMap;
case UNION:
- return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols);
+ return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames);
default:
return rewritePrimaryType(oldRecord, oldSchema, newSchema);
}
}
+ private static String createFullName(Deque<String> fieldNames) {
+ String result = "";
+ if (!fieldNames.isEmpty()) {
+ List<String> parentNames = new ArrayList<>();
+ fieldNames.descendingIterator().forEachRemaining(parentNames::add);
+ result = parentNames.stream().collect(Collectors.joining("."));
+ }
+ return result;
+ }
+
private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Schema newSchema) {
Schema realOldSchema = oldSchema;
if (realOldSchema.getType() == UNION) {
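The rename handling above keys renameCols by the field's full dotted path in the new schema, then looks up only the last segment of the mapped old name in the old schema. Below is a small standalone sketch of that lookup, mirroring the private createFullName helper; the map entries are hypothetical, loosely modelled on the renames exercised in the Spark test further down.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RenameLookupSketch {
      // Mirrors createFullName: joins the visited field names outermost-first with dots.
      static String createFullName(Deque<String> fieldNames) {
        if (fieldNames.isEmpty()) {
          return "";
        }
        List<String> parts = new ArrayList<>();
        fieldNames.descendingIterator().forEachRemaining(parts::add);
        return String.join(".", parts);
      }

      public static void main(String[] args) {
        // (colNameFromNewSchema -> colNameFromOldSchema), as described in the Javadoc above
        Map<String, String> renameCols = new HashMap<>();
        renameCols.put("mem.value.nn", "n");   // hypothetical nested rename (field inside a map value struct)
        renameCols.put("us.age1", "age");      // hypothetical struct field rename

        // Path pushed while descending the new schema: field "mem" -> map "value" -> field "nn"
        Deque<String> fieldNames = new ArrayDeque<>();
        fieldNames.push("mem");
        fieldNames.push("value");
        fieldNames.push("nn");

        String fieldFullName = createFullName(fieldNames);                  // "mem.value.nn"
        String[] oldParts = renameCols.getOrDefault(fieldFullName, "").split("\\.");
        String lastColNameFromOldSchema = oldParts[oldParts.length - 1];    // "n", looked up in the old schema
        System.out.println(fieldFullName + " <- old column \"" + lastColNameFromOldSchema + "\"");
      }
    }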
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index ae828ed9f7..5416363598 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -445,28 +445,19 @@ class TestSpark3DDL extends TestHoodieSqlBase {
Seq(null),
Seq(Map("t1" -> 10.0d))
)
+ spark.sql(s"alter table ${tableName} rename column members to mem")
+ spark.sql(s"alter table ${tableName} rename column mem.value.n to nn")
+ spark.sql(s"alter table ${tableName} rename column userx to us")
+ spark.sql(s"alter table ${tableName} rename column us.age to age1")
+
+ spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))")
+ spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").show()
+ checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").collect())(
+ Seq(null, 29),
+ Seq(null, 291)
+ )
}
}
}
}
-
- private def performClustering(writeDf: DataFrame, basePath: String, tableName: String, tableType: String): Unit = {
- writeDf.write.format("org.apache.hudi")
- .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
- .option("hoodie.upsert.shuffle.parallelism", "1")
- .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
- .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "comb")
- .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "par")
- .option(HoodieWriteConfig.TBL_NAME.key, tableName)
- .option("hoodie.schema.on.read.enable", "true")
- // option for clustering
- .option("hoodie.clustering.inline", "true")
- .option("hoodie.clustering.inline.max.commits", "1")
- .option("hoodie.clustering.plan.strategy.small.file.limit", String.valueOf(2*1024*1024L))
- .option("hoodie.clustering.plan.strategy.max.bytes.per.group", String.valueOf(10*1024*1024L))
- .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(4 * 1024* 1024L))
- .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "col1, col2")
- .mode(SaveMode.Append)
- .save(basePath)
- }
}
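The Scala test above drives the same rename path through SQL DDL. For completeness, here is a hedged sketch of invoking the rewrite directly through the public HoodieAvroUtils entry point; the schemas, field names and values are invented for illustration and are not part of this commit.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.avro.HoodieAvroUtils;

    public class RewriteWithRenameSketch {
      public static void main(String[] args) {
        // Old schema: (id int, age int); the new schema renames age -> age1.
        Schema oldSchema = SchemaBuilder.record("rec").fields()
            .requiredInt("id")
            .requiredInt("age")
            .endRecord();
        Schema newSchema = SchemaBuilder.record("rec").fields()
            .requiredInt("id")
            .requiredInt("age1")
            .endRecord();

        GenericRecord oldRecord = new GenericData.Record(oldSchema);
        oldRecord.put("id", 1);
        oldRecord.put("age", 29);

        // Keyed by the new column's full name; the value is the old column name.
        Map<String, String> renameCols = new HashMap<>();
        renameCols.put("age1", "age");

        GenericRecord rewritten = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
        System.out.println(rewritten);   // expected: {"id": 1, "age1": 29}
      }
    }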