Posted to commits@hudi.apache.org by si...@apache.org on 2020/05/28 00:19:04 UTC

[hudi] 21/40: [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema (#1427)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f9c3b078966fad1843554853b6179c04cae1d7f8
Author: Pratyaksh Sharma <pr...@gmail.com>
AuthorDate: Mon Apr 13 06:25:26 2020 +0530

    [HUDI-727]: Copy default values of fields if not present when rewriting incoming record with new schema (#1427)
---
 .../apache/hudi/common/util/HoodieAvroUtils.java   | 43 +++++++++++------
 .../hudi/common/util/TestHoodieAvroUtils.java      | 54 +++++++++++++++++++++-
 .../realtime/AbstractRealtimeRecordReader.java     |  4 +-
 3 files changed, 84 insertions(+), 17 deletions(-)
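
In effect, this change makes HoodieAvroUtils.rewriteRecord() fall back to a
field's Avro default when the incoming record carries no value for it, instead
of writing null and failing validation for non-nullable fields. A minimal
sketch of the new behavior (the schema and field names are illustrative,
mirroring the tests added in this patch):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.util.HoodieAvroUtils;

    // Evolved schema: new_col1 declares a default, new_col2 is nullable
    // with no default declared.
    Schema evolved = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"rec\", \"fields\": ["
        + "{\"name\": \"_row_key\", \"type\": \"string\"},"
        + "{\"name\": \"new_col1\", \"type\": \"string\", \"default\": \"dummy_val\"},"
        + "{\"name\": \"new_col2\", \"type\": [\"int\", \"null\"]}]}");

    GenericRecord rec = new GenericData.Record(evolved);
    rec.put("_row_key", "key1");  // new_col1 and new_col2 are left unset

    GenericRecord rewritten = HoodieAvroUtils.rewriteRecord(rec, evolved);
    rewritten.get("new_col1");    // "dummy_val" -- copied from the schema default
    rewritten.get("new_col2");    // null -- no default declared for new_col2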

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java
index 02e48e3..32ea71e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieAvroUtils.java
@@ -32,7 +32,6 @@ import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.node.NullNode;
 
 import java.io.ByteArrayInputStream;
@@ -44,6 +43,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -104,15 +104,15 @@ public class HoodieAvroUtils {
     List<Schema.Field> parentFields = new ArrayList<>();
 
     Schema.Field commitTimeField =
-        new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
+        new Schema.Field(HoodieRecord.COMMIT_TIME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
     Schema.Field commitSeqnoField =
-        new Schema.Field(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
+        new Schema.Field(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
     Schema.Field recordKeyField =
-        new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
+        new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
     Schema.Field partitionPathField =
-        new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
+        new Schema.Field(HoodieRecord.PARTITION_PATH_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
     Schema.Field fileNameField =
-        new Schema.Field(HoodieRecord.FILENAME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
+        new Schema.Field(HoodieRecord.FILENAME_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", (Object) null);
 
     parentFields.add(commitTimeField);
     parentFields.add(commitSeqnoField);
@@ -121,8 +121,8 @@ public class HoodieAvroUtils {
     parentFields.add(fileNameField);
     for (Schema.Field field : schema.getFields()) {
       if (!isMetadataField(field.name())) {
-        Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue());
-        for (Map.Entry<String, JsonNode> prop : field.getJsonProps().entrySet()) {
+        Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal());
+        for (Map.Entry<String, Object> prop : field.getObjectProps().entrySet()) {
           newField.addProp(prop.getKey(), prop.getValue());
         }
         parentFields.add(newField);
@@ -191,7 +191,7 @@ public class HoodieAvroUtils {
    * schema.
    */
   public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) {
-    return rewrite(record, record.getSchema(), newSchema);
+    return rewrite(record, getCombinedFieldsToWrite(record.getSchema(), newSchema), newSchema);
   }
 
   /**
@@ -199,13 +199,17 @@ public class HoodieAvroUtils {
    * schema.
    */
   public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
-    return rewrite(record, newSchema, newSchema);
+    return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), newSchema);
   }
 
-  private static GenericRecord rewrite(GenericRecord record, Schema schemaWithFields, Schema newSchema) {
+  private static GenericRecord rewrite(GenericRecord record, LinkedHashSet<Field> fieldsToWrite, Schema newSchema) {
     GenericRecord newRecord = new GenericData.Record(newSchema);
-    for (Schema.Field f : schemaWithFields.getFields()) {
-      newRecord.put(f.name(), record.get(f.name()));
+    for (Schema.Field f : fieldsToWrite) {
+      if (record.get(f.name()) == null) {
+        newRecord.put(f.name(), f.defaultVal());
+      } else {
+        newRecord.put(f.name(), record.get(f.name()));
+      }
     }
     if (!GenericData.get().validate(newSchema, newRecord)) {
       throw new SchemaCompatabilityException(
@@ -214,6 +218,19 @@ public class HoodieAvroUtils {
     return newRecord;
   }
 
+  /**
+   * Generates a superset of fields from both the old and new schemas.
+   */
+  private static LinkedHashSet<Field> getCombinedFieldsToWrite(Schema oldSchema, Schema newSchema) {
+    LinkedHashSet<Field> allFields = new LinkedHashSet<>(oldSchema.getFields());
+    for (Schema.Field f : newSchema.getFields()) {
+      if (!allFields.contains(f) && !isMetadataField(f.name())) {
+        allFields.add(f);
+      }
+    }
+    return allFields;
+  }
+
   public static byte[] compress(String text) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieAvroUtils.java
index 2f925b8..b2c13f8 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieAvroUtils.java
@@ -19,7 +19,8 @@
 package org.apache.hudi.common.util;
 
 import org.apache.avro.Schema;
-import org.codehaus.jackson.JsonNode;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -30,11 +31,25 @@ import java.util.Map;
  */
 public class TestHoodieAvroUtils {
 
+  private static String EVOLVED_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec1\",\"fields\": [ "
+      + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+      + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+      + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
+      + "{\"name\": \"new_col1\", \"type\": \"string\", \"default\": \"dummy_val\"},"
+      + "{\"name\": \"new_col2\", \"type\": [\"int\", \"null\"]}]}";
+
   private static String EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"testrec\",\"fields\": [ "
       + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
       + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
       + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"}]}";
 
+
+  private static String SCHEMA_WITH_METADATA_FIELD = "{\"type\": \"record\",\"name\": \"testrec2\",\"fields\": [ "
+      + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+      + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+      + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
+      + "{\"name\": \"_hoodie_commit_time\", \"type\": [\"null\", \"string\"]}]}";
+
   @Test
   public void testPropsPresent() {
     Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(EXAMPLE_SCHEMA));
@@ -45,7 +60,7 @@ public class TestHoodieAvroUtils {
       }
 
       Assert.assertNotNull("field name is null", field.name());
-      Map<String, JsonNode> props = field.getJsonProps();
+      Map<String, Object> props = field.getObjectProps();
       Assert.assertNotNull("The property is null", props);
 
       if (field.name().equals("pii_col")) {
@@ -57,4 +72,39 @@ public class TestHoodieAvroUtils {
     }
     Assert.assertTrue("column pii_col doesn't show up", piiPresent);
   }
+
+  @Test
+  public void testDefaultValue() {
+    GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EVOLVED_SCHEMA));
+    rec.put("_row_key", "key1");
+    rec.put("non_pii_col", "val1");
+    rec.put("pii_col", "val2");
+    rec.put("timestamp", 3.5);
+    GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA));
+    Assert.assertEquals(rec1.get("new_col1"), "dummy_val");
+    Assert.assertNull(rec1.get("new_col2"));
+  }
+
+  @Test
+  public void testDefaultValueWithSchemaEvolution() {
+    GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
+    rec.put("_row_key", "key1");
+    rec.put("non_pii_col", "val1");
+    rec.put("pii_col", "val2");
+    rec.put("timestamp", 3.5);
+    GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(EVOLVED_SCHEMA));
+    Assert.assertEquals(rec1.get("new_col1"), "dummy_val");
+    Assert.assertNull(rec1.get("new_col2"));
+  }
+
+  @Test
+  public void testMetadataField() {
+    GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
+    rec.put("_row_key", "key1");
+    rec.put("non_pii_col", "val1");
+    rec.put("pii_col", "val2");
+    rec.put("timestamp", 3.5);
+    GenericRecord rec1 = HoodieAvroUtils.rewriteRecord(rec, new Schema.Parser().parse(SCHEMA_WITH_METADATA_FIELD));
+    Assert.assertNull(rec1.get("_hoodie_commit_time"));
+  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index df635d6..782a6fc 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -212,7 +212,7 @@ public abstract class AbstractRealtimeRecordReader {
         throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
             + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
       } else {
-        projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
+        projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
       }
     }
 
@@ -367,7 +367,7 @@ public abstract class AbstractRealtimeRecordReader {
       Field field = schemaFieldsMap.get(columnName.toLowerCase());
 
       if (field != null) {
-        hiveSchemaFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultValue()));
+        hiveSchemaFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
       } else {
         // Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in table schema.
         // They will get skipped as they won't be found in the original schema.
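
The remaining hunks migrate off Avro's deprecated Jackson-based accessors:
Schema.Field#defaultValue() and Schema.Field#getJsonProps(), which expose
org.codehaus.jackson.JsonNode, give way to defaultVal() and getObjectProps(),
which return plain Java objects. A minimal illustration, reusing the
hypothetical evolved schema from the sketch above:

    Schema.Field f = evolved.getField("new_col1");
    Object def = f.defaultVal();                    // "dummy_val" as a java.lang.String
    java.util.Map<String, Object> props = f.getObjectProps();  // custom props, no Jackson types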