You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by na...@apache.org on 2021/01/14 20:48:10 UTC

[hudi] branch master updated: [HUDI-1509]: Reverting LinkedHashSet changes to combine fields from oldSchema and newSchema in favor of using only new schema for record rewriting (#2424)

This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 749f657  [HUDI-1509]: Reverting LinkedHashSet changes to combine fields from oldSchema and newSchema in favor of using only new schema for record rewriting (#2424)
749f657 is described below

commit 749f6578561cbf065c7f74ab51b1c01881a1bd97
Author: n3nash <na...@uber.com>
AuthorDate: Thu Jan 14 12:47:50 2021 -0800

    [HUDI-1509]: Reverting LinkedHashSet changes to combine fields from oldSchema and newSchema in favor of using only new schema for record rewriting (#2424)
---
 .../hudi/client/TestTableSchemaEvolution.java      |  2 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 66 +++++++++++-----------
 .../realtime/RealtimeCompactedRecordReader.java    |  2 +-
 .../reader/DFSHoodieDatasetInputReader.java        |  2 +-
 4 files changed, 36 insertions(+), 36 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 708a647..ecf38d8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -481,7 +481,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
       GenericRecord payload;
       try {
         payload = (GenericRecord)r.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get();
-        GenericRecord newPayload = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(payload, newSchema);
+        GenericRecord newPayload = HoodieAvroUtils.rewriteRecord(payload, newSchema);
         return new HoodieRecord(key, new RawTripTestPayload(newPayload.toString(), key.getRecordKey(), key.getPartitionPath(), schemaStr));
       } catch (IOException e) {
         throw new RuntimeException("Conversion to new schema failed");
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 37a49ea..ec19fe3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.avro;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -58,7 +59,6 @@ import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -293,52 +293,52 @@ public class HoodieAvroUtils {
   }
 
   /**
-   * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the old
-   * schema.
-   */
-  public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) {
-    return rewrite(record, getCombinedFieldsToWrite(record.getSchema(), newSchema), newSchema);
-  }
-
-  /**
    * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new
    * schema.
+   * NOTE: Here, the assumption is that you cannot go from an evolved schema (schema with (N) fields)
+   * to an older schema (schema with (N-1) fields). All fields present in the older record schema MUST be present in the
+   * new schema and the default/existing values are carried over.
+   * This particular method does the following things :
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For GenericRecord, copy over the data from the old schema to the new schema or set default values for all fields of this
+   * transformed schema
+   * c) For SpecificRecord, hoodie_metadata_fields have a special treatment. This is done because for code generated
+   * avro classes (HoodieMetadataRecord), the avro record is a SpecificBaseRecord type instead of a GenericRecord.
+   * SpecificBaseRecord throws null pointer exception for record.get(name) if name is not present in the schema of the
+   * record (which happens when converting a SpecificBaseRecord without hoodie_metadata_fields to a new record with it).
+   * In this case, we do NOT set the defaults for the hoodie_metadata_fields explicitly, instead, the new record assumes
+   * the default defined in the avro schema itself.
+   * TODO: See if we can always pass GenericRecord instead of SpecificBaseRecord in some cases.
    */
-  public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
-    return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), newSchema);
-  }
-
-  private static GenericRecord rewrite(GenericRecord record, LinkedHashSet<Field> fieldsToWrite, Schema newSchema) {
+  public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSchema) {
     GenericRecord newRecord = new GenericData.Record(newSchema);
-    for (Schema.Field f : fieldsToWrite) {
-      if (record.get(f.name()) == null) {
-        if (f.defaultVal() instanceof JsonProperties.Null) {
-          newRecord.put(f.name(), null);
-        } else {
-          newRecord.put(f.name(), f.defaultVal());
-        }
-      } else {
-        newRecord.put(f.name(), record.get(f.name()));
+    boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
+    for (Schema.Field f : newSchema.getFields()) {
+      if (!isSpecificRecord) {
+        copyOldValueOrSetDefault(oldRecord, newRecord, f);
+      } else if (!isMetadataField(f.name())) {
+        copyOldValueOrSetDefault(oldRecord, newRecord, f);
       }
     }
     if (!GenericData.get().validate(newSchema, newRecord)) {
       throw new SchemaCompatibilityException(
-          "Unable to validate the rewritten record " + record + " against schema " + newSchema);
+          "Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema);
     }
     return newRecord;
   }
 
-  /**
-   * Generates a super set of fields from both old and new schema.
-   */
-  private static LinkedHashSet<Field> getCombinedFieldsToWrite(Schema oldSchema, Schema newSchema) {
-    LinkedHashSet<Field> allFields = new LinkedHashSet<>(oldSchema.getFields());
-    for (Schema.Field f : newSchema.getFields()) {
-      if (!allFields.contains(f) && !isMetadataField(f.name())) {
-        allFields.add(f);
+  private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field f) {
+    // cache the result of oldRecord.get() to save CPU expensive hash lookup
+    Object fieldValue = oldRecord.get(f.name());
+    if (fieldValue == null) {
+      if (f.defaultVal() instanceof JsonProperties.Null) {
+        newRecord.put(f.name(), null);
+      } else {
+        newRecord.put(f.name(), f.defaultVal());
       }
+    } else {
+      newRecord.put(f.name(), fieldValue);
     }
-    return allFields;
   }
 
   public static byte[] compress(String text) {
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index a139997..a98a230 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -109,7 +109,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         if (usesCustomPayload) {
           // If using a custom payload, return only the projection fields. The readerSchema is a schema derived from
           // the writerSchema with only the projection fields
-          recordToReturn = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec.get(), getReaderSchema());
+          recordToReturn = HoodieAvroUtils.rewriteRecord(rec.get(), getReaderSchema());
         }
         // we assume, a later safe record in the log, is newer than what we have in the map &
         // replace it. Since we want to return an arrayWritable which is the same length as the elements in the latest
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 4592ae8..5aa9797 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -197,7 +197,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
   private JavaRDD<GenericRecord> projectSchema(JavaRDD<GenericRecord> updates) {
     // The records read from the hoodie dataset have the hoodie record fields, rewrite the record to eliminate them
     return updates
-        .map(r -> HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(r, new Schema.Parser().parse(schemaStr)));
+        .map(r -> HoodieAvroUtils.rewriteRecord(r, new Schema.Parser().parse(schemaStr)));
   }
 
   private JavaRDD<GenericRecord> generateUpdates(Map<String, Integer> adjustedPartitionToFileIdCountMap,