You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/25 17:08:24 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-757] Adding
utility functions to support decoration of Avro Generic Records
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3ec902e [GOBBLIN-757] Adding utility functions to support decoration of Avro Generic Records
3ec902e is described below
commit 3ec902e93ebe60dfce8a3b02dc32482f241c84dd
Author: Shirshanka Das <sd...@linkedin.com>
AuthorDate: Thu Apr 25 10:08:17 2019 -0700
[GOBBLIN-757] Adding utility functions to support decoration of Avro Generic Records
Dear Gobblin maintainers,
Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!
### JIRA
- [X] My PR addresses the following [Gobblin JIRA]
(https://issues.apache.org/jira/browse/GOBBLIN/)
issues and references them in the PR title. For
example, "[GOBBLIN-XXX] My Gobblin PR"
-
https://issues.apache.org/jira/browse/GOBBLIN-757
### Description
- [X] Here are some details about my PR, including
screenshots (if applicable):
Adds two methods to AvroUtils that work on schemas
and records.
### Tests
- [X] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
additional tests in AvroUtilsTest
### Commits
- [X] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit
message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
2. Subject is limited to 50 characters
3. Subject does not end with a period
4. Subject uses the imperative mood ("add", not
"adding")
5. Body wraps at 72 characters
6. Body explains "what" and "why", not "how"
Closes #2621 from shirshanka/avro-decorate
---
.../java/org/apache/gobblin/util/AvroUtils.java | 50 +++++-
.../org/apache/gobblin/util/AvroUtilsTest.java | 175 ++++++++++++++++++++-
2 files changed, 223 insertions(+), 2 deletions(-)
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 242dcd8..8f2c653 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
@@ -37,6 +38,7 @@ import org.apache.avro.Schema.Type;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
@@ -75,6 +77,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
+import javax.annotation.Nonnull;
/**
* A Utils class for dealing with Avro objects
@@ -108,10 +111,15 @@ public class AvroUtils {
public static List<Field> deepCopySchemaFields(Schema readerSchema) {
return readerSchema.getFields().stream()
- .map(field -> new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order()))
+ .map(field -> {
+ Field f = new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order());
+ field.getProps().forEach((key, value) -> f.addProp(key, value));
+ return f;
+ })
.collect(Collectors.toList());
}
+
public static class AvroPathFilter implements PathFilter {
@Override
public boolean accept(Path path) {
@@ -834,4 +842,44 @@ public class AvroUtils {
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
return reader.read(null, decoder);
}
+
+
+ /**
+ * Decorate the {@link Schema} for a record with additional {@link Field}s.
+ * @param inputSchema: must be a {@link Record} schema.
+ * @return the decorated Schema. Fields are appended to the inputSchema.
+ */
+ public static Schema decorateRecordSchema(Schema inputSchema, @Nonnull List<Field> fieldList) {
+ Preconditions.checkState(inputSchema.getType().equals(Type.RECORD));
+ List<Field> outputFields = deepCopySchemaFields(inputSchema);
+ List<Field> newOutputFields = Stream.concat(outputFields.stream(), fieldList.stream()).collect(Collectors.toList());
+
+ Schema outputSchema = Schema.createRecord(inputSchema.getName(), inputSchema.getDoc(),
+ inputSchema.getNamespace(), inputSchema.isError());
+ outputSchema.setFields(newOutputFields);
+ copyProperties(inputSchema, outputSchema);
+ return outputSchema;
+ }
+
+ /**
+ * Decorate a {@link GenericRecord} with additional fields and make it conform to an extended Schema
+ * It is the caller's responsibility to ensure that the outputSchema is the merge of the inputRecord's schema
+ * and the additional fields. The method does not check this for performance reasons, because it is expected to be called in the
+ * critical path of processing a record.
+ * Use {@link AvroUtils#decorateRecordSchema(Schema, List)} to generate such a Schema before calling this method.
+ * @param inputRecord: record with data to be copied into the output record
+ * @param fieldMap: values can be primitive types or GenericRecords if nested
+ * @param outputSchema: the schema that the decoratedRecord will conform to
+ * @return an outputRecord that contains a union of the fields in the inputRecord and the field-values in the fieldMap
+ */
+ public static GenericRecord decorateRecord(GenericRecord inputRecord, @Nonnull Map<String, Object> fieldMap,
+ Schema outputSchema) {
+ GenericRecord outputRecord = new GenericData.Record(outputSchema);
+ inputRecord.getSchema().getFields().forEach(
+ f -> outputRecord.put(f.name(), inputRecord.get(f.name()))
+ );
+ fieldMap.forEach((key, value) -> outputRecord.put(key, value));
+ return outputRecord;
+ }
+
}
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 627fc60..94ce650 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -17,9 +17,12 @@
package org.apache.gobblin.util;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
-
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,11 +34,18 @@ import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -310,4 +320,167 @@ public class AvroUtilsTest {
String stringValue = AvroUtils.getFieldValue(record, "union.string").get().toString();
Assert.assertEquals(stringValue, "testString");
}
+
+
+ @Test
+ public void testDecorateSchemaWithSingleField() {
+
+ Schema inputRecord = SchemaBuilder.record("test").fields().requiredInt("numeric1")
+ .requiredString("string1").endRecord();
+ Schema fieldSchema = SchemaBuilder.builder().intType();
+ Schema.Field field = new Schema.Field("newField", fieldSchema, "",null);
+ Schema outputRecord = AvroUtils.decorateRecordSchema(inputRecord, Collections.singletonList(field));
+ checkFieldsMatch(inputRecord, outputRecord);
+ Assert.assertNotNull(outputRecord.getField("newField"));
+ Assert.assertEquals(outputRecord.getField("newField").schema(), fieldSchema);
+ }
+
+ private void checkFieldsMatch(Schema inputRecord, Schema outputRecord) {
+ inputRecord.getFields().forEach(f -> {
+ Schema.Field outField = outputRecord.getField(f.name());
+ Assert.assertEquals(f, outField);
+ });
+ }
+
+ @Test
+ public void testDecorateSchemaWithStringProperties() {
+
+ Schema inputRecord = SchemaBuilder.record("test").fields()
+ .name("integer1")
+ .prop("innerProp", "innerVal")
+ .type().intBuilder().endInt().noDefault()
+ .requiredString("string1")
+ .endRecord();
+ inputRecord.addProp("topLevelProp", "topLevelVal");
+ Schema.Field additionalField = getTestInnerRecordField();
+ Schema outputSchema = AvroUtils.decorateRecordSchema(inputRecord, Collections.singletonList(additionalField));
+ checkFieldsMatch(inputRecord, outputSchema);
+ Assert.assertEquals(outputSchema.getProp("topLevelProp"), "topLevelVal");
+ Assert.assertEquals(outputSchema.getField("integer1").getProp("innerProp"), "innerVal");
+ }
+
+ @Test
+ public void testDecorateSchemaWithObjectProperties() throws IOException {
+
+ String customPropertyString = "{\"custom\": {\"prop1\": \"val1\"}}";
+ JsonNode customPropertyValue = new ObjectMapper().readTree(customPropertyString);
+
+ Schema inputRecord = SchemaBuilder.record("test").fields()
+ .name("integer1")
+ .prop("innerProp", "innerVal")
+ .type().intBuilder().endInt().noDefault()
+ .requiredString("string1")
+ .endRecord();
+ inputRecord.addProp("topLevelProp", customPropertyValue);
+ Schema.Field additionalField = getTestInnerRecordField();
+ Schema outputSchema = AvroUtils.decorateRecordSchema(inputRecord, Collections.singletonList(additionalField));
+ checkFieldsMatch(inputRecord, outputSchema);
+ Assert.assertEquals(outputSchema.getProp("topLevelProp"), inputRecord.getProp("topLevelProp"));
+ Assert.assertEquals(outputSchema.getField("integer1").getProp("innerProp"), "innerVal");
+ }
+
+
+ private Schema.Field getTestInnerRecordField() {
+ Schema fieldSchema = SchemaBuilder.record("innerRecord")
+ .fields().requiredInt("innerInt").requiredString("innerString")
+ .endRecord();
+ Schema.Field field = new Schema.Field("innerRecord", fieldSchema, "",null);
+ return field;
+ }
+
+
+ @Test
+ public void testDecorateSchemaWithSingleRecord() {
+ Schema inputRecord = SchemaBuilder.record("test").fields().requiredInt("numeric1")
+ .requiredString("string1").endRecord();
+ Schema fieldSchema = SchemaBuilder.record("innerRecord")
+ .fields().requiredInt("innerInt").requiredString("innerString")
+ .endRecord();
+ Schema.Field field = new Schema.Field("innerRecord", fieldSchema, "",null);
+ Schema outputRecord = AvroUtils.decorateRecordSchema(inputRecord, Collections.singletonList(field));
+ checkFieldsMatch(inputRecord, outputRecord);
+ Assert.assertNotNull(outputRecord.getField("innerRecord"));
+ Assert.assertEquals(outputRecord.getField("innerRecord").schema(), fieldSchema);
+ }
+
+
+ @Test
+ public void testDecorateRecordWithPrimitiveField() {
+ Schema inputRecordSchema = SchemaBuilder.record("test").fields()
+ .name("integer1")
+ .prop("innerProp", "innerVal")
+ .type().intBuilder().endInt().noDefault()
+ .requiredString("string1")
+ .endRecord();
+
+ GenericRecord inputRecord = new GenericData.Record(inputRecordSchema);
+ inputRecord.put("integer1", 10);
+ inputRecord.put("string1", "hello");
+
+ Schema outputRecordSchema = AvroUtils.decorateRecordSchema(inputRecordSchema, Collections.singletonList(new Schema.Field("newField", SchemaBuilder.builder().intType(), "test field", null)));
+ Map<String, Object> newFields = new HashMap<>();
+ newFields.put("newField", 5);
+
+ GenericRecord outputRecord = AvroUtils.decorateRecord(inputRecord, newFields, outputRecordSchema);
+ Assert.assertEquals(outputRecord.get("newField"), 5);
+ Assert.assertEquals(outputRecord.get("integer1"), 10);
+ Assert.assertEquals(outputRecord.get("string1"), "hello");
+
+ }
+
+
+ @Test
+ public void testDecorateRecordWithNestedField() throws IOException {
+ Schema inputRecordSchema = SchemaBuilder.record("test").fields()
+ .name("integer1")
+ .prop("innerProp", "innerVal")
+ .type().intBuilder().endInt().noDefault()
+ .requiredString("string1")
+ .endRecord();
+
+ GenericRecord inputRecord = new GenericData.Record(inputRecordSchema);
+ inputRecord.put("integer1", 10);
+ inputRecord.put("string1", "hello");
+
+ Schema nestedFieldSchema = SchemaBuilder.builder().record("metadata")
+ .fields()
+ .requiredString("source")
+ .requiredLong("timestamp")
+ .endRecord();
+
+ Schema.Field nestedField = new Schema.Field("metadata", nestedFieldSchema, "I am a nested field", null);
+
+ Schema outputRecordSchema = AvroUtils.decorateRecordSchema(inputRecordSchema, Collections.singletonList(nestedField));
+ Map<String, Object> newFields = new HashMap<>();
+
+ GenericData.Record metadataRecord = new GenericData.Record(nestedFieldSchema);
+ metadataRecord.put("source", "oracle");
+ metadataRecord.put("timestamp", 1234L);
+
+ newFields.put("metadata", metadataRecord);
+
+ GenericRecord outputRecord = AvroUtils.decorateRecord(inputRecord, newFields, outputRecordSchema);
+ Assert.assertEquals(outputRecord.get("integer1"), 10);
+ Assert.assertEquals(outputRecord.get("string1"), "hello");
+ Assert.assertEquals(outputRecord.get("metadata"), metadataRecord);
+
+
+ // Test that serializing and deserializing this record works.
+ GenericDatumWriter writer = new GenericDatumWriter(outputRecordSchema);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(1000);
+ Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null);
+ writer.write(outputRecord, binaryEncoder);
+ binaryEncoder.flush();
+ baos.close();
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ Decoder binaryDecoder = DecoderFactory.get().binaryDecoder(bais, null);
+ GenericDatumReader reader = new GenericDatumReader(outputRecordSchema);
+ GenericRecord deserialized = (GenericRecord) reader.read(null, binaryDecoder);
+ Assert.assertEquals(deserialized.get("integer1"), 10);
+ Assert.assertEquals(deserialized.get("string1").toString(), "hello"); //extra toString: avro returns Utf8
+ Assert.assertEquals(deserialized.get("metadata"), metadataRecord);
+ }
+
+
}