You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/25 17:08:24 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-757] Adding utility functions to support decoration of Avro Generic Records

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ec902e  [GOBBLIN-757] Adding utility functions to support decoration of Avro Generic Records
3ec902e is described below

commit 3ec902e93ebe60dfce8a3b02dc32482f241c84dd
Author: Shirshanka Das <sd...@linkedin.com>
AuthorDate: Thu Apr 25 10:08:17 2019 -0700

    [GOBBLIN-757] Adding utility functions to support decoration of Avro Generic Records
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below!
    
    ### JIRA
    - [X] My PR addresses the following [Gobblin JIRA]
    (https://issues.apache.org/jira/browse/GOBBLIN/)
    issues and references them in the PR title. For
    example, "[GOBBLIN-XXX] My Gobblin PR"
        -
    https://issues.apache.org/jira/browse/GOBBLIN-757
    
    ### Description
    - [X] Here are some details about my PR, including
    screenshots (if applicable):
      Adds two methods to AvroUtils that work on schemas
    and records.
    
    ### Tests
    - [X] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
     additional tests in AvroUtilsTest
    
    ### Commits
    - [X] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit
    message](http://chris.beams.io/posts/git-
    commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    Closes #2621 from shirshanka/avro-decorate
---
 .../java/org/apache/gobblin/util/AvroUtils.java    |  50 +++++-
 .../org/apache/gobblin/util/AvroUtilsTest.java     | 175 ++++++++++++++++++++-
 2 files changed, 223 insertions(+), 2 deletions(-)

diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 242dcd8..8f2c653 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
@@ -37,6 +38,7 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -75,6 +77,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closer;
 
+import javax.annotation.Nonnull;
 
 /**
  * A Utils class for dealing with Avro objects
@@ -108,10 +111,15 @@ public class AvroUtils {
 
   public static List<Field> deepCopySchemaFields(Schema readerSchema) {
     return readerSchema.getFields().stream()
-        .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order()))
+        .map(field -> {
+          Field f = new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order());
+          field.getProps().forEach((key, value) -> f.addProp(key, value));
+          return f;
+        })
         .collect(Collectors.toList());
   }
 
+
   public static class AvroPathFilter implements PathFilter {
     @Override
     public boolean accept(Path path) {
@@ -834,4 +842,44 @@ public class AvroUtils {
     GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
     return reader.read(null, decoder);
   }
+
+
+  /**
+   * Decorate the {@link Schema} for a record with additional {@link Field}s.
+   * @param inputSchema: must be a {@link Record} schema.
+   * @return the decorated Schema. Fields are appended to the inputSchema.
+   */
+  public static Schema decorateRecordSchema(Schema inputSchema, @Nonnull List<Field> fieldList) {
+    Preconditions.checkState(inputSchema.getType().equals(Type.RECORD));
+    List<Field> outputFields = deepCopySchemaFields(inputSchema);
+    List<Field> newOutputFields = Stream.concat(outputFields.stream(), fieldList.stream()).collect(Collectors.toList());
+
+    Schema outputSchema = Schema.createRecord(inputSchema.getName(), inputSchema.getDoc(),
+            inputSchema.getNamespace(), inputSchema.isError());
+    outputSchema.setFields(newOutputFields);
+    copyProperties(inputSchema, outputSchema);
+    return outputSchema;
+  }
+
+  /**
+   * Decorate a {@link GenericRecord} with additional fields and make it conform to an extended Schema
+   * It is the caller's responsibility to ensure that the outputSchema is the merge of the inputRecord's schema
+   * and the additional fields. The method does not check this for performance reasons, because it is expected to be called in the
+   * critical path of processing a record.
+   * Use {@link AvroUtils#decorateRecordSchema(Schema, List)} to generate such a Schema before calling this method.
+   * @param inputRecord: record with data to be copied into the output record
+   * @param fieldMap: values can be primitive types or GenericRecords if nested
+   * @param outputSchema: the schema that the decoratedRecord will conform to
+   * @return an outputRecord that contains a union of the fields in the inputRecord and the field-values in the fieldMap
+   */
+  public static GenericRecord decorateRecord(GenericRecord inputRecord, @Nonnull Map<String, Object> fieldMap,
+          Schema outputSchema) {
+    GenericRecord outputRecord = new GenericData.Record(outputSchema);
+    inputRecord.getSchema().getFields().forEach(
+            f -> outputRecord.put(f.name(), inputRecord.get(f.name()))
+    );
+    fieldMap.forEach((key, value) -> outputRecord.put(key, value));
+    return outputRecord;
+  }
+
 }
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 627fc60..94ce650 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -17,9 +17,12 @@
 
 package org.apache.gobblin.util;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -31,11 +34,18 @@ import org.apache.avro.file.FileReader;
 import org.apache.avro.file.SeekableInput;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.mapred.FsInput;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -310,4 +320,167 @@ public class AvroUtilsTest {
     String stringValue = AvroUtils.getFieldValue(record, "union.string").get().toString();
     Assert.assertEquals(stringValue, "testString");
   }
+
+
+  @Test
+  public void testDecorateSchemaWithSingleField() {
+
+    Schema inputRecord = SchemaBuilder.record("test").fields().requiredInt("numeric1")
+            .requiredString("string1").endRecord();
+    Schema fieldSchema = SchemaBuilder.builder().intType();
+    Schema.Field field = new Schema.Field("newField", fieldSchema, "",null);
+    Schema outputRecord = AvroUtils.decorateRecordSchema(inputRecord, Collections.singletonList(field));
+    checkFieldsMatch(inputRecord, outputRecord);
+    Assert.assertNotNull(outputRecord.getField("newField"));
+    Assert.assertEquals(outputRecord.getField("newField").schema(), fieldSchema);
+  }
+
+  private void checkFieldsMatch(Schema inputRecord, Schema outputRecord) {
+    inputRecord.getFields().forEach(f -> {
+      Schema.Field outField = outputRecord.getField(f.name());
+      Assert.assertEquals(f, outField);
+    });
+  }
+
+  @Test
+  public void testDecorateSchemaWithStringProperties() {
+
+    Schema inputRecord = SchemaBuilder.record("test").fields()
+            .name("integer1")
+              .prop("innerProp", "innerVal")
+              .type().intBuilder().endInt().noDefault()
+            .requiredString("string1")
+            .endRecord();
+    inputRecord.addProp("topLevelProp", "topLevelVal");
+    Schema.Field additionalField = getTestInnerRecordField();
+    Schema outputSchema = AvroUtils.decorateRecordSchema(inputRecord, Collections.singletonList(additionalField));
+    checkFieldsMatch(inputRecord, outputSchema);
+    Assert.assertEquals(outputSchema.getProp("topLevelProp"), "topLevelVal");
+    Assert.assertEquals(outputSchema.getField("integer1").getProp("innerProp"), "innerVal");
+  }
+
+  @Test
+  public void testDecorateSchemaWithObjectProperties() throws IOException {
+
+    String customPropertyString = "{\"custom\": {\"prop1\": \"val1\"}}";
+    JsonNode customPropertyValue = new ObjectMapper().readTree(customPropertyString);
+
+    Schema inputRecord = SchemaBuilder.record("test").fields()
+            .name("integer1")
+            .prop("innerProp", "innerVal")
+            .type().intBuilder().endInt().noDefault()
+            .requiredString("string1")
+            .endRecord();
+    inputRecord.addProp("topLevelProp", customPropertyValue);
+    Schema.Field additionalField = getTestInnerRecordField();
+    Schema outputSchema = AvroUtils.decorateRecordSchema(inputRecord, Collections.singletonList(additionalField));
+    checkFieldsMatch(inputRecord, outputSchema);
+    Assert.assertEquals(outputSchema.getProp("topLevelProp"), inputRecord.getProp("topLevelProp"));
+    Assert.assertEquals(outputSchema.getField("integer1").getProp("innerProp"), "innerVal");
+  }
+
+
+  private Schema.Field getTestInnerRecordField() {
+    Schema fieldSchema = SchemaBuilder.record("innerRecord")
+            .fields().requiredInt("innerInt").requiredString("innerString")
+            .endRecord();
+    Schema.Field field = new Schema.Field("innerRecord", fieldSchema, "",null);
+    return field;
+  }
+
+
+  @Test
+  public void testDecorateSchemaWithSingleRecord() {
+    Schema inputRecord = SchemaBuilder.record("test").fields().requiredInt("numeric1")
+            .requiredString("string1").endRecord();
+    Schema fieldSchema = SchemaBuilder.record("innerRecord")
+            .fields().requiredInt("innerInt").requiredString("innerString")
+            .endRecord();
+    Schema.Field field = new Schema.Field("innerRecord", fieldSchema, "",null);
+    Schema outputRecord = AvroUtils.decorateRecordSchema(inputRecord, Collections.singletonList(field));
+    checkFieldsMatch(inputRecord, outputRecord);
+    Assert.assertNotNull(outputRecord.getField("innerRecord"));
+    Assert.assertEquals(outputRecord.getField("innerRecord").schema(), fieldSchema);
+  }
+
+
+  @Test
+  public void testDecorateRecordWithPrimitiveField() {
+    Schema inputRecordSchema = SchemaBuilder.record("test").fields()
+            .name("integer1")
+            .prop("innerProp", "innerVal")
+            .type().intBuilder().endInt().noDefault()
+            .requiredString("string1")
+            .endRecord();
+
+    GenericRecord inputRecord = new GenericData.Record(inputRecordSchema);
+    inputRecord.put("integer1", 10);
+    inputRecord.put("string1", "hello");
+
+    Schema outputRecordSchema = AvroUtils.decorateRecordSchema(inputRecordSchema, Collections.singletonList(new Schema.Field("newField", SchemaBuilder.builder().intType(), "test field", null)));
+    Map<String, Object> newFields = new HashMap<>();
+    newFields.put("newField", 5);
+
+    GenericRecord outputRecord = AvroUtils.decorateRecord(inputRecord, newFields, outputRecordSchema);
+    Assert.assertEquals(outputRecord.get("newField"), 5);
+    Assert.assertEquals(outputRecord.get("integer1"), 10);
+    Assert.assertEquals(outputRecord.get("string1"), "hello");
+
+  }
+
+
+  @Test
+  public void testDecorateRecordWithNestedField() throws IOException {
+    Schema inputRecordSchema = SchemaBuilder.record("test").fields()
+            .name("integer1")
+            .prop("innerProp", "innerVal")
+            .type().intBuilder().endInt().noDefault()
+            .requiredString("string1")
+            .endRecord();
+
+    GenericRecord inputRecord = new GenericData.Record(inputRecordSchema);
+    inputRecord.put("integer1", 10);
+    inputRecord.put("string1", "hello");
+
+    Schema nestedFieldSchema = SchemaBuilder.builder().record("metadata")
+            .fields()
+            .requiredString("source")
+            .requiredLong("timestamp")
+            .endRecord();
+
+    Schema.Field nestedField = new Schema.Field("metadata", nestedFieldSchema, "I am a nested field", null);
+
+    Schema outputRecordSchema = AvroUtils.decorateRecordSchema(inputRecordSchema, Collections.singletonList(nestedField));
+    Map<String, Object> newFields = new HashMap<>();
+
+    GenericData.Record metadataRecord = new GenericData.Record(nestedFieldSchema);
+    metadataRecord.put("source", "oracle");
+    metadataRecord.put("timestamp", 1234L);
+
+    newFields.put("metadata", metadataRecord);
+
+    GenericRecord outputRecord = AvroUtils.decorateRecord(inputRecord, newFields, outputRecordSchema);
+    Assert.assertEquals(outputRecord.get("integer1"), 10);
+    Assert.assertEquals(outputRecord.get("string1"), "hello");
+    Assert.assertEquals(outputRecord.get("metadata"), metadataRecord);
+
+
+    // Test that serializing and deserializing this record works.
+    GenericDatumWriter writer = new GenericDatumWriter(outputRecordSchema);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(1000);
+    Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null);
+    writer.write(outputRecord, binaryEncoder);
+    binaryEncoder.flush();
+    baos.close();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    Decoder binaryDecoder = DecoderFactory.get().binaryDecoder(bais, null);
+    GenericDatumReader reader = new GenericDatumReader(outputRecordSchema);
+    GenericRecord deserialized = (GenericRecord) reader.read(null, binaryDecoder);
+    Assert.assertEquals(deserialized.get("integer1"), 10);
+    Assert.assertEquals(deserialized.get("string1").toString(), "hello"); //extra toString: avro returns Utf8
+    Assert.assertEquals(deserialized.get("metadata"), metadataRecord);
+  }
+
+
 }