Posted to commits@hudi.apache.org by xu...@apache.org on 2022/04/21 17:48:45 UTC
[hudi] 14/16: [HUDI-3921] Fix schema evolution not working with HUDI-3855
This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch rc3-patched-for-test
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 5bbd47f22214dc1c911688ebb04f75b8e9d7cbcb
Author: xiarixiaoyao <me...@qq.com>
AuthorDate: Wed Apr 20 20:29:54 2022 +0800
[HUDI-3921] Fix schema evolution not working with HUDI-3855
---
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 12 +++++-
.../table/action/commit/HoodieMergeHelper.java | 12 +++++-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 46 +++++++++++++++++-----
.../table/log/AbstractHoodieLogRecordReader.java | 3 +-
.../schema/action/InternalSchemaMerger.java | 26 +++++++++++-
.../internal/schema/utils/InternalSchemaUtils.java | 16 ++++++++
.../schema/utils/TestAvroSchemaEvolutionUtils.java | 4 +-
7 files changed, 101 insertions(+), 18 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 89babc7725..ab8a3d7033 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -46,6 +46,9 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.HashMap;
+
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
/**
* Base class for all write operations logically performed at the file group level.
@@ -98,6 +101,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
protected final String fileId;
protected final String writeToken;
protected final TaskContextSupplier taskContextSupplier;
+ // For full schema evolution
+ protected final boolean schemaOnReadEnable;
public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, String partitionPath,
String fileId, HoodieTable<T, I, K, O> hoodieTable, TaskContextSupplier taskContextSupplier) {
@@ -120,6 +125,7 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
!hoodieTable.getIndex().isImplicitWithStorage(), config.getWriteStatusFailureFraction());
this.taskContextSupplier = taskContextSupplier;
this.writeToken = makeWriteToken();
+ schemaOnReadEnable = !isNullOrEmpty(hoodieTable.getConfig().getInternalSchema());
}
/**
@@ -224,11 +230,13 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
* Rewrite the GenericRecord with the Schema containing the Hoodie Metadata fields.
*/
protected GenericRecord rewriteRecord(GenericRecord record) {
- return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
+ return schemaOnReadEnable ? HoodieAvroUtils.rewriteRecordWithNewSchema(record, writeSchemaWithMetaFields, new HashMap<>())
+ : HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
}
protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
- return HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
+ return schemaOnReadEnable ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata(record, writeSchemaWithMetaFields, fileName)
+ : HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
}
public abstract List<WriteStatus> close();
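For readers following the change: the ternary dispatch above is driven entirely by schemaOnReadEnable, which is derived once in the constructor from whether the write config carries an internal schema. Below is a minimal self-contained sketch of the pattern using plain Avro; MyHandle, rewriteLegacy and rewriteEvolved are illustrative stand-ins, not Hudi APIs.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    class MyHandle {
      private final boolean schemaOnReadEnable;

      MyHandle(String internalSchemaJson) {
        // mirrors: schemaOnReadEnable = !isNullOrEmpty(config.getInternalSchema())
        this.schemaOnReadEnable = internalSchemaJson != null && !internalSchemaJson.isEmpty();
      }

      GenericRecord rewrite(GenericRecord record, Schema target) {
        // schema-on-read tables take the evolution-aware path so that nested type
        // changes and renamed columns survive the copy into the target schema
        return schemaOnReadEnable ? rewriteEvolved(record, target) : rewriteLegacy(record, target);
      }

      // plain name-based field copy; sufficient when the schema never evolves
      GenericRecord rewriteLegacy(GenericRecord record, Schema target) {
        GenericRecord out = new GenericData.Record(target);
        for (Schema.Field f : target.getFields()) {
          if (record.getSchema().getField(f.name()) != null) {
            out.put(f.name(), record.get(f.name()));
          }
        }
        return out;
      }

      // stands in for HoodieAvroUtils.rewriteRecordWithNewSchema(record, target, new HashMap<>())
      GenericRecord rewriteEvolved(GenericRecord record, Schema target) {
        return rewriteLegacy(record, target); // this sketch handles flat records only
      }
    }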
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
index 578cdf0bc7..e964cfc9b3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
@@ -36,6 +36,7 @@ import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -52,6 +53,8 @@ import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
import java.util.stream.Collectors;
public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
@@ -93,6 +96,7 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
boolean needToReWriteRecord = false;
+ Map<String, String> renameCols = new HashMap<>();
// TODO support bootstrap
if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
// check implicitly added columns and position reordering (Spark SQL may change the column order)
@@ -109,10 +113,14 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
&& writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
&& writeInternalSchema.findIdByName(f) != -1
&& writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
- readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
+ readSchema = AvroInternalSchemaConverter
+ .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName());
Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
|| SchemaCompatibility.checkReaderWriterCompatibility(writeSchemaFromFile, readSchema).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+ if (needToReWriteRecord) {
+ renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
+ }
}
try {
@@ -121,7 +129,7 @@ public class HoodieMergeHelper<T extends HoodieRecordPayload> extends
readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
} else {
if (needToReWriteRecord) {
- readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema);
+ readerIterator = HoodieAvroUtils.rewriteRecordWithNewSchema(reader.getRecordIterator(), readSchema, renameCols);
} else {
readerIterator = reader.getRecordIterator(readSchema);
}
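Note that rewriteRecordWithNewSchema(Iterator, Schema, Map) keeps the read path lazy: each base-file record is rewritten only as the merge consumes it. The underlying pattern, as a generic self-contained sketch (MappingIterator is an illustrative name, not a Hudi class):

    import java.util.Iterator;
    import java.util.function.Function;

    final class MappingIterator<A, B> implements Iterator<B> {
      private final Iterator<A> source;
      private final Function<A, B> fn;

      MappingIterator(Iterator<A> source, Function<A, B> fn) {
        this.source = source;
        this.fn = fn;
      }

      @Override
      public boolean hasNext() {
        return source.hasNext();
      }

      @Override
      public B next() {
        // the rewrite runs here, one record at a time, as the caller iterates
        return fn.apply(source.next());
      }
    }

In the patched HoodieMergeHelper, fn would correspond to record -> rewriteRecordWithNewSchema(record, readSchema, renameCols).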
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index edfecec515..37d84a2895 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -65,6 +65,7 @@ import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -406,6 +407,18 @@ public class HoodieAvroUtils {
return newRecord;
}
+ // TODO Unify the logic of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, then delete this function.
+ public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
+ GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
+ // do not preserve the incoming FILENAME_METADATA_FIELD; always overwrite it with the current fileName
+ newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);
+ if (!GenericData.get().validate(newSchema, newRecord)) {
+ throw new SchemaCompatibilityException(
+ "Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);
+ }
+ return newRecord;
+ }
+
/**
* Converts list of {@link GenericRecord} provided into the {@link GenericRecord} adhering to the
* provided {@code newSchema}.
@@ -731,14 +744,15 @@ public class HoodieAvroUtils {
*
* @param oldRecord oldRecord to be rewritten
* @param newSchema newSchema used to rewrite oldRecord
+ * @param renameCols a map storing all renamed cols, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
* @return newRecord for new Schema
*/
- public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema) {
- Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema);
+ public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord oldRecord, Schema newSchema, Map<String, String> renameCols) {
+ Object newRecord = rewriteRecordWithNewSchema(oldRecord, oldRecord.getSchema(), newSchema, renameCols);
return (GenericData.Record) newRecord;
}
- private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema) {
+ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSchema, Schema newSchema, Map<String, String> renameCols) {
if (oldRecord == null) {
return null;
}
@@ -755,7 +769,20 @@ public class HoodieAvroUtils {
Schema.Field field = fields.get(i);
if (oldSchema.getField(field.name()) != null) {
Schema.Field oldField = oldSchema.getField(field.name());
- helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema()));
+ helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols));
+ }
+ // deal with rename
+ if (!renameCols.isEmpty() && oldSchema.getField(field.name()) == null) {
+ String fieldName = field.name();
+ for (Map.Entry<String, String> entry : renameCols.entrySet()) {
+ List<String> nameParts = Arrays.asList(entry.getKey().split("\\."));
+ List<String> namePartsOld = Arrays.asList(entry.getValue().split("\\."));
+ if (nameParts.get(nameParts.size() - 1).equals(fieldName) && oldSchema.getField(namePartsOld.get(namePartsOld.size() - 1)) != null) {
+ // found a rename: copy the value across from the old column
+ Schema.Field oldField = oldSchema.getField(namePartsOld.get(namePartsOld.size() - 1));
+ helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols));
+ }
+ }
}
}
GenericData.Record newRecord = new GenericData.Record(newSchema);
@@ -778,7 +805,7 @@ public class HoodieAvroUtils {
Collection array = (Collection)oldRecord;
List<Object> newArray = new ArrayList();
for (Object element : array) {
- newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType()));
+ newArray.add(rewriteRecordWithNewSchema(element, oldSchema.getElementType(), newSchema.getElementType(), renameCols));
}
return newArray;
case MAP:
@@ -788,11 +815,11 @@ public class HoodieAvroUtils {
Map<Object, Object> map = (Map<Object, Object>) oldRecord;
Map<Object, Object> newMap = new HashMap<>();
for (Map.Entry<Object, Object> entry : map.entrySet()) {
- newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType()));
+ newMap.put(entry.getKey(), rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), newSchema.getValueType(), renameCols));
}
return newMap;
case UNION:
- return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord));
+ return rewriteRecordWithNewSchema(oldRecord, getActualSchemaFromUnion(oldSchema, oldRecord), getActualSchemaFromUnion(newSchema, oldRecord), renameCols);
default:
return rewritePrimaryType(oldRecord, oldSchema, newSchema);
}
@@ -970,9 +997,10 @@ public class HoodieAvroUtils {
*
* @param oldRecords oldRecords to be rewritten
* @param newSchema newSchema used to rewrite oldRecord
+ * @param renameCols a map storing all renamed cols, (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
* @return an iterator of rewritten GenericRecords
*/
- public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema) {
+ public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema, Map<String, String> renameCols) {
if (oldRecords == null || newSchema == null) {
return Collections.emptyIterator();
}
@@ -984,7 +1012,7 @@ public class HoodieAvroUtils {
@Override
public GenericRecord next() {
- return rewriteRecordWithNewSchema(oldRecords.next(), newSchema);
+ return rewriteRecordWithNewSchema(oldRecords.next(), newSchema, renameCols);
}
};
}
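To make the renameCols contract concrete, here is a runnable plain-Avro sketch of the lookup that the rename branch above performs. The map is keyed by the new column name, as the javadoc states; the flat copy loop stands in for the recursive rewriteRecordWithNewSchema, which additionally matches nested fields on the last part of their dotted full names.

    import java.util.Collections;
    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class RenameRewriteExample {
      public static void main(String[] args) {
        Schema oldSchema = SchemaBuilder.record("rec").fields()
            .requiredLong("id")
            .requiredString("colOldName")
            .endRecord();
        Schema newSchema = SchemaBuilder.record("rec").fields()
            .requiredLong("id")
            .requiredString("colNewName") // the renamed column
            .endRecord();

        GenericRecord old = new GenericData.Record(oldSchema);
        old.put("id", 1L);
        old.put("colOldName", "value");

        // keyed by the NEW name, valued by the OLD name
        Map<String, String> renameCols = Collections.singletonMap("colNewName", "colOldName");

        // with the patched helper this would be:
        //   HoodieAvroUtils.rewriteRecordWithNewSchema(old, newSchema, renameCols)
        GenericRecord rewritten = new GenericData.Record(newSchema);
        for (Schema.Field f : newSchema.getFields()) {
          String sourceName = oldSchema.getField(f.name()) != null
              ? f.name()                                      // field kept its name
              : renameCols.getOrDefault(f.name(), f.name());  // field was renamed
          if (oldSchema.getField(sourceName) != null) {
            rewritten.put(f.name(), old.get(sourceName));
          }
        }
        System.out.println(rewritten); // {"id": 1, "colNewName": "value"}
      }
    }

Without the map entry, colNewName would match nothing in the old schema and the rewritten field would be left null; that data loss on renamed columns is what the rename map guards against during merges.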
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 9e56083b26..9687136444 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -58,6 +58,7 @@ import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -379,7 +380,7 @@ public abstract class AbstractHoodieLogRecordReader {
Option<Schema> schemaOption = getMergedSchema(dataBlock);
while (recordIterator.hasNext()) {
IndexedRecord currentRecord = recordIterator.next();
- IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get()) : currentRecord;
+ IndexedRecord record = schemaOption.isPresent() ? HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), new HashMap<>()) : currentRecord;
processNextRecord(createHoodieRecord(record, this.hoodieTableMetaClient.getTableConfig(), this.payloadClassFQN,
this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName));
totalLogRecords.incrementAndGet();
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
index 0d93ab170b..bcea9b957b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
@@ -48,6 +48,25 @@ public class InternalSchemaMerger {
// we can pass decimalType to rewriteRecordWithNewSchema directly, everything is ok.
private boolean useColumnTypeFromFileSchema = true;
+ // deal with rename
+ // Whether to use the column name from the file schema to read files when a column has been renamed.
+ // Spark's parquet reader needs the original column name to read data; given the new name it will read nothing.
+ // eg: the column was written as colOldName and has since been renamed to colNewName:
+ // we must pass colOldName, not colNewName, to the parquet reader when reading the data back.
+ // for the log reader
+ // our rewriteRecordWithNewSchema function supports renaming directly, so this parameter is not needed.
+ // eg: the column was written as colOldName and has since been renamed to colNewName:
+ // we can pass colNewName to rewriteRecordWithNewSchema directly, and everything works.
+ private boolean useColNameFromFileSchema = true;
+
+ public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema, boolean useColNameFromFileSchema) {
+ this.fileSchema = fileSchema;
+ this.querySchema = querySchema;
+ this.ignoreRequiredAttribute = ignoreRequiredAttribute;
+ this.useColumnTypeFromFileSchema = useColumnTypeFromFileSchema;
+ this.useColNameFromFileSchema = useColNameFromFileSchema;
+ }
+
public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema querySchema, boolean ignoreRequiredAttribute, boolean useColumnTypeFromFileSchema) {
this.fileSchema = fileSchema;
this.querySchema = querySchema;
@@ -131,12 +150,15 @@ public class InternalSchemaMerger {
private Types.Field dealWithRename(int fieldId, Type newType, Types.Field oldField) {
Types.Field fieldFromFileSchema = fileSchema.findField(fieldId);
String nameFromFileSchema = fieldFromFileSchema.name();
+ String nameFromQuerySchema = querySchema.findField(fieldId).name();
Type typeFromFileSchema = fieldFromFileSchema.type();
// The current design guarantees that nested type changes are not allowed, so they need not be considered here.
if (newType.isNestedType()) {
- return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, newType, oldField.doc());
+ return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
+ useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, newType, oldField.doc());
} else {
- return Types.Field.get(oldField.fieldId(), oldField.isOptional(), nameFromFileSchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
+ return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
+ useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, useColumnTypeFromFileSchema ? typeFromFileSchema : newType, oldField.doc());
}
}
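Which value a caller passes for the new flag depends on who consumes the merged schema. A short sketch, assuming only the two constructors shown in this hunk (writeInternalSchema and querySchema stand for the caller's schemas):

    // Base-file merge path (see HoodieMergeHelper above): the merged schema drives
    // a record rewrite, so it should carry the QUERY column names and types:
    InternalSchema mergedForRewrite = new InternalSchemaMerger(
        writeInternalSchema, querySchema,
        true,   // ignoreRequiredAttribute
        false,  // useColumnTypeFromFileSchema
        false   // useColNameFromFileSchema
    ).mergeSchema();

    // A reader that scans the parquet file directly must keep the file's column
    // names; the pre-existing 4-arg constructor leaves useColNameFromFileSchema true:
    InternalSchema mergedForFileRead = new InternalSchemaMerger(
        writeInternalSchema, querySchema, true, true).mergeSchema();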
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
index 3c0877f6f5..a784b409b8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
@@ -267,4 +267,20 @@ public class InternalSchemaUtils {
}
return result;
}
+
+ /**
+ * Try to find all renamed cols between oldSchema and newSchema.
+ *
+ * @param oldSchema oldSchema
+ * @param newSchema newSchema which was modified from oldSchema
+ * @return renameCols Map. (k, v) -> (colNameFromNewSchema, colNameFromOldSchema)
+ */
+ public static Map<String, String> collectRenameCols(InternalSchema oldSchema, InternalSchema newSchema) {
+ List<String> colNamesFromWriteSchema = oldSchema.getAllColsFullName();
+ return colNamesFromWriteSchema.stream().filter(f -> {
+ int fieldIdFromWriteSchema = oldSchema.findIdByName(f);
+ // find the cols which keep the same id but have a different colName
+ return newSchema.getAllIds().contains(fieldIdFromWriteSchema) && !newSchema.findfullName(fieldIdFromWriteSchema).equalsIgnoreCase(f);
+ }).collect(Collectors.toMap(e -> newSchema.findfullName(oldSchema.findIdByName(e)), e -> e));
+ }
}
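A self-contained model of collectRenameCols may help: reduce each schema to its (columnFullName -> fieldId) mapping, which is all the method relies on, and the renames are exactly the ids present in both schemas whose names differ. (CollectRenameColsSketch and its plain maps are illustrative; the real method walks InternalSchema objects.)

    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class CollectRenameColsSketch {
      static Map<String, String> collectRenameCols(Map<String, Integer> oldCols, Map<String, Integer> newCols) {
        // invert the new schema's mapping to (fieldId -> columnFullName)
        Map<Integer, String> newById = newCols.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
        return oldCols.entrySet().stream()
            // same field id present in both schemas, but the name changed
            .filter(e -> newById.containsKey(e.getValue())
                && !newById.get(e.getValue()).equalsIgnoreCase(e.getKey()))
            .collect(Collectors.toMap(e -> newById.get(e.getValue()), Map.Entry::getKey));
      }

      public static void main(String[] args) {
        Map<String, Integer> oldCols = new HashMap<>();
        oldCols.put("id", 1);
        oldCols.put("colOldName", 2);
        Map<String, Integer> newCols = new HashMap<>();
        newCols.put("id", 1);
        newCols.put("colNewName", 2); // id 2 was renamed
        // prints {colNewName=colOldName}: keyed by the new name, valued by the old one
        System.out.println(collectRenameCols(oldCols, newCols));
      }
    }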
diff --git a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
index d116697b8d..3850ef07b9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java
@@ -284,7 +284,7 @@ public class TestAvroSchemaEvolutionUtils {
.updateColumnType("col6", Types.StringType.get());
InternalSchema newSchema = SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange);
Schema newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, avroSchema.getName());
- GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
+ GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newRecord), true);
}
@@ -349,7 +349,7 @@ public class TestAvroSchemaEvolutionUtils {
);
Schema newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, schema.getName());
- GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
+ GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new HashMap<>());
// test the correctness of the rewrite
Assertions.assertEquals(GenericData.get().validate(newAvroSchema, newAvroRecord), true);
}