Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/05/23 22:19:41 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #2309: [HUDI-1441] - HoodieAvroUtils - rewrite() is not handling evolution o…

nsivabalan commented on a change in pull request #2309:
URL: https://github.com/apache/hudi/pull/2309#discussion_r637608079



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -308,17 +309,81 @@ public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord r
     return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), newSchema);
   }
 
+  private static void setDefaultVal(GenericRecord newRecord, Schema.Field f) {
+    if (f.defaultVal() instanceof JsonProperties.Null) {
+      newRecord.put(f.name(), null);
+    } else {
+      newRecord.put(f.name(), f.defaultVal());
+    }
+  }
+
+  /*
+   * <pre>
+   *  OldRecord:                     NewRecord:
+   *      field1 : String                field1 : String
+   *      field2 : record                field2 : record
+   *         field_21 : string              field_21 : string
+   *         field_22 : Integer             field_22 : Integer
+   *      field3: Integer                   field_23 : String
+   *                                       field_24 : Integer
+   *                                     field3: Integer
+   * </pre>
+   * <p>
+   *  When a nested record has changed/evolved, newRecord.put(field2, oldRecord.get(field2)), is not sufficient.
+   *  Requires a deep-copy/rewrite of the evolved field.
+   */
+  private static Object rewriteEvolvedFields(Object datum, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(datum instanceof GenericRecord)) {
+          return datum;
+        }
+        GenericRecord record = (GenericRecord) datum;
+        // if schema of the record being rewritten does not match
+        // with the new schema, some nested records with schema change
+        // will require rewrite.
+        if (!record.getSchema().equals(newSchema)) {
+          GenericRecord newRecord = new GenericData.Record(newSchema);
+          for (Schema.Field f : newSchema.getFields()) {
+            if (record.get(f.name()) == null) {
+              setDefaultVal(newRecord, f);
+            } else {
+              newRecord.put(f.name(), rewriteEvolvedFields(record.get(f.name()), f.schema()));
+            }
+          }
+          return newRecord;
+        }
+        return datum;
+      case UNION:
+        Integer idx = (newSchema.getTypes().get(0).getType() == Schema.Type.NULL) ? 1 : 0;
+        return rewriteEvolvedFields(datum, newSchema.getTypes().get(idx));
+      case ARRAY:
+        List<Object> arrayValue = (List)datum;
+        List<Object> arrayCopy = new GenericData.Array<Object>(
+            arrayValue.size(), newSchema);
+        for (Object obj : arrayValue) {
+          arrayCopy.add(rewriteEvolvedFields(obj, newSchema.getElementType()));
+        }
+        return arrayCopy;
+      case MAP:
+        Map<Object,Object> map = (Map<Object,Object>)datum;
+        Map<Object, Object> mapCopy = new HashMap<>(map.size());
+        for (Map.Entry<Object, Object> entry : map.entrySet()) {
+          mapCopy.put(entry.getKey(), rewriteEvolvedFields(entry.getValue(), newSchema.getValueType()));
+        }
+        return mapCopy;
+      default:

Review comment:
       I am slowly gaining knowledge in schema evolution, so this might be a noob question. Apart from the RECORD datatype, how else could other datatypes evolve? For example, an array in the old schema has to be an array in the new schema, right? In the case of RECORD, I understand there could be more fields, and hence we need a deep copy. What I am trying to ask is: for the UNION, ARRAY, and MAP datatypes, can we just fetch the old value and add it to the new record rather than doing a deep copy? Can you help clarify?
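One reason a deep copy can still be needed for ARRAY and MAP: their element/value type may itself be an evolved record. A minimal stdlib-only sketch (records modeled as maps, and `rewriteRecord` as a hypothetical stand-in for the per-record rewrite that fills in a newly added optional field):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ArrayRewriteDemo {
    // Hypothetical stand-in for rewriting one record to an evolved schema:
    // here the evolved schema adds an optional "color_name" defaulting to null.
    static Map<String, Object> rewriteRecord(Map<String, Object> old) {
        Map<String, Object> out = new HashMap<>(old);
        out.putIfAbsent("color_name", null);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> oldRec = new HashMap<>();
        oldRec.put("color_id", 55.5);
        List<Map<String, Object>> oldArray = Collections.singletonList(oldRec);

        // Copying the array reference as-is would keep each element on the
        // old shape; rewriting element by element surfaces the new field.
        List<Map<String, Object>> rewritten = new ArrayList<>();
        for (Map<String, Object> r : oldArray) {
            rewritten.add(rewriteRecord(r));
        }
        System.out.println(rewritten.get(0).containsKey("color_name")); // true
    }
}
```

So if the element type is unchanged, copying the old value over would indeed suffice; the element-wise recursion only matters when a nested record inside the container has evolved.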

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -308,17 +309,79 @@ public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord r
     return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), newSchema);
   }
 
+  private static void setDefaultVal(GenericRecord newRecord, Schema.Field f) {
+    if (f.defaultVal() instanceof JsonProperties.Null) {
+      newRecord.put(f.name(), null);
+    } else {
+      newRecord.put(f.name(), f.defaultVal());
+    }
+  }
+
+  /*
+   *  OldRecord:                     NewRecord:
+   *      field1 : String                field1 : String
+   *      field2 : record                field2 : record
+   *         field_21 : string              field_21 : string
+   *         field_22 : Integer             field_22 : Integer
+   *      field3: Integer                   field_23 : String
+   *                                       field_24 : Integer
+   *                                     field3: Integer
+   *
+   *  When a nested record has changed/evolved, newRecord.put(field2, oldRecord.get(field2)), is not sufficient.
+   *  Requires a deep-copy/rewrite of the evolved field.
+   */
+  private static Object rewriteEvolvedFields(Object datum, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(datum instanceof GenericRecord)) {
+          return datum;
+        }
+        GenericRecord record = (GenericRecord) datum;
+        // if schema of the record being rewritten does not match
+        // with the new schema, some nested records with schema change
+        // will require rewrite.
+        if (!record.getSchema().equals(newSchema)) {
+          GenericRecord newRecord = new GenericData.Record(newSchema);
+          for (Schema.Field f : newSchema.getFields()) {
+            if (record.get(f.name()) == null) {
+              setDefaultVal(newRecord, f);
+            } else {
+              newRecord.put(f.name(), rewriteEvolvedFields(record.get(f.name()), f.schema()));
+            }
+          }
+          return newRecord;
+        }
+        return datum;
+      case UNION:
+        Integer idx = (newSchema.getTypes().get(0).getType() == Schema.Type.NULL) ? 1 : 0;
+        return rewriteEvolvedFields(datum, newSchema.getTypes().get(idx));

Review comment:
       @nbalajee: let me know if this is feasible. Is NULL mandatory in every UNION schema, or can there be a UNION schema without NULL in it? If such unions are allowed, this would fail, in my understanding.
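To illustrate the concern: Avro unions are not required to contain `"null"`. A stdlib-only sketch of the branch-selection heuristic from the diff (union types modeled as a list of type names, `pickBranch` being a hypothetical stand-in for the `idx` computation):

```java
import java.util.Arrays;
import java.util.List;

public class UnionIndexDemo {
    // Mirrors the PR's selection logic: if the first branch is "null",
    // assume the datum belongs to the second branch, else the first.
    static int pickBranch(List<String> unionTypes) {
        return unionTypes.get(0).equals("null") ? 1 : 0;
    }

    public static void main(String[] args) {
        // Nullable union ["null", "string"]: picks branch 1 as intended.
        System.out.println(pickBranch(Arrays.asList("null", "string"))); // 1

        // A union without "null", e.g. ["int", "string"], is legal in Avro.
        // The heuristic then always picks branch 0, even when the datum
        // actually matches the second branch.
        System.out.println(pickBranch(Arrays.asList("int", "string"))); // 0
    }
}
```

The two-branch assumption also does not cover unions with three or more types, where neither index 0 nor 1 may match the datum.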

##########
File path: hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
##########
@@ -207,4 +208,82 @@ public void testAddingAndRemovingMetadataFields() {
     Schema schemaWithoutMetaCols = HoodieAvroUtils.removeMetadataFields(schemaWithMetaCols);
     assertEquals(schemaWithoutMetaCols.getFields().size(), NUM_FIELDS_IN_EXAMPLE_SCHEMA);
   }
+
+  @Test
+  public void testRewriteToEvolvedNestedRecord() throws Exception {
+    // schema definition for inner record
+    Schema nestedSchema = SchemaBuilder.record("inner_rec").fields().requiredDouble("color_id").endRecord();
+    Schema evolvedNestedSchema = SchemaBuilder.record("inner_rec").fields().requiredDouble("color_id")
+        .optionalString("color_name").endRecord();
+
+    // schema definition for outer record
+    Schema recordSchema = SchemaBuilder.record("outer_rec").fields().requiredDouble("timestamp")
+        .requiredString("_row_key").requiredString("non_pii_col").name("color_rec").type(nestedSchema)
+        .noDefault().requiredString("pii_col").endRecord();
+    Schema evolvedRecordSchema = SchemaBuilder.record("outer_rec").fields().requiredDouble("timestamp")
+        .requiredString("_row_key").requiredString("non_pii_col").name("color_rec").type(evolvedNestedSchema)
+        .noDefault().requiredString("pii_col").endRecord();
+
+    // populate inner record, with fewer fields
+    GenericRecord nestedRec = new GenericData.Record(nestedSchema);
+    nestedRec.put("color_id", 55.5);
+
+    // populate outer record
+    GenericRecord rec = new GenericData.Record(recordSchema);
+    rec.put("timestamp", 3.5);
+    rec.put("_row_key", "key1");
+    rec.put("non_pii_col", "val1");
+    rec.put("color_rec", nestedRec);
+    rec.put("pii_col", "val2");
+
+    // rewrite record with less number of fields into an evolved record (with optional fields added).
+    try {
+      GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithOnlyNewSchemaFields(rec, evolvedRecordSchema);
+      assertEquals("val2", newRecord.get("pii_col"));
+      assertEquals(null, ((GenericRecord)newRecord.get("color_rec")).get("color_name"));
+    } catch (Exception e) {
+      e.printStackTrace();
+      assertTrue(false, "Failed to rewrite Record");
+    }
+
+  }
+
+  @Test
+  public void testRewriteToShorterRecord() throws Exception {

Review comment:
       I thought, from the javadocs of HoodieAvroUtils.rewriteRecord(GenericRecord oldRecord, Schema newSchema), that a rewrite can only go from the old schema to the new schema, not the other way round. Can you help me understand why we allow a backwards-incompatible rewrite here?
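For context on why this direction is surprising: rewriting into a shorter schema silently drops any field the target schema does not declare. A stdlib-only sketch (records modeled as maps, `project` being a hypothetical stand-in for a schema-driven rewrite):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class ShorterRewriteDemo {
    // Keep only the fields the (shorter) target schema declares.
    static Map<String, Object> project(Map<String, Object> rec, Set<String> targetFields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String f : targetFields) {
            out.put(f, rec.get(f));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> evolved = new LinkedHashMap<>();
        evolved.put("color_id", 55.5);
        evolved.put("color_name", "teal");

        // Target schema only declares color_id: color_name is dropped,
        // which is why this looks like a backwards-incompatible rewrite.
        Map<String, Object> shorter =
            project(evolved, new LinkedHashSet<>(Arrays.asList("color_id")));
        System.out.println(shorter.containsKey("color_name")); // false
    }
}
```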




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org