You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/08 12:03:11 UTC
[pulsar] branch master updated: [pulsar-clients] Support nested
struct for GenericRecord (#4177)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 78502a3 [pulsar-clients] Support nested struct for GenericRecord (#4177)
78502a3 is described below
commit 78502a3cfae0d789cea667c4829830487517b7ea
Author: tuteng <eg...@gmail.com>
AuthorDate: Wed May 8 20:03:04 2019 +0800
[pulsar-clients] Support nested struct for GenericRecord (#4177)
### Motivation
Currently, GenericRecordBuilder only supports primitive types, e.g. int, long, string. But it doesn’t support struct type.
### Modifications
Support nested struct for GenericRecordBuilder, for example AVRO
### Verifying this change
Unit Test Pass
---
.../client/api/schema/RecordSchemaBuilder.java | 9 +
.../client/impl/schema/FieldSchemaBuilderImpl.java | 17 ++
.../impl/schema/RecordSchemaBuilderImpl.java | 8 +
.../impl/schema/generic/AvroRecordBuilderImpl.java | 29 +-
.../client/impl/schema/SchemaBuilderTest.java | 292 ++++++++++++++++++++-
5 files changed, 346 insertions(+), 9 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java
index a3e34d7..7526ecf 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/RecordSchemaBuilder.java
@@ -44,6 +44,15 @@ public interface RecordSchemaBuilder {
FieldSchemaBuilder field(String fieldName);
/**
+ * Add a field with the given name and genericSchema to the record.
+ *
+ * @param fieldName name of the field
+ * @param genericSchema schema of the field
+ * @return field schema builder to build the field.
+ */
+ FieldSchemaBuilder field(String fieldName, GenericSchema genericSchema);
+
+ /**
* Add doc to the record schema.
*
* @param doc documentation
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
index ab6f134..88283bd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FieldSchemaBuilderImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema;
+import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import java.util.HashMap;
@@ -28,6 +29,8 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaBuilder;
import org.apache.pulsar.client.api.schema.FieldSchemaBuilder;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaType;
/**
@@ -44,8 +47,15 @@ class FieldSchemaBuilderImpl implements FieldSchemaBuilder<FieldSchemaBuilderImp
private String doc;
private String[] aliases;
+ private GenericSchema genericSchema;
+
FieldSchemaBuilderImpl(String fieldName) {
+ this(fieldName, null);
+ }
+
+ FieldSchemaBuilderImpl(String fieldName, GenericSchema genericSchema) {
this.fieldName = fieldName;
+ this.genericSchema = genericSchema;
}
@Override
@@ -132,6 +142,13 @@ class FieldSchemaBuilderImpl implements FieldSchemaBuilder<FieldSchemaBuilderImp
case TIMESTAMP:
baseSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
break;
+ case AVRO:
+ checkArgument(genericSchema.getSchemaInfo().getType() == SchemaType.AVRO,
+ "The field is expected to be using AVRO schema but "
+ + genericSchema.getSchemaInfo().getType() + " schema is found");
+ GenericAvroSchema genericAvroSchema = (GenericAvroSchema) genericSchema;
+ baseSchema = genericAvroSchema.getAvroSchema();
+ break;
default:
throw new RuntimeException("Schema `" + type + "` is not supported to be used as a field for now");
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
index 9abede8..ee9f0cb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.client.api.schema.FieldSchemaBuilder;
+import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -61,6 +62,13 @@ public class RecordSchemaBuilderImpl implements RecordSchemaBuilder {
}
@Override
+ public FieldSchemaBuilder field(String fieldName, GenericSchema genericSchema) {
+ FieldSchemaBuilderImpl field = new FieldSchemaBuilderImpl(fieldName, genericSchema);
+ fields.add(field);
+ return field;
+ }
+
+ @Override
public RecordSchemaBuilder doc(String doc) {
this.doc = doc;
return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AvroRecordBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AvroRecordBuilderImpl.java
index a8b2dd7..7278ab5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AvroRecordBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AvroRecordBuilderImpl.java
@@ -22,6 +22,8 @@ import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+import java.util.List;
+
/**
* Builder to build {@link org.apache.pulsar.client.api.schema.GenericRecord}.
*/
@@ -45,7 +47,15 @@ class AvroRecordBuilderImpl implements GenericRecordBuilder {
*/
@Override
public GenericRecordBuilder set(String fieldName, Object value) {
- avroRecordBuilder.set(fieldName, value);
+ if (value instanceof GenericRecord) {
+ if (value instanceof GenericAvroRecord) {
+ avroRecordBuilder.set(fieldName, ((GenericAvroRecord)value).getAvroRecord());
+ } else {
+ throw new IllegalArgumentException("Avro Record Builder doesn't support non-avro record as a field");
+ }
+ } else {
+ avroRecordBuilder.set(fieldName, value);
+ }
return this;
}
@@ -70,10 +80,19 @@ class AvroRecordBuilderImpl implements GenericRecordBuilder {
* @return a reference to the RecordBuilder.
*/
protected GenericRecordBuilder set(int index, Object value) {
- avroRecordBuilder.set(
- genericSchema.getAvroSchema().getFields().get(index),
- value
- );
+ if (value instanceof GenericRecord) {
+ if (value instanceof GenericAvroRecord) {
+ avroRecordBuilder.set(genericSchema.getAvroSchema().getFields().get(index),
+ ((GenericAvroRecord) value).getAvroRecord());
+ } else {
+ throw new IllegalArgumentException("Avro Record Builder doesn't support non-avro record as a field");
+ }
+ } else {
+ avroRecordBuilder.set(
+ genericSchema.getAvroSchema().getFields().get(index),
+ value
+ );
+ }
return this;
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
index 207fd98..9ce3d5c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
@@ -21,12 +21,12 @@ package org.apache.pulsar.client.impl.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericSchema;
-import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
-import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.api.schema.*;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.Test;
@@ -59,6 +59,29 @@ public class SchemaBuilderTest {
private double doubleField;
}
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ private static class People {
+ private People1 people1;
+ private People2 people2;
+ private String name;
+ }
+
+ @Data
+ private static class People1 {
+ private int age;
+ private int height;
+ private String name;
+ }
+
+ @Data
+ private static class People2 {
+ private int age;
+ private int height;
+ private String name;
+ }
+
@Test
public void testAllOptionalFieldsSchema() {
RecordSchemaBuilder recordSchemaBuilder =
@@ -200,4 +223,265 @@ public class SchemaBuilderTest {
assertEquals(0.7f, fields.floatField);
assertEquals(1.34d, fields.doubleField);
}
+
+ @Test
+ public void testGenericRecordBuilderAvroByFieldname() {
+ RecordSchemaBuilder people1SchemaBuilder = SchemaBuilder.record("People1");
+ people1SchemaBuilder.field("age").type(SchemaType.INT32);
+ people1SchemaBuilder.field("height").type(SchemaType.INT32);
+ people1SchemaBuilder.field("name").type(SchemaType.STRING);
+
+
+ SchemaInfo people1SchemaInfo = people1SchemaBuilder.build(SchemaType.AVRO);
+ GenericSchema people1Schema = Schema.generic(people1SchemaInfo);
+
+
+ GenericRecordBuilder people1RecordBuilder = people1Schema.newRecordBuilder();
+ people1RecordBuilder.set("age", 20);
+ people1RecordBuilder.set("height", 180);
+ people1RecordBuilder.set("name", "people1");
+ GenericRecord people1GenericRecord = people1RecordBuilder.build();
+
+ RecordSchemaBuilder people2SchemaBuilder = SchemaBuilder.record("People2");
+ people2SchemaBuilder.field("age").type(SchemaType.INT32);
+ people2SchemaBuilder.field("height").type(SchemaType.INT32);
+ people2SchemaBuilder.field("name").type(SchemaType.STRING);
+
+ SchemaInfo people2SchemaInfo = people2SchemaBuilder.build(SchemaType.AVRO);
+ GenericSchema people2Schema = Schema.generic(people2SchemaInfo);
+
+ GenericRecordBuilder people2RecordBuilder = people2Schema.newRecordBuilder();
+ people2RecordBuilder.set("age", 20);
+ people2RecordBuilder.set("height", 180);
+ people2RecordBuilder.set("name", "people2");
+ GenericRecord people2GenericRecord = people2RecordBuilder.build();
+
+ RecordSchemaBuilder peopleSchemaBuilder = SchemaBuilder.record("People");
+ peopleSchemaBuilder.field("people1", people1Schema).type(SchemaType.AVRO);
+ peopleSchemaBuilder.field("people2", people2Schema).type(SchemaType.AVRO);
+ peopleSchemaBuilder.field("name").type(SchemaType.STRING);
+
+
+ SchemaInfo schemaInfo = peopleSchemaBuilder.build(SchemaType.AVRO);
+
+ GenericSchema peopleSchema = Schema.generic(schemaInfo);
+ GenericRecordBuilder peopleRecordBuilder = peopleSchema.newRecordBuilder();
+ peopleRecordBuilder.set("people1", people1GenericRecord);
+ peopleRecordBuilder.set("people2", people2GenericRecord);
+ peopleRecordBuilder.set("name", "people");
+ GenericRecord peopleRecord = peopleRecordBuilder.build();
+
+ byte[] peopleEncode = peopleSchema.encode(peopleRecord);
+
+ GenericRecord people = (GenericRecord) peopleSchema.decode(peopleEncode);
+
+ assertEquals(people.getFields(), peopleRecord.getFields());
+ assertEquals((people.getField("name")), peopleRecord.getField("name"));
+ assertEquals(((GenericRecord)people.getField("people1")).getField("age"),
+ people1GenericRecord.getField("age"));
+ assertEquals(((GenericRecord)people.getField("people1")).getField("heigth"),
+ people1GenericRecord.getField("heigth"));
+ assertEquals(((GenericRecord)people.getField("people1")).getField("name"),
+ people1GenericRecord.getField("name"));
+ assertEquals(((GenericRecord)people.getField("people2")).getField("age"),
+ people2GenericRecord.getField("age"));
+ assertEquals(((GenericRecord)people.getField("people2")).getField("height"),
+ people2GenericRecord.getField("height"));
+ assertEquals(((GenericRecord)people.getField("people2")).getField("name"),
+ people2GenericRecord.getField("name"));
+
+ }
+
+ @Test
+ public void testGenericRecordBuilderAvroByFieldnamePojo() {
+ RecordSchemaBuilder people1SchemaBuilder = SchemaBuilder.record("People1");
+ people1SchemaBuilder.field("age").type(SchemaType.INT32);
+ people1SchemaBuilder.field("height").type(SchemaType.INT32);
+ people1SchemaBuilder.field("name").type(SchemaType.STRING);
+
+
+ SchemaInfo people1SchemaInfo = people1SchemaBuilder.build(SchemaType.AVRO);
+ GenericSchema people1Schema = Schema.generic(people1SchemaInfo);
+
+
+ GenericRecordBuilder people1RecordBuilder = people1Schema.newRecordBuilder();
+ people1RecordBuilder.set("age", 20);
+ people1RecordBuilder.set("height", 180);
+ people1RecordBuilder.set("name", "people1");
+ GenericRecord people1GenericRecord = people1RecordBuilder.build();
+
+ RecordSchemaBuilder people2SchemaBuilder = SchemaBuilder.record("People2");
+ people2SchemaBuilder.field("age").type(SchemaType.INT32);
+ people2SchemaBuilder.field("height").type(SchemaType.INT32);
+ people2SchemaBuilder.field("name").type(SchemaType.STRING);
+
+ SchemaInfo people2SchemaInfo = people2SchemaBuilder.build(SchemaType.AVRO);
+ GenericSchema people2Schema = Schema.generic(people2SchemaInfo);
+
+ GenericRecordBuilder people2RecordBuilder = people2Schema.newRecordBuilder();
+ people2RecordBuilder.set("age", 20);
+ people2RecordBuilder.set("height", 180);
+ people2RecordBuilder.set("name", "people2");
+ GenericRecord people2GenericRecord = people2RecordBuilder.build();
+
+ RecordSchemaBuilder peopleSchemaBuilder = SchemaBuilder.record("People");
+ peopleSchemaBuilder.field("people1", people1Schema).type(SchemaType.AVRO);
+ peopleSchemaBuilder.field("people2", people2Schema).type(SchemaType.AVRO);
+ peopleSchemaBuilder.field("name").type(SchemaType.STRING);
+
+
+ SchemaInfo schemaInfo = peopleSchemaBuilder.build(SchemaType.AVRO);
+
+ GenericSchema peopleSchema = Schema.generic(schemaInfo);
+ GenericRecordBuilder peopleRecordBuilder = peopleSchema.newRecordBuilder();
+ peopleRecordBuilder.set("people1", people1GenericRecord);
+ peopleRecordBuilder.set("people2", people2GenericRecord);
+ peopleRecordBuilder.set("name", "people");
+ GenericRecord peopleRecord = peopleRecordBuilder.build();
+
+ byte[] peopleEncode = peopleSchema.encode(peopleRecord);
+
+ Schema<People> peopleDecodeSchema = Schema.AVRO(
+ SchemaDefinition.<People>builder().withPojo(People.class).withAlwaysAllowNull(false).build());
+ People people = peopleDecodeSchema.decode(peopleEncode);
+
+ assertEquals(people.name, peopleRecord.getField("name"));
+ assertEquals(people.getPeople1().age, people1GenericRecord.getField("age"));
+ assertEquals(people.getPeople1().height, people1GenericRecord.getField("height"));
+ assertEquals(people.getPeople1().name, people1GenericRecord.getField("name"));
+ assertEquals(people.getPeople2().age, people2GenericRecord.getField("age"));
+ assertEquals(people.getPeople2().height, people2GenericRecord.getField("height"));
+ assertEquals(people.getPeople2().name, people2GenericRecord.getField("name"));
+
+ }
+
+ @Test
+ public void testGenericRecordBuilderAvroByFieldIndex() {
+ RecordSchemaBuilder people1SchemaBuilder = SchemaBuilder.record("People1");
+ people1SchemaBuilder.field("age").type(SchemaType.INT32);
+ people1SchemaBuilder.field("height").type(SchemaType.INT32);
+ people1SchemaBuilder.field("name").type(SchemaType.STRING);
+
+
+ SchemaInfo people1SchemaInfo = people1SchemaBuilder.build(SchemaType.AVRO);
+ GenericSchema<GenericRecord> people1Schema = Schema.generic(people1SchemaInfo);
+
+
+ GenericRecordBuilder people1RecordBuilder = people1Schema.newRecordBuilder();
+ people1RecordBuilder.set(people1Schema.getFields().get(0), 20);
+ people1RecordBuilder.set(people1Schema.getFields().get(1), 180);
+ people1RecordBuilder.set(people1Schema.getFields().get(2), "people1");
+ GenericRecord people1GenericRecord = people1RecordBuilder.build();
+
+ RecordSchemaBuilder people2SchemaBuilder = SchemaBuilder.record("People2");
+ people2SchemaBuilder.field("age").type(SchemaType.INT32);
+ people2SchemaBuilder.field("height").type(SchemaType.INT32);
+ people2SchemaBuilder.field("name").type(SchemaType.STRING);
+
+ SchemaInfo people2SchemaInfo = people2SchemaBuilder.build(SchemaType.AVRO);
+ GenericSchema<GenericRecord> people2Schema = Schema.generic(people2SchemaInfo);
+
+ GenericRecordBuilder people2RecordBuilder = people2Schema.newRecordBuilder();
+ people2RecordBuilder.set(people2Schema.getFields().get(0), 20);
+ people2RecordBuilder.set(people2Schema.getFields().get(1), 180);
+ people2RecordBuilder.set(people2Schema.getFields().get(2), "people2");
+ GenericRecord people2GenericRecord = people2RecordBuilder.build();
+
+ RecordSchemaBuilder peopleSchemaBuilder = SchemaBuilder.record("People");
+ peopleSchemaBuilder.field("people1", people1Schema).type(SchemaType.AVRO);
+ peopleSchemaBuilder.field("people2", people2Schema).type(SchemaType.AVRO);
+ peopleSchemaBuilder.field("name").type(SchemaType.STRING);
+
+
+ SchemaInfo schemaInfo = peopleSchemaBuilder.build(SchemaType.AVRO);
+
+ GenericSchema<GenericRecord> peopleSchema = Schema.generic(schemaInfo);
+ GenericRecordBuilder peopleRecordBuilder = peopleSchema.newRecordBuilder();
+ peopleRecordBuilder.set(peopleSchema.getFields().get(0), people1GenericRecord);
+ peopleRecordBuilder.set(peopleSchema.getFields().get(1), people2GenericRecord);
+ peopleRecordBuilder.set(peopleSchema.getFields().get(2), "people");
+ GenericRecord peopleRecord = peopleRecordBuilder.build();
+
+ byte[] peopleEncode = peopleSchema.encode(peopleRecord);
+
+ GenericRecord people = (GenericRecord) peopleSchema.decode(peopleEncode);
+
+ assertEquals(people.getFields(), peopleRecord.getFields());
+ assertEquals((people.getField("name")), peopleRecord.getField("name"));
+ assertEquals(((GenericRecord)people.getField("people1")).getField("age"),
+ people1GenericRecord.getField("age"));
+ assertEquals(((GenericRecord)people.getField("people1")).getField("heigth"),
+ people1GenericRecord.getField("heigth"));
+ assertEquals(((GenericRecord)people.getField("people1")).getField("name"),
+ people1GenericRecord.getField("name"));
+ assertEquals(((GenericRecord)people.getField("people2")).getField("age"),
+ people2GenericRecord.getField("age"));
+ assertEquals(((GenericRecord)people.getField("people2")).getField("height"),
+ people2GenericRecord.getField("height"));
+ assertEquals(((GenericRecord)people.getField("people2")).getField("name"),
+ people2GenericRecord.getField("name"));
+
+ }
+
+ @Test
+ public void testGenericRecordBuilderAvroByFieldIndexPojo() {
+ RecordSchemaBuilder people1SchemaBuilder = SchemaBuilder.record("People1");
+ people1SchemaBuilder.field("age").type(SchemaType.INT32);
+ people1SchemaBuilder.field("height").type(SchemaType.INT32);
+ people1SchemaBuilder.field("name").type(SchemaType.STRING);
+
+
+ SchemaInfo people1SchemaInfo = people1SchemaBuilder.build(SchemaType.AVRO);
+ GenericSchema<GenericRecord> people1Schema = Schema.generic(people1SchemaInfo);
+
+
+ GenericRecordBuilder people1RecordBuilder = people1Schema.newRecordBuilder();
+ people1RecordBuilder.set(people1Schema.getFields().get(0), 20);
+ people1RecordBuilder.set(people1Schema.getFields().get(1), 180);
+ people1RecordBuilder.set(people1Schema.getFields().get(2), "people1");
+ GenericRecord people1GenericRecord = people1RecordBuilder.build();
+
+ RecordSchemaBuilder people2SchemaBuilder = SchemaBuilder.record("People2");
+ people2SchemaBuilder.field("age").type(SchemaType.INT32);
+ people2SchemaBuilder.field("height").type(SchemaType.INT32);
+ people2SchemaBuilder.field("name").type(SchemaType.STRING);
+
+ SchemaInfo people2SchemaInfo = people2SchemaBuilder.build(SchemaType.AVRO);
+ GenericSchema<GenericRecord> people2Schema = Schema.generic(people2SchemaInfo);
+
+ GenericRecordBuilder people2RecordBuilder = people2Schema.newRecordBuilder();
+ people2RecordBuilder.set(people2Schema.getFields().get(0), 20);
+ people2RecordBuilder.set(people2Schema.getFields().get(1), 180);
+ people2RecordBuilder.set(people2Schema.getFields().get(2), "people2");
+ GenericRecord people2GenericRecord = people2RecordBuilder.build();
+
+ RecordSchemaBuilder peopleSchemaBuilder = SchemaBuilder.record("People");
+ peopleSchemaBuilder.field("people1", people1Schema).type(SchemaType.AVRO);
+ peopleSchemaBuilder.field("people2", people2Schema).type(SchemaType.AVRO);
+ peopleSchemaBuilder.field("name").type(SchemaType.STRING);
+
+
+ SchemaInfo schemaInfo = peopleSchemaBuilder.build(SchemaType.AVRO);
+
+ GenericSchema<GenericRecord> peopleSchema = Schema.generic(schemaInfo);
+ GenericRecordBuilder peopleRecordBuilder = peopleSchema.newRecordBuilder();
+ peopleRecordBuilder.set(peopleSchema.getFields().get(0), people1GenericRecord);
+ peopleRecordBuilder.set(peopleSchema.getFields().get(1), people2GenericRecord);
+ peopleRecordBuilder.set(peopleSchema.getFields().get(2), "people");
+ GenericRecord peopleRecord = peopleRecordBuilder.build();
+
+ byte[] peopleEncode = peopleSchema.encode(peopleRecord);
+
+ Schema<People> peopleDecodeSchema = Schema.AVRO(
+ SchemaDefinition.<People>builder().withPojo(People.class).withAlwaysAllowNull(false).build());
+ People people = peopleDecodeSchema.decode(peopleEncode);
+
+ assertEquals(people.name, peopleRecord.getField("name"));
+ assertEquals(people.getPeople1().age, people1GenericRecord.getField("age"));
+ assertEquals(people.getPeople1().height, people1GenericRecord.getField("height"));
+ assertEquals(people.getPeople1().name, people1GenericRecord.getField("name"));
+ assertEquals(people.getPeople2().age, people2GenericRecord.getField("age"));
+ assertEquals(people.getPeople2().height, people2GenericRecord.getField("height"));
+ assertEquals(people.getPeople2().name, people2GenericRecord.getField("name"));
+ }
}