You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by na...@apache.org on 2021/01/14 20:48:10 UTC
[hudi] branch master updated: [HUDI-1509]: Reverting LinkedHashSet
changes to combine fields from oldSchema and newSchema in favor of using
only new schema for record rewriting (#2424)
This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 749f657 [HUDI-1509]: Reverting LinkedHashSet changes to combine fields from oldSchema and newSchema in favor of using only new schema for record rewriting (#2424)
749f657 is described below
commit 749f6578561cbf065c7f74ab51b1c01881a1bd97
Author: n3nash <na...@uber.com>
AuthorDate: Thu Jan 14 12:47:50 2021 -0800
[HUDI-1509]: Reverting LinkedHashSet changes to combine fields from oldSchema and newSchema in favor of using only new schema for record rewriting (#2424)
---
.../hudi/client/TestTableSchemaEvolution.java | 2 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 66 +++++++++++-----------
.../realtime/RealtimeCompactedRecordReader.java | 2 +-
.../reader/DFSHoodieDatasetInputReader.java | 2 +-
4 files changed, 36 insertions(+), 36 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 708a647..ecf38d8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -481,7 +481,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
GenericRecord payload;
try {
payload = (GenericRecord)r.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
- GenericRecord newPayload = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(payload, newSchema);
+ GenericRecord newPayload = HoodieAvroUtils.rewriteRecord(payload, newSchema);
return new HoodieRecord(key, new RawTripTestPayload(newPayload.toString(), key.getRecordKey(), key.getPartitionPath(), schemaStr));
} catch (IOException e) {
throw new RuntimeException("Conversion to new schema failed");
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 37a49ea..ec19fe3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.avro;
+import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -58,7 +59,6 @@ import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -293,52 +293,52 @@ public class HoodieAvroUtils {
}
/**
- * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the old
- * schema.
- */
- public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) {
- return rewrite(record, getCombinedFieldsToWrite(record.getSchema(), newSchema), newSchema);
- }
-
- /**
* Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new
* schema.
+ * NOTE: Here, the assumption is that you cannot go from an evolved schema (schema with (N) fields)
+ * to an older schema (schema with (N-1) fields). All fields present in the older record schema MUST be present in the
+ * new schema and the default/existing values are carried over.
+ * This method does the following:
+ * a) Create a new empty GenericRecord with the new schema.
+ * b) For a GenericRecord, for every field of the new schema, copy the value from the old record, or use the field's
+ * default value when the old record has no (null) value for it.
+ * c) For a SpecificRecordBase, hoodie_metadata_fields get special treatment. This is needed because code-generated
+ * Avro classes (e.g. HoodieMetadataRecord) produce SpecificRecordBase instances rather than plain GenericRecords,
+ * and SpecificRecordBase throws a NullPointerException from record.get(name) when name is not in the record's schema
+ * (which happens when converting a SpecificRecordBase without hoodie_metadata_fields to a new record that has them).
+ * In this case we do NOT copy or default the hoodie_metadata_fields; they stay unset (null) in the new
+ * GenericData.Record, so the new schema must allow null for them or the validation below fails.
+ * TODO: See if we can always pass a GenericRecord instead of a SpecificRecordBase here.
*/
- public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
- return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), newSchema);
- }
-
- private static GenericRecord rewrite(GenericRecord record, LinkedHashSet<Field> fieldsToWrite, Schema newSchema) {
+ public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSchema) {
GenericRecord newRecord = new GenericData.Record(newSchema);
- for (Schema.Field f : fieldsToWrite) {
- if (record.get(f.name()) == null) {
- if (f.defaultVal() instanceof JsonProperties.Null) {
- newRecord.put(f.name(), null);
- } else {
- newRecord.put(f.name(), f.defaultVal());
- }
- } else {
- newRecord.put(f.name(), record.get(f.name()));
+ boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
+ for (Schema.Field f : newSchema.getFields()) {
+ if (!isSpecificRecord) {
+ copyOldValueOrSetDefault(oldRecord, newRecord, f);
+ } else if (!isMetadataField(f.name())) {
+ copyOldValueOrSetDefault(oldRecord, newRecord, f);
}
}
if (!GenericData.get().validate(newSchema, newRecord)) {
throw new SchemaCompatibilityException(
- "Unable to validate the rewritten record " + record + " against schema " + newSchema);
+ "Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema);
}
return newRecord;
}
- /**
- * Generates a super set of fields from both old and new schema.
- */
- private static LinkedHashSet<Field> getCombinedFieldsToWrite(Schema oldSchema, Schema newSchema) {
- LinkedHashSet<Field> allFields = new LinkedHashSet<>(oldSchema.getFields());
- for (Schema.Field f : newSchema.getFields()) {
- if (!allFields.contains(f) && !isMetadataField(f.name())) {
- allFields.add(f);
+ private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field f) {
+ // cache the result of oldRecord.get() to avoid a second, CPU-expensive hash lookup
+ Object fieldValue = oldRecord.get(f.name());
+ if (fieldValue == null) {
+ if (f.defaultVal() instanceof JsonProperties.Null) {
+ newRecord.put(f.name(), null);
+ } else {
+ newRecord.put(f.name(), f.defaultVal());
}
+ } else {
+ newRecord.put(f.name(), fieldValue);
}
- return allFields;
}
public static byte[] compress(String text) {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index a139997..a98a230 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -109,7 +109,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
if (usesCustomPayload) {
// If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
// the writerSchema with only the projection fields
- recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema());
+ recordToReturn = HoodieAvroUtils.rewriteRecord(rec.get(), getReaderSchema());
}
// we assume, a later safe record in the log, is newer than what we have in the map &
// replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 4592ae8..5aa9797 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -197,7 +197,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
private JavaRDD<GenericRecord> projectSchema(JavaRDD<GenericRecord> updates) {
// The records read from the hoodie dataset have the hoodie record fields, rewrite the record to eliminate them
return updates
- .map(r -> HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(r, new Schema.Parser().parse(schemaStr)));
+ .map(r -> HoodieAvroUtils.rewriteRecord(r, new Schema.Parser().parse(schemaStr)));
}
private JavaRDD<GenericRecord> generateUpdates(Map<String, Integer> adjustedPartitionToFileIdCountMap,