You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/03/25 03:49:18 UTC
[hudi] branch master updated: [HUDI-3678] Fix record rewrite of create handle when 'preserveMetadata' is true (#5088)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8896864 [HUDI-3678] Fix record rewrite of create handle when 'preserveMetadata' is true (#5088)
8896864 is described below
commit 8896864d7b8e8c1a9bf0e2a05353f70a1fabdf22
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Mar 25 11:48:50 2022 +0800
[HUDI-3678] Fix record rewrite of create handle when 'preserveMetadata' is true (#5088)
---
.../org/apache/hudi/io/HoodieCreateHandle.java | 22 ++++++++++------------
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 20 ++++++++------------
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 4 ++--
.../metadata/HoodieBackedTableMetadataWriter.java | 6 +++++-
.../hudi/client/TestUpdateSchemaEvolution.java | 2 ++
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 18 ++++--------------
6 files changed, 31 insertions(+), 41 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 3e7e0b1..0bc3491 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -59,7 +59,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
protected long recordsDeleted = 0;
private Map<String, HoodieRecord<T>> recordMap;
private boolean useWriterSchema = false;
- private boolean preserveHoodieMetadata = false;
+ private final boolean preserveMetadata;
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
@@ -69,9 +69,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
- boolean preserveHoodieMetadata) {
+ boolean preserveMetadata) {
this(config, instantTime, hoodieTable, partitionPath, fileId, Option.empty(),
- taskContextSupplier, preserveHoodieMetadata);
+ taskContextSupplier, preserveMetadata);
}
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -82,10 +82,10 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Option<Schema> overriddenSchema,
- TaskContextSupplier taskContextSupplier, boolean preserveHoodieMetadata) {
+ TaskContextSupplier taskContextSupplier, boolean preserveMetadata) {
super(config, instantTime, partitionPath, fileId, hoodieTable, overriddenSchema,
taskContextSupplier);
- this.preserveHoodieMetadata = preserveHoodieMetadata;
+ this.preserveMetadata = preserveMetadata;
writeStatus.setFileId(fileId);
writeStatus.setPartitionPath(partitionPath);
writeStatus.setStat(new HoodieWriteStat());
@@ -111,7 +111,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
TaskContextSupplier taskContextSupplier) {
- this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier);
+ this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, config.isPreserveHoodieCommitMetadataForCompaction());
this.recordMap = recordMap;
this.useWriterSchema = true;
}
@@ -137,13 +137,11 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload, I, K, O> extends
return;
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
- IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
- if (preserveHoodieMetadata) {
- // do not preserve FILENAME_METADATA_FIELD
- recordWithMetadataInSchema.put(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD), path.getName());
- fileWriter.writeAvro(record.getRecordKey(), recordWithMetadataInSchema);
+ if (preserveMetadata) {
+ fileWriter.writeAvro(record.getRecordKey(),
+ rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
} else {
- fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
+ fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) avroRecord.get()), record);
}
// update the new location of record, so we know where to find it next
record.unseal();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index d38f66a..cbcf382 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -61,8 +61,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD_POS;
-
@SuppressWarnings("Duplicates")
/**
* Handle to merge incoming records to those in storage.
@@ -264,7 +262,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
}
}
- return writeRecord(hoodieRecord, indexedRecord, isDelete, oldRecord);
+ return writeRecord(hoodieRecord, indexedRecord, isDelete);
}
protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
@@ -274,16 +272,16 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
return;
}
- if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()), null)) {
+ if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
insertRecordsWritten++;
}
}
protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord) {
- return writeRecord(hoodieRecord, indexedRecord, false, null);
+ return writeRecord(hoodieRecord, indexedRecord, false);
}
- protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete, GenericRecord oldRecord) {
+ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord> indexedRecord, boolean isDelete) {
Option recordMetadata = hoodieRecord.getData().getMetadata();
if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
HoodieUpsertException failureEx = new HoodieUpsertException("mismatched partition path, record partition: "
@@ -294,13 +292,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
try {
if (indexedRecord.isPresent() && !isDelete) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
- IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get(), preserveMetadata, oldRecord);
- if (preserveMetadata && useWriterSchema) { // useWriteSchema will be true only incase of compaction.
- // do not preserve FILENAME_METADATA_FIELD
- recordWithMetadataInSchema.put(FILENAME_METADATA_FIELD_POS, newFilePath.getName());
- fileWriter.writeAvro(hoodieRecord.getRecordKey(), recordWithMetadataInSchema);
+ if (preserveMetadata && useWriterSchema) { // useWriteSchema will be true only in case of compaction.
+ fileWriter.writeAvro(hoodieRecord.getRecordKey(),
+ rewriteRecordWithMetadata((GenericRecord) indexedRecord.get(), newFilePath.getName()));
} else {
- fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
+ fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) indexedRecord.get()), hoodieRecord);
}
recordsWritten++;
} else {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index b7e2d6a..89babc7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -227,8 +227,8 @@ public abstract class HoodieWriteHandle<T extends HoodieRecordPayload, I, K, O>
return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
}
- protected GenericRecord rewriteRecord(GenericRecord record, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
- return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields, copyOverMetaFields, fallbackRecord);
+ protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, String fileName) {
+ return HoodieAvroUtils.rewriteRecordWithMetadata(record, writeSchemaWithMetaFields, fileName);
}
public abstract List<WriteStatus> close();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 2f4bca8..ce167f7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -259,7 +259,11 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
// we will trigger archive manually, to ensure only regular writer invokes it
- .withAutoArchive(false).build())
+ .withAutoArchive(false)
+ // by default, the HFile does not keep the metadata fields, set up as false
+ // to always use the metadata of the new record.
+ .withPreserveCommitMetadata(false)
+ .build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
.withRollbackParallelism(parallelism)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 70f5e9f..a592619 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.io.HoodieCreateHandle;
@@ -77,6 +78,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) throws IOException {
// Create a bunch of records with an old version of schema
final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.avsc");
+ config.setValue(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA, "false");
final HoodieSparkTable table = HoodieSparkTable.create(config, context);
final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
List<HoodieRecord> insertRecords = new ArrayList<>();
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 5cb18dc..e427422 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -382,23 +382,13 @@ public class HoodieAvroUtils {
return newRecord;
}
- public static GenericRecord rewriteRecord(GenericRecord genericRecord, Schema newSchema, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
+ public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
GenericRecord newRecord = new GenericData.Record(newSchema);
- boolean isSpecificRecord = genericRecord instanceof SpecificRecordBase;
for (Schema.Field f : newSchema.getFields()) {
- if (!(isSpecificRecord && isMetadataField(f.name()))) {
- copyOldValueOrSetDefault(genericRecord, newRecord, f);
- }
- if (isMetadataField(f.name()) && copyOverMetaFields) {
- // if meta field exists in primary generic record, copy over.
- if (genericRecord.getSchema().getField(f.name()) != null) {
- copyOldValueOrSetDefault(genericRecord, newRecord, f);
- } else if (fallbackRecord != null && fallbackRecord.getSchema().getField(f.name()) != null) {
- // if not, try to copy from the fallback record.
- copyOldValueOrSetDefault(fallbackRecord, newRecord, f);
- }
- }
+ copyOldValueOrSetDefault(genericRecord, newRecord, f);
}
+ // do not preserve FILENAME_METADATA_FIELD
+ newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);
if (!GenericData.get().validate(newSchema, newRecord)) {
throw new SchemaCompatibilityException(
"Unable to validate the rewritten record " + genericRecord + " against schema " + newSchema);